1 /**
2 from:
3 https://github.com/jnorwood/file_parallel/blob/master/unzip_parallel.d
4 */
5 
6 module unzip_parallel;
7 import std.file;
8 import std.path;
9 import std.parallelism;
10 import std.algorithm;
11 import std.stdio;
12 import std.stream;
13 import std.datetime;
14 import std.zlib;
15 
16 // unzip a zip file  in parallel.
17 // Creates the destination directory if it doesn't exist.
18 // Overwrites if any destination files exist, other than the root destDir
19 // This is derived from std.zip operations, which is has 32 bit limitations,
20 // i.e. 4GB archive entries.  The original std.zip operations have been split up
21 // to enable parallel operation when doing the deflate, as well as to attempt to reduce the 
22 // peak memory use while processing a large zip file.  It does not break
23 // individual zip archive entries up so there is the possibility of running out of
24 // memory if you have a large individual entry in the zip file. 
25 
26 class UzdException : Exception
27 {
28     this(string msg)
29     {
30         super("UzdException: " ~ msg);
31     }
32 }
33 
34 void unzipParallel (string pathname , string destDir){
35     DirEntry deSrc = DirEntry(pathname);
36     string [] files;
37 
38     if (!exists(destDir)){
39         mkdirRecurse (destDir); // makes dest root and all required parents
40     }
41     DirEntry destDe = DirEntry(destDir);
42     if(!destDe.isDir()){        
43         throw new FileException( destDe.name, " is not a directory"); 
44     }
45     string destName = destDe.name ~ '/';
46 
47     // this is needed when doing the modification time corrections
48     immutable TimeZone tz = LocalTime();
49 
50     if(!deSrc.isDir()){
51         auto f = new std.stream.File(deSrc.name, FileMode.In);
52         ZipArchive2 za; 
53         ArchiveMember2[] directoryFull = getArchiveDirectory(za,f);
54         ArchiveMember2[] directory = directoryFull;
55 
56         // for the folder entries (ending in /), just create the folder
57         // if it doesn't already exist
58         // This is not done in parallel, but is a very short time relative to the
59         // file creations for regular files... on the order of 2% of the total task
60         // A small improvement might be to do these depth first, since the recursive
61         // action would then create multiple directory entries in parallel.
62 
63         // Note that we aren't updatign the folder times until later, after all the files have
64         // been added
65         foreach (ref ArchiveMember2 am;  directory)
66         {
67             if (am.name[$-1]=='/'){
68                 string destFolder = destName ~ am.name;
69                 if (!exists(destFolder)){
70                     mkdirRecurse (destFolder); // makes dest root and all required parents
71                 }
72             }
73         }
74 
75         // ok, this is just some chunk size so we don't try to process all the files at once
76         enum CHUNK_SZ =  40_000_000;
77 
78         while (directory.length!=0){
79             ulong len = 0;
80 
81             // this will be a slice of the directory that will be inflated in parallel
82             ArchiveMember2[] subd;
83 
84             // read the compressed data for some number of entries into the am structure.
85             // Limit the number of entries processed by the cumulative compressed length.
86             // The read of compressed data from the file is not done in parallel.
87             foreach (int j, ref ArchiveMember2 am;  directory)
88             {
89                 // The zip folder name entries all end with '/'.  Exclude them 
90                 if (am.name[$-1]!='/'){
91                     // we are reading these per entry.  That is a lot of small seeks.
92                     // It might be faster to sum up the sizes and read the chunk at once,
93                     // but we would also need to pre-sort the directory entries by offset.
94                     readCompressedData(za.endrecOffset,am,f); 
95                     len += am.compressedSize;
96                     if (len > CHUNK_SZ){
97                         subd = directory[0..j+1];
98                         directory = directory[j+1..$];
99                         break;
100                     }
101                 }
102             }
103 
104             // This handles the last block of the directory
105             if (len <= CHUNK_SZ){
106                 subd = directory;
107                 directory = directory[$..$]; 
108             }
109 
110             // parallel foreach for inflation of the regular files
111             foreach(ref ArchiveMember2 am;  taskPool.parallel(subd,1)) {
112                 // again, this excludes folder name entries, which end in '/'
113                 if (am.name[$-1]!='/'){
114                     // this call does the inflation of the compressed data
115                     expand2(am);
116 
117                     // now create the destination filename and write it out
118                     // looks like the std.file.write is limited to 4GB
119                     // Could we handle larger files if expansion done in fragments?
120                     string destFilename = destName ~ am.name;
121                     std.file.write(destFilename,am.expandedData);
122 
123                     // update the file's modification time based on the zip data
124                     SysTime st = DosFileTimeToSysTime(am.time, tz);
125                     //std.file.setTimes(destFilename, st, st); 
126                     setTimes(destFilename, st, st); 
127 
128                     // garbage collector didn't do this for some reason?
129                     // Probably something with the c call for the inflate in expand2
130                     delete(am.expandedData);
131                     am.expandedData = null;
132                 }
133             }
134         }
135         // parallel foreach to set timestamp on folders
136         // This requires a fix to the issue at this link ... errors when setting ts on folders
137         // http://d.puremagic.com/issues/show_bug.cgi?id=7819
138         // There is a working fix provided in that issue
139         foreach(ref ArchiveMember2 am;  taskPool.parallel(directoryFull,100)) {
140             if (am.name[$-1]=='/'){
141                 string folderName = destName ~ am.name[0..$-1]; // trim the trailing /
142                 SysTime st = DosFileTimeToSysTime(am.time, tz);
143                 // This setTimes call currently throws an error when trying to set times on folders
144                 // but a fix in setTimes allows it.  Uncomment the line below
145                 // to enable restore of timestamps on folders when that is fixed
146                 // setTimes(folderName, st, st); 
147             }
148         }
149     }
150     else    { 
151         throw new FileException( deSrc.name, " needs to be a regular zip archive, not a folder");
152     }
153 }
154 
155 /* ============ Reading the zip archive directory from near file end =================== */
156 
157 /**
158 *
159 * Fills in the property  endrecOffset in za reference.
160 * For each ArchiveMember2 structure in the directory, fills in
161 * properties   offset, compressionMethod, time,
162 * crc32, compressedSize, expandedSize,   name[], 
163 * Use readCompressedData() later to fill in the compressedData
164 * Use expand2() later to uncompress the data for each ArchiveMember2.
165 *
166 * Params:
167 *  za = reference to the ZipArchive2 struncture.  za.endrecOffset will be set
168 */
169 
170 // a zip number related to the search for the directory at end of zip file
171 enum ZIP_MAGIC_66000 = 66000;
172 
173 ArchiveMember2[] getArchiveDirectory (ref ZipArchive2 za,  std.stream.File f)
174 {   ptrdiff_t iend;
175     ptrdiff_t i;
176     ptrdiff_t endcommentlength;
177     size_t directorySize;
178     size_t directoryOffset;
179 
180     ulong flen = f.size();
181 
182     // just read the directory at the end of the file
183     long fst;
184     uint fsz;
185     if (flen > ZIP_MAGIC_66000){
186         fst =  flen - ZIP_MAGIC_66000;
187         fsz = ZIP_MAGIC_66000;
188         f.seek (fst,SeekPos.Set);
189     }
190     else{
191         fst = 0;
192         fsz = flen & 0x1ffff;
193     }
194 
195     auto data = new ubyte[fsz];
196 
197     // some utility functions that reference the local auto data just created
198     ushort getUshort(size_t i)
199     {
200         version (LittleEndian)
201         {
202             return *cast(ushort *)&data[i];
203         }
204         else
205         {
206             ubyte b0 = data[i];
207             ubyte b1 = data[i + 1];
208             return (b1 << 8) | b0;
209         }
210     }
211 
212     uint getUint(size_t i)
213     {
214         version (LittleEndian)
215         {
216             return *cast(uint *)&data[i];
217         }
218         else
219         {
220             return bswap(*cast(uint *)&data[i]);
221         }
222     }
223 
224     if (data.length > 0)
225     f.read(data);
226 
227     //this.data = cast(ubyte[]) buffer;
228 
229     // Find 'end record index' by searching backwards for signature
230     iend =  data.length - ZIP_MAGIC_66000;
231     if (iend < 0)
232         iend = 0;
233     for (i = data.length - 22; 1; i--)
234     {
235         if (i < iend)
236             throw new UzdException("no end record");
237 
238         if (data[i .. i + 4] == cast(ubyte[])"PK\x05\x06")
239         {
240             endcommentlength = getUshort(i + 20);
241             if (i + 22 + endcommentlength > data.length)
242                 continue;
243             //za.comment = cast(string)(data[i + 22 .. i + 22 + endcommentlength]);
244             za.endrecOffset = fst+i;
245             break;
246         }
247     }
248 
249 
250     // Read end record data
251     // not needed za.diskNumber = getUshort(i + 4);
252     // not needed za.diskStartDir = getUshort(i + 6);
253 
254     // changed these to local vars since the returned directory array has its own size
255     uint numEntries = getUshort(i + 8);
256     uint totalEntries = getUshort(i + 10);
257 
258     if (numEntries != totalEntries)
259         throw new UzdException("multiple disk zips not supported");
260 
261     directorySize = getUint(i + 12);
262     directoryOffset = getUint(i + 16);
263 
264     if (directoryOffset + directorySize > flen)
265         throw new UzdException("corrupted directory");
266 
267     f.seek (directoryOffset,SeekPos.Set);
268     data = new ubyte[directorySize];
269     if (data.length >0)
270         f.read(data);
271     i=0;
272 
273     ArchiveMember2[] directory = new ArchiveMember2[numEntries];
274 
275     foreach (ref de; directory)
276     {
277         /* The format of an entry is:
278         *  'PK' 1, 2
279         *  directory info
280         *  path
281         *  extra data
282         *  comment
283         */
284 
285         uint offset;
286         uint namelen;
287         uint extralen;
288         uint commentlen;
289 
290         if (data[i .. i + 4] != cast(ubyte[])"PK\x01\x02")
291             throw new UzdException("invalid directory entry 1");
292         //de.madeVersion = getUshort(i + 4);
293         //de.extractVersion = getUshort(i + 6);
294         de.flags = getUshort(i + 8);
295         de.compressionMethod = getUshort(i + 10);
296         //DosFileTimeToSysTime may be needed to put in form that can be use to restore file time
297         de.time = cast(DosFileTime)getUint(i + 12);
298         de.crc32 = getUint(i + 16);
299         de.compressedSize = getUint(i + 20);
300         de.expandedSize = getUint(i + 24);
301         namelen = getUshort(i + 28);
302         extralen = getUshort(i + 30);
303         commentlen = getUshort(i + 32);
304         //de.diskNumber = getUshort(i + 34);
305         //de.internalAttributes = getUshort(i + 36);
306         //de.externalAttributes = getUint(i + 38);
307         de.offset = getUint(i + 42);
308         i += 46;
309 
310         if (i + namelen + extralen + commentlen > directoryOffset + directorySize)
311             throw new UzdException("invalid directory entry 2");
312 
313         de.name = cast(string)(data[i .. i + namelen]);
314         i += namelen;
315         //de.extra = data[i .. i + extralen];
316         i += extralen;
317         //de.comment = cast(string)(data[i .. i + commentlen]);
318         i += commentlen;
319     }
320     if (i !=  directorySize)
321         throw new UzdException("invalid directory entry 3");
322     return directory;
323 }
324 
325 /*****
326 * get the compressed data into de.compressedData
327 *
328 * Could also compare the other properties from the directory,
329 * but those are commented out for now . 
330 */
331 void readCompressedData(ulong endrecOffset, ref ArchiveMember2 de, std.stream.File f)
332 {   
333     uint namelen;
334     uint extralen;
335     f.seek(de.offset,SeekPos.Set);
336     auto data = new ubyte[30];
337     if (data.length >0)
338     f.read(data);
339 
340     /* ============ Utility operations that work on the local data array =================== */
341 
342     ushort getUshort(size_t i)
343     {
344         version (LittleEndian)
345         {
346             return *cast(ushort *)&data[i];
347         }
348         else
349         {
350             ubyte b0 = data[i];
351             ubyte b1 = data[i + 1];
352             return (b1 << 8) | b0;
353         }
354     }
355 
356     /++
357     uint getUint(int i)
358     {
359         version (LittleEndian)
360         {
361             return *cast(uint *)&data[i];
362         }
363         else
364         {
365             return bswap(*cast(uint *)&data[i]);
366         }
367     }
368     ++/
369 
370     if (data[0 .. 4] != cast(ubyte[])"PK\x03\x04")
371         throw new UzdException("invalid directory entry 4");
372 
373     // These values should match what is in the main zip archive directory
374     // but we aren't checking this for now
375 
376     //de.extractVersion = getUshort(4);
377     //de.flags = getUshort(6);
378     //de.compressionMethod = getUshort(8);
379     //de.time = cast(DosFileTime)getUint(10);
380     //de.crc32 = getUint(14);
381     //de.compressedSize = getUint(18);
382     //de.expandedSize = getUint(22);
383 
384     namelen = getUshort(26);
385     extralen = getUshort(28);
386 
387     /++
388     debug(print)
389     {
390         printf("\t\texpandedSize = %d\n", de.expandedSize);
391         printf("\t\tcompressedSize = %d\n", de.compressedSize);
392         printf("\t\tnamelen = %d\n", namelen);
393         printf("\t\textralen = %d\n", extralen);
394     }
395     ++/
396 
397     if (de.flags & 1)
398         throw new UzdException("encryption not supported");
399 
400     long i;
401     i = de.offset + 30 + namelen + extralen;
402     if (i + de.compressedSize > endrecOffset)
403         throw new UzdException("invalid directory entry 5");
404 
405     f.seek(i,SeekPos.Set);
406     data = new ubyte[de.compressedSize];
407     // rawRead will throw an error if the data length is 0
408     if (data.length >0){
409         f.read(data);
410     }
411 
412     de.compressedData = data[0 .. de.compressedSize];
413     //debug(print) arrayPrint(de.compressedData);
414     return; 
415 }
416 
417 /*****
418 * Decompress the contents of archive member de and return the expanded
419 * data in de.expandedData.
420 * This was originally a portion  of std.zip's expand
421 * Delete the compressedData as we have no further use for it once we have the 
422 * uncompressed version required to write the file.  
423 */
424 void expand2(ref ArchiveMember2 de)
425 {   
426 
427     switch (de.compressionMethod)
428     {
429         case 0:
430             de.expandedData = de.compressedData;
431             return;
432 
433         case 8:
434             // -15 is a magic value used to decompress zip files.
435             // It has the effect of not requiring the 2 byte header
436             // and 4 byte trailer.
437             de.expandedData = cast(ubyte[])std.zlib.uncompress(cast(void[])de.compressedData, de.expandedSize, -15);
438             delete(de.compressedData);
439             de.compressedData = null;
440             return;
441 
442         default:
443             throw new UzdException("unsupported compression method");
444     }
445 }
446 
447 /**
448 * A member of the ZipArchive directory, originally from a class in std.zip
449 * Commenting out the members we aren't using for this unzip task.
450 */
451 struct ArchiveMember2
452 {
453     //ushort madeVersion = 20;       /// Read Only
454     //ushort extractVersion = 20;    /// Read Only
455     ushort flags;                  /// Read/Write: normally set to 0
456     ushort compressionMethod;      /// Read/Write: 0 for compression, 8 for deflate
457     std.datetime.DosFileTime time; /// Read/Write: Last modified time of the member. It's in the DOS date/time format.
458     uint crc32;                    /// Read Only: cyclic redundancy check (CRC) value
459     uint compressedSize;           /// Read Only: size of data of member in compressed form.
460     uint expandedSize;             /// Read Only: size of data of member in expanded form.
461     //ushort diskNumber;             /// Read Only: should be 0.
462     //ushort internalAttributes;     /// Read/Write
463     //uint externalAttributes;       /// Read/Write
464 
465     uint offset;
466 
467     /**
468     * Read/Write: Usually the file name of the archive member; it is used to
469     * index the archive directory for the member. Each member must have a unique
470     * name[]. Do not change without removing member from the directory first.
471     */
472     string name;
473 
474     //ubyte[] extra;              /// Read/Write: extra data for this member.
475     //string comment;             /// Read/Write: comment associated with this member.
476     ubyte[] compressedData;     /// Read Only: data of member in compressed form.
477     ubyte[] expandedData;       /// Read/Write: data of member in uncompressed form.
478 
479     /++
480     debug(print)
481     {
482         void print()
483         {
484             printf("name = '%.*s'\n", name.length, name.ptr);
485             printf("\tcomment = '%.*s'\n", comment.length, comment.ptr);
486             printf("\tmadeVersion = x%04x\n", madeVersion);
487             printf("\textractVersion = x%04x\n", extractVersion);
488             printf("\tflags = x%04x\n", flags);
489             printf("\tcompressionMethod = %d\n", compressionMethod);
490             printf("\ttime = %d\n", time);
491             printf("\tcrc32 = x%08x\n", crc32);
492             printf("\texpandedSize = %d\n", expandedSize);
493             printf("\tcompressedSize = %d\n", compressedSize);
494             printf("\tinternalAttributes = x%04x\n", internalAttributes);
495             printf("\texternalAttributes = x%08x\n", externalAttributes);
496         }
497     }
498     ++/
499 }
500 
501 // This struct was originally the ZipArchive class, but has been whittled away
502 // until all that is left is endrecOffset, which is just used for boundary checks later
503 
504 struct ZipArchive2
505 {
506     ulong endrecOffset; // hmm... this is all that is left
507 
508     // not needed uint diskNumber;    /// Read Only: 0 since multi-disk zip archives are not supported.
509     // not needed uint diskStartDir;  /// Read Only: 0 since multi-disk zip archives are not supported.
510     //uint numEntries;    /// Read Only: number of ArchiveMembers in the directory.
511     //uint totalEntries;  /// Read Only: same as totalEntries.
512     //string comment;     /// Read/Write: the archive comment. Must be less than 65536 bytes in length.
513 
514     /**
515     * Read Only: array indexed by the name of each member of the archive.
516     * Example:
517     *  All the members of the archive can be accessed with a foreach loop:
518     * --------------------
519     * ZipArchive2 archive = new ZipArchive2(data);
520     * foreach (ArchiveMember am; archive.directory)
521     * {
522     *     writefln("member name is '%s'", am.name);
523     * }
524     * --------------------
525     */
526 
527     /++
528     debug (print)
529     {
530         void print()
531         {
532             printf("\tdiskNumber = %u\n", diskNumber);
533             printf("\tdiskStartDir = %u\n", diskStartDir);
534             printf("\tnumEntries = %u\n", numEntries);
535             printf("\ttotalEntries = %u\n", totalEntries);
536             printf("\tcomment = '%.*s'\n", comment.length, comment.ptr);
537         }
538     }
539 
540     /* ============ Creating a new archive =================== */
541 
542     /** Constructor to use when creating a new archive.
543     */
544     this()
545     {
546     }
547     ++/
548 }