/**
 * Parallel unzip.
 *
 * from:
 * https://github.com/jnorwood/file_parallel/blob/master/unzip_parallel.d
 */

module unzip_parallel;
import std.file;
import std.path;
import std.parallelism;
import std.algorithm;
import std.stdio;
import std.stream;
import std.datetime;
import std.zlib;

// Unzip a zip file in parallel.
// Creates the destination directory if it doesn't exist.
// Overwrites any destination files that exist, other than the root destDir.
// This is derived from the std.zip operations, which have 32 bit limitations,
// i.e. 4GB archive entries.  The original std.zip operations have been split up
// to enable parallel operation when doing the inflate, as well as to attempt to
// reduce the peak memory use while processing a large zip file.  It does not break
// individual zip archive entries up, so there is the possibility of running out of
// memory if you have a large individual entry in the zip file.

/// Exception thrown for malformed, unsupported or unsafe zip archives.
class UzdException : Exception
{
    this(string msg)
    {
        super("UzdException: " ~ msg);
    }
}

/* ============ Small private helpers =================== */

// Read a little-endian 16 bit value from buf at byte index at (bounds checked).
private ushort leUshort(const(ubyte)[] buf, size_t at)
{
    return cast(ushort)(buf[at] | (buf[at + 1] << 8));
}

// Read a little-endian 32 bit value from buf at byte index at (bounds checked).
private uint leUint(const(ubyte)[] buf, size_t at)
{
    return  cast(uint)buf[at]
         | (cast(uint)buf[at + 1] << 8)
         | (cast(uint)buf[at + 2] << 16)
         | (cast(uint)buf[at + 3] << 24);
}

// Zip folder entries are the ones whose name ends with '/'.
private bool isFolderEntry(string name)
{
    return name.length != 0 && name[$ - 1] == '/';
}

// True when name can be appended to the destination root without escaping it:
// non-empty, not absolute, no drive prefix and no ".." path component.
private bool isSafeMemberName(string name)
{
    if (name.length == 0)
        return false;
    if (name[0] == '/' || name[0] == '\\')
        return false;
    if (name.length >= 2 && name[1] == ':')
        return false;

    size_t start = 0;
    foreach (size_t k; 0 .. name.length + 1)
    {
        if (k == name.length || name[k] == '/' || name[k] == '\\')
        {
            if (name[start .. k] == "..")
                return false;
            start = k + 1;
        }
    }
    return true;
}

/**
 * Extract every member of the zip archive pathname below destDir.
 *
 * The compressed data is read sequentially in chunks of roughly CHUNK_SZ
 * bytes; each chunk is then inflated and written out with a parallel foreach.
 * File modification times are restored from the archive.
 *
 * Params:
 *  pathname = path of the zip archive (must be a regular file)
 *  destDir  = destination root; created (with parents) when missing
 *
 * Throws:
 *  FileException when destDir is not a directory or pathname is a folder,
 *  UzdException for malformed/unsupported archives or unsafe member names.
 */
void unzipParallel (string pathname , string destDir){
    DirEntry deSrc = DirEntry(pathname);

    if (!exists(destDir)){
        mkdirRecurse (destDir); // makes dest root and all required parents
    }
    DirEntry destDe = DirEntry(destDir);
    if(!destDe.isDir()){
        throw new FileException( destDe.name, " is not a directory");
    }
    if(deSrc.isDir()){
        throw new FileException( deSrc.name, " needs to be a regular zip archive, not a folder");
    }
    string destName = destDe.name ~ '/';

    // this is needed when doing the modification time corrections
    immutable TimeZone tz = LocalTime();

    auto f = new std.stream.File(deSrc.name, FileMode.In);
    // make sure the archive handle is released on every exit path
    scope(exit) f.close();

    ZipArchive2 za;
    ArchiveMember2[] directoryFull = getArchiveDirectory(za,f);
    ArchiveMember2[] directory = directoryFull;

    // Sequential pre-pass over the whole directory:
    //  * reject member names that would land outside the destination root
    //    before anything is written,
    //  * create the folder for each folder entry (name ending in '/'),
    //  * create the parent folder of each regular file, since many archives
    //    carry no explicit folder entries.
    // This is not done in parallel, but is a very short time relative to the
    // file creations for regular files.  Doing it here also keeps
    // mkdirRecurse out of the parallel section.
    //
    // Note that we aren't updating the folder times until later, after all
    // the files have been added.
    string lastFolder;
    foreach (ref ArchiveMember2 am; directoryFull)
    {
        if (!isSafeMemberName(am.name))
            throw new UzdException("unsafe archive member name: " ~ am.name);

        string target = destName ~ am.name;
        string folder = isFolderEntry(am.name) ? target : dirName(target);
        if (folder != lastFolder){
            if (!exists(folder)){
                mkdirRecurse (folder); // makes the folder and all required parents
            }
            lastFolder = folder;
        }
    }

    // ok, this is just some chunk size so we don't try to process all the files at once
    enum CHUNK_SZ = 40_000_000;

    while (directory.length!=0){
        ulong len = 0;

        // number of leading entries that form this chunk; defaults to
        // "everything that is left", which handles the last block
        size_t taken = directory.length;

        // Read the compressed data for some number of entries into the am structure.
        // Limit the number of entries processed by the cumulative compressed length.
        // The read of compressed data from the file is not done in parallel.
        foreach (size_t j, ref ArchiveMember2 am; directory)
        {
            // Folder entries carry no data. Exclude them
            if (!isFolderEntry(am.name)){
                // We are reading these per entry.  That is a lot of small seeks.
                // It might be faster to sum up the sizes and read the chunk at once,
                // but we would also need to pre-sort the directory entries by offset.
                readCompressedData(za.endrecOffset,am,f);
                len += am.compressedSize;
                if (len > CHUNK_SZ){
                    taken = j+1;
                    break;
                }
            }
        }

        // this is the slice of the directory that will be inflated in parallel
        ArchiveMember2[] subd = directory[0..taken];
        directory = directory[taken..$];

        // parallel foreach for inflation of the regular files
        foreach(ref ArchiveMember2 am; taskPool.parallel(subd,1)) {
            // again, this excludes folder name entries, which end in '/'
            if (!isFolderEntry(am.name)){
                // this call does the inflation of the compressed data
                expand2(am);

                // now create the destination filename and write it out
                // looks like the std.file.write is limited to 4GB
                // Could we handle larger files if expansion done in fragments?
                string destFilename = destName ~ am.name;
                std.file.write(destFilename,am.expandedData);

                // update the file's modification time based on the zip data
                SysTime st = DosFileTimeToSysTime(am.time, tz);
                setTimes(destFilename, st, st);

                // Release the expanded buffer eagerly to keep peak memory low.
                // expand2 has already dropped its reference to the compressed data.
                delete(am.expandedData);
                am.expandedData = null;
            }
        }
    }

    // Restoring timestamps on folders requires a fix to setTimes, see
    // http://d.puremagic.com/issues/show_bug.cgi?id=7819
    // Build with -version=UnzipRestoreFolderTimes once that fix is available.
    version (UnzipRestoreFolderTimes)
    {
        foreach(ref ArchiveMember2 am; taskPool.parallel(directoryFull,100)) {
            if (isFolderEntry(am.name)){
                string folderName = destName ~ am.name[0..$-1]; // trim the trailing /
                SysTime st = DosFileTimeToSysTime(am.time, tz);
                setTimes(folderName, st, st);
            }
        }
    }
}

/* ============ Reading the zip archive directory from near file end =================== */

// Upper bound on how far from the end of the file the end-of-central-directory
// record can start: 22 byte fixed record + at most 65535 bytes of comment,
// rounded up.
enum ZIP_MAGIC_66000 = 66000;

/**
 * Read the central directory of the zip file f.
 *
 * Fills in the property endrecOffset in the za reference.
 * For each ArchiveMember2 structure in the directory, fills in
 * properties offset, flags, compressionMethod, time,
 * crc32, compressedSize, expandedSize and name[].
 * Use readCompressedData() later to fill in the compressedData.
 * Use expand2() later to uncompress the data for each ArchiveMember2.
 *
 * Params:
 *  za = reference to the ZipArchive2 structure. za.endrecOffset will be set
 *  f  = the opened zip file
 *
 * Returns: one ArchiveMember2 per central directory entry, in directory order.
 *
 * Throws: UzdException when the end record or a directory entry is invalid,
 *  or when the archive spans multiple disks.
 */
ArchiveMember2[] getArchiveDirectory (ref ZipArchive2 za, std.stream.File f)
{
    ulong flen = f.size();

    // just read the tail of the file, which holds the end record
    long fst;
    uint fsz;
    if (flen > ZIP_MAGIC_66000){
        fst = flen - ZIP_MAGIC_66000;
        fsz = ZIP_MAGIC_66000;
    }
    else{
        fst = 0;
        fsz = cast(uint)flen;
    }
    f.seek (fst,SeekPos.Set);

    auto data = new ubyte[fsz];
    // readExact throws on a short read instead of leaving zero bytes behind
    f.readExact(data.ptr, data.length);

    // Find 'end record index' by searching backwards for signature
    ptrdiff_t i;
    bool found = false;
    for (i = cast(ptrdiff_t)data.length - 22; i >= 0; i--)
    {
        if (data[i .. i + 4] == cast(ubyte[])"PK\x05\x06")
        {
            size_t endcommentlength = leUshort(data, i + 20);
            // a signature whose comment would run past the file end is a false hit
            if (i + 22 + endcommentlength > data.length)
                continue;
            za.endrecOffset = fst+i;
            found = true;
            break;
        }
    }
    if (!found)
        throw new UzdException("no end record");

    // Read end record data
    size_t rec = cast(size_t)i;
    uint numEntries = leUshort(data, rec + 8);
    uint totalEntries = leUshort(data, rec + 10);

    if (numEntries != totalEntries)
        throw new UzdException("multiple disk zips not supported");

    uint directorySize = leUint(data, rec + 12);
    uint directoryOffset = leUint(data, rec + 16);

    // widen before adding so the sum cannot wrap
    if (cast(ulong)directoryOffset + directorySize > flen)
        throw new UzdException("corrupted directory");

    f.seek (directoryOffset,SeekPos.Set);
    data = new ubyte[directorySize];
    f.readExact(data.ptr, data.length);

    ArchiveMember2[] directory = new ArchiveMember2[numEntries];

    // byte index of the current entry inside the directory buffer
    size_t pos = 0;

    foreach (ref de; directory)
    {
        /* The format of an entry is:
         *  'PK' 1, 2
         *  directory info (46 bytes including the signature)
         *  path
         *  extra data
         *  comment
         */

        // the fixed part of the entry must fit in the buffer
        if (pos + 46 > data.length)
            throw new UzdException("invalid directory entry 1");
        if (data[pos .. pos + 4] != cast(ubyte[])"PK\x01\x02")
            throw new UzdException("invalid directory entry 1");

        de.flags = leUshort(data, pos + 8);
        de.compressionMethod = leUshort(data, pos + 10);
        // DosFileTimeToSysTime is used later to restore the file time
        de.time = cast(DosFileTime)leUint(data, pos + 12);
        de.crc32 = leUint(data, pos + 16);
        de.compressedSize = leUint(data, pos + 20);
        de.expandedSize = leUint(data, pos + 24);
        uint namelen = leUshort(data, pos + 28);
        uint extralen = leUshort(data, pos + 30);
        uint commentlen = leUshort(data, pos + 32);
        de.offset = leUint(data, pos + 42);
        pos += 46;

        // pos indexes the directory buffer, so compare against its length
        if (pos + namelen + extralen + commentlen > data.length)
            throw new UzdException("invalid directory entry 2");

        de.name = cast(string)(data[pos .. pos + namelen]);
        // extra data and comment are skipped
        pos += namelen + extralen + commentlen;
    }
    if (pos != data.length)
        throw new UzdException("invalid directory entry 3");
    return directory;
}

/*****
 * Read the compressed data of member de from f into de.compressedData.
 *
 * The local file header at de.offset is only used to find where the data
 * starts; its other fields should match the central directory but are not
 * compared for now.
 *
 * Params:
 *  endrecOffset = file offset of the end record, used as upper bound
 *  de           = directory entry; compressedData is filled in
 *  f            = the opened zip file
 *
 * Throws: UzdException for a bad local header, an encrypted member or data
 *  that would extend past endrecOffset.
 */
void readCompressedData(ulong endrecOffset, ref ArchiveMember2 de, std.stream.File f)
{
    f.seek(de.offset,SeekPos.Set);
    auto header = new ubyte[30];
    f.readExact(header.ptr, header.length);

    if (header[0 .. 4] != cast(ubyte[])"PK\x03\x04")
        throw new UzdException("invalid directory entry 4");

    // the local name/extra lengths may differ from the central directory ones
    uint namelen = leUshort(header, 26);
    uint extralen = leUshort(header, 28);

    if (de.flags & 1)
        throw new UzdException("encryption not supported");

    long dataStart = cast(long)de.offset + 30 + namelen + extralen;
    if (dataStart + de.compressedSize > endrecOffset)
        throw new UzdException("invalid directory entry 5");

    f.seek(dataStart,SeekPos.Set);
    auto data = new ubyte[de.compressedSize];
    // readExact is a no-op for a zero length buffer
    f.readExact(data.ptr, data.length);

    de.compressedData = data;
}

/*****
 * Decompress the contents of archive member de and return the expanded
 * data in de.expandedData.
 * This was originally a portion of std.zip's expand.
 * de.compressedData is released (set to null) in every successful case, as
 * we have no further use for it once we have the uncompressed version
 * required to write the file.
 *
 * Throws: UzdException when the compression method is neither 0 (stored)
 *  nor 8 (deflate).
 */
void expand2(ref ArchiveMember2 de)
{

    switch (de.compressionMethod)
    {
        case 0:
            // Stored: the buffer is handed over as is.  Clear compressedData
            // so it does not dangle once the caller deletes expandedData.
            de.expandedData = de.compressedData;
            de.compressedData = null;
            return;

        case 8:
            // -15 is a magic value used to decompress zip files.
            // It has the effect of not requiring the 2 byte header
            // and 4 byte trailer.
            de.expandedData = cast(ubyte[])std.zlib.uncompress(cast(void[])de.compressedData, de.expandedSize, -15);
            delete(de.compressedData);
            de.compressedData = null;
            return;

        default:
            throw new UzdException("unsupported compression method");
    }
}

/**
 * A member of the ZipArchive directory, originally from a class in std.zip.
 * The members we aren't using for this unzip task are commented out.
 */
struct ArchiveMember2
{
    //ushort madeVersion = 20;       /// Read Only
    //ushort extractVersion = 20;    /// Read Only
    ushort flags;                  /// Read/Write: normally set to 0
    ushort compressionMethod;      /// Read/Write: 0 for stored (no compression), 8 for deflate
    std.datetime.DosFileTime time; /// Read/Write: Last modified time of the member. It's in the DOS date/time format.
    uint crc32;                    /// Read Only: cyclic redundancy check (CRC) value
    uint compressedSize;           /// Read Only: size of data of member in compressed form.
    uint expandedSize;             /// Read Only: size of data of member in expanded form.
    //ushort diskNumber;             /// Read Only: should be 0.
    //ushort internalAttributes;     /// Read/Write
    //uint externalAttributes;       /// Read/Write

    uint offset;                   /// File offset of the member's local header.

    /**
     * Read/Write: Usually the file name of the archive member; it is used to
     * index the archive directory for the member. Each member must have a unique
     * name[]. Do not change without removing member from the directory first.
     */
    string name;

    //ubyte[] extra;                 /// Read/Write: extra data for this member.
    //string comment;                /// Read/Write: comment associated with this member.
    ubyte[] compressedData;        /// Read Only: data of member in compressed form.
    ubyte[] expandedData;          /// Read/Write: data of member in uncompressed form.
}

// This struct was originally the ZipArchive class, but has been whittled away
// until all that is left is endrecOffset, which is just used for boundary checks later

struct ZipArchive2
{
    ulong endrecOffset;   /// File offset of the end-of-central-directory record.

    // not needed uint diskNumber;    /// Read Only: 0 since multi-disk zip archives are not supported.
    // not needed uint diskStartDir;  /// Read Only: 0 since multi-disk zip archives are not supported.
    //uint numEntries;     /// Read Only: number of ArchiveMembers in the directory.
    //uint totalEntries;   /// Read Only: same as numEntries.
    //string comment;      /// Read/Write: the archive comment. Must be less than 65536 bytes in length.
}