diff --git a/src/ssn/streams/getFileFromDisks.ts b/src/ssn/streams/getFileFromDisks.ts
index 4b87b50..8602456 100644
--- a/src/ssn/streams/getFileFromDisks.ts
+++ b/src/ssn/streams/getFileFromDisks.ts
@@ -11,33 +11,64 @@ interface IGetFileFromDisksOptions {
   storedSize: number;
 }
 
-function getStream(disks: string[], index: number, offset: number) {
-  return fs.createReadStream(disks[index], { start: offset });
+function getStream(disks: string[], index: number, offset: number, length: number = Infinity) {
+  return fs.createReadStream(disks[index], { start: offset, end: offset + length });
 }
 
 /** Takes a list of ReadableStreams (the disks), as well as the offset and length, and returns a stream for just one file. */
 export default function getFileFromDisks(disks: string[], { diskStart, offset, storedSize }: IGetFileFromDisksOptions): stream.Readable {
   let curDiskIndex = diskStart;
   let curDisk = getStream(disks, diskStart, offset);
+  let localFileHeaderLength = 0;
+  let totalRead = 0;
 
   //Create new stream that concatenates disks until storedSize is reached, then ends the stream.
-  const outputStream = new stream.Readable({
-    read(num) {
+  const outputStream = new stream.Duplex({
+    /*read(num) {
+      if (num === undefined) {
+        throw new Error('Expected to receive number of bytes when reading from stream.');
+      }
+
+      totalRead += num;
+      //end of file reached
+      if (localFileHeaderLength !== 0 && totalRead >= localFileHeaderLength + storedSize) {
+        return null;
+      }
+
       const chunk = curDisk.read(num);
       //transparently switch to next disk as soon as we finished reading current disk
       if (chunk === null) {
         curDiskIndex += 1;
-        curDisk = getStream(disks, curDiskIndex, 0);
-        //await new Promise((resolve) => { curDisk.on('readable', () => { resolve(); }); });
+        curDisk = getStream(disks, curDiskIndex, 0, (localFileHeaderLength === 0) ? Infinity : localFileHeaderLength + storedSize - totalRead);
+        //TODO: await new Promise((resolve) => { curDisk.on('readable', () => { resolve(); }); });
         return curDisk.read(num);
       } else {
         return chunk;
       }
-    },
+    },*/
   });
 
+  const onData = (chunk: Buffer) => {
+    outputStream.write(chunk);
+    totalRead += chunk.length;
+  };
+  const onEnd = () => {
+    curDiskIndex += 1;
+    if (curDiskIndex >= disks.length || (localFileHeaderLength !== 0 && totalRead >= localFileHeaderLength + storedSize)) {
+      outputStream.end();
+    } else {
+      curDisk = getStream(disks, curDiskIndex, 0, (localFileHeaderLength === 0) ? Infinity : localFileHeaderLength + storedSize - totalRead);
+      //set up new listeners for data and end
+      curDisk.on('data', onData);
+      curDisk.on('end', onEnd);
+    }
+  };
+
+  curDisk.on('data', onData);
+  curDisk.on('end', onEnd);
+
   //Read local file header
-  readLocalFileHeader(outputStream);
+  localFileHeaderLength = readLocalFileHeader(outputStream); //TODO: now that local file header has been read, restrict length of stream to storedSize
 
   return outputStream;
 }
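
Review note: `fs.createReadStream` treats both `start` and `end` as inclusive byte offsets, so `{ start: offset, end: offset + length }` actually reads `length + 1` bytes. A minimal sketch of the intended bounds, using a hypothetical `getBoundedStream` helper that is not part of this patch; omitting `end` is also safer than passing `Infinity`, which some Node versions reject during option validation:

```ts
import * as fs from 'fs';

/** Sketch: stream exactly `length` bytes starting at `offset`.
 *  Because `end` is inclusive, the last byte of the slice sits at
 *  offset + length - 1. With no known length, omit `end` entirely. */
function getBoundedStream(path: string, offset: number, length?: number): fs.ReadStream {
  return length === undefined
    ? fs.createReadStream(path, { start: offset })
    : fs.createReadStream(path, { start: offset, end: offset + length - 1 });
}
```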
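
Review note: with the `read` implementation commented out, the bare `new stream.Duplex({...})` provides neither `_read()` nor `_write()`, so both the `outputStream.write(chunk)` call in `onData` and any consumer read will fail with a "method not implemented" error. A `stream.PassThrough` implements both sides; a sketch under that assumption (the `forwardChunk` helper is hypothetical), including the backpressure handling the `'data'` handler currently skips:

```ts
import * as fs from 'fs';
import * as stream from 'stream';

//PassThrough implements _read() and _write(): whatever the 'data'
//handlers write in can be read back out by the consumer.
const outputStream = new stream.PassThrough();

//Forward a chunk while respecting backpressure: pause the disk
//stream while the PassThrough buffer is full, resume on 'drain'.
function forwardChunk(source: fs.ReadStream, chunk: Buffer): void {
  if (!outputStream.write(chunk)) {
    source.pause();
    outputStream.once('drain', () => source.resume());
  }
}
```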

diff --git a/src/ssn/streams/readLocalFileHeader.ts b/src/ssn/streams/readLocalFileHeader.ts
index 04b6849..35d2488 100644
--- a/src/ssn/streams/readLocalFileHeader.ts
+++ b/src/ssn/streams/readLocalFileHeader.ts
@@ -1,7 +1,10 @@
 import * as stream from 'stream';
 
-/** Reads the local file header, which is included before each stored file, and advances the stream accordingly. */
-export default function readLocalFileHeader(inputStream: stream.Readable) {
+/** Reads the local file header, which is included before
+ * each stored file, and advances the stream accordingly.
+ * Returns length of the local file header.
+ */
+export default function readLocalFileHeader(inputStream: stream.Readable): number {
   const localFileHeader: Buffer = inputStream.read(30);
 
   //Local file header signature
@@ -16,7 +19,10 @@ export default function readLocalFileHeader(inputStream: stream.Readable) {
   const localExtraSize = localFileHeader.readUInt16LE(28);
 
   //skip local file name and extra field
-  if (localFilenameSize + localExtraSize > 0) {
-    inputStream.read(localFilenameSize + localExtraSize);
+  const additionalLength = localFilenameSize + localExtraSize;
+  if (additionalLength > 0) {
+    inputStream.read(additionalLength);
   }
+
+  return 30 + additionalLength;
 }
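
Review note: the `30` in `return 30 + additionalLength;` is the fixed size of a ZIP local file header (APPNOTE.TXT section 4.3.7), and offsets 26/28 hold the file name and extra field lengths, matching the reads above. A sketch of the layout the function relies on, with a hypothetical `parseLocalFileHeader` for illustration; note also that the synchronous `inputStream.read(30)` returns `null` when fewer than 30 bytes are buffered, so the `Buffer` annotation hides a possible null here:

```ts
/** Sketch: decode the fixed 30-byte part of a ZIP local file header.
 *  The full header length is 30 + fileNameLength + extraFieldLength. */
function parseLocalFileHeader(buf: Buffer) {
  if (buf.readUInt32LE(0) !== 0x04034b50) { //signature 'PK\x03\x04'
    throw new Error('Not a local file header');
  }
  return {
    compressionMethod: buf.readUInt16LE(8), //0 = stored, 8 = deflated
    compressedSize: buf.readUInt32LE(18),
    uncompressedSize: buf.readUInt32LE(22),
    fileNameLength: buf.readUInt16LE(26), //read at offset 26 above
    extraFieldLength: buf.readUInt16LE(28), //read at offset 28 above
  };
}
```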