🐛 Fix getFileFromDisks returning null due to not waiting for readable event
This commit is contained in:
parent
89d1b9e644
commit
1c91b51686
2 changed files with 49 additions and 12 deletions
|
@ -11,33 +11,64 @@ interface IGetFileFromDisksOptions {
|
||||||
storedSize: number;
|
storedSize: number;
|
||||||
}
|
}
|
||||||
|
|
||||||
function getStream(disks: string[], index: number, offset: number) {
|
function getStream(disks: string[], index: number, offset: number, length: number = Infinity) {
|
||||||
return fs.createReadStream(disks[index], { start: offset });
|
return fs.createReadStream(disks[index], { start: offset, end: offset + length });
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Takes a list of ReadableStreams (the disks), as well as the offset and length, and returns a stream for just one file. */
|
/** Takes a list of ReadableStreams (the disks), as well as the offset and length, and returns a stream for just one file. */
|
||||||
export default function getFileFromDisks(disks: string[], { diskStart, offset, storedSize }: IGetFileFromDisksOptions): stream.Readable {
|
export default function getFileFromDisks(disks: string[], { diskStart, offset, storedSize }: IGetFileFromDisksOptions): stream.Readable {
|
||||||
let curDiskIndex = diskStart;
|
let curDiskIndex = diskStart;
|
||||||
let curDisk = getStream(disks, diskStart, offset);
|
let curDisk = getStream(disks, diskStart, offset);
|
||||||
|
let localFileHeaderLength = 0;
|
||||||
|
let totalRead = 0;
|
||||||
|
|
||||||
//Create new stream that concatenates disks until storedSize is reached, then ends the stream.
|
//Create new stream that concatenates disks until storedSize is reached, then ends the stream.
|
||||||
const outputStream = new stream.Readable({
|
const outputStream = new stream.Duplex({
|
||||||
read(num) {
|
/*read(num) {
|
||||||
|
if (num === undefined) {
|
||||||
|
throw new Error('Expected to receive number of bytes when reading from stream.');
|
||||||
|
}
|
||||||
|
|
||||||
|
totalRead += num;
|
||||||
|
//end of file reached
|
||||||
|
if (localFileHeaderLength !== 0 && totalRead >= localFileHeaderLength + storedSize) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
const chunk = curDisk.read(num);
|
const chunk = curDisk.read(num);
|
||||||
//transparently switch to next disk as soon as we finished reading current disk
|
//transparently switch to next disk as soon as we finished reading current disk
|
||||||
if (chunk === null) {
|
if (chunk === null) {
|
||||||
curDiskIndex += 1;
|
curDiskIndex += 1;
|
||||||
curDisk = getStream(disks, curDiskIndex, 0);
|
curDisk = getStream(disks, curDiskIndex, 0, (localFileHeaderLength === 0) ? Infinity : localFileHeaderLength + storedSize - totalRead);
|
||||||
//await new Promise((resolve) => { curDisk.on('readable', () => { resolve(); }); });
|
//TODO: await new Promise((resolve) => { curDisk.on('readable', () => { resolve(); }); });
|
||||||
return curDisk.read(num);
|
return curDisk.read(num);
|
||||||
} else {
|
} else {
|
||||||
return chunk;
|
return chunk;
|
||||||
}
|
}
|
||||||
},
|
},*/
|
||||||
});
|
});
|
||||||
|
|
||||||
|
const onData = (chunk: Buffer) => {
|
||||||
|
outputStream.write(chunk);
|
||||||
|
totalRead += chunk.length;
|
||||||
|
};
|
||||||
|
const onEnd = () => {
|
||||||
|
curDiskIndex += 1;
|
||||||
|
if (curDiskIndex >= disks.length || (localFileHeaderLength !== 0 && totalRead >= localFileHeaderLength + storedSize)) {
|
||||||
|
outputStream.end();
|
||||||
|
} else {
|
||||||
|
curDisk = getStream(disks, curDiskIndex, 0, (localFileHeaderLength === 0) ? Infinity : localFileHeaderLength + storedSize - totalRead);
|
||||||
|
//set up new listeners for data and end
|
||||||
|
curDisk.on('data', onData);
|
||||||
|
curDisk.on('end', onEnd);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
curDisk.on('data', onData);
|
||||||
|
curDisk.on('end', onEnd);
|
||||||
|
|
||||||
//Read local file header
|
//Read local file header
|
||||||
readLocalFileHeader(outputStream);
|
localFileHeaderLength = readLocalFileHeader(outputStream);
|
||||||
|
|
||||||
//TODO: now that local file header has been read, restrict length of stream to storedSize
|
//TODO: now that local file header has been read, restrict length of stream to storedSize
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,10 @@
|
||||||
import * as stream from 'stream';
|
import * as stream from 'stream';
|
||||||
|
|
||||||
/** Reads the local file header, which is included before each stored file, and advances the stream accordingly. */
|
/** Reads the local file header, which is included before
|
||||||
export default function readLocalFileHeader(inputStream: stream.Readable) {
|
* each stored file, and advances the stream accordingly.
|
||||||
|
* Returns length of the local file header.
|
||||||
|
*/
|
||||||
|
export default function readLocalFileHeader(inputStream: stream.Readable): number {
|
||||||
const localFileHeader: Buffer = inputStream.read(30);
|
const localFileHeader: Buffer = inputStream.read(30);
|
||||||
|
|
||||||
//Local file header signature
|
//Local file header signature
|
||||||
|
@ -16,7 +19,10 @@ export default function readLocalFileHeader(inputStream: stream.Readable) {
|
||||||
const localExtraSize = localFileHeader.readUInt16LE(28);
|
const localExtraSize = localFileHeader.readUInt16LE(28);
|
||||||
|
|
||||||
//skip local file name and extra field
|
//skip local file name and extra field
|
||||||
if (localFilenameSize + localExtraSize > 0) {
|
const additionalLength = localFilenameSize + localExtraSize;
|
||||||
inputStream.read(localFilenameSize + localExtraSize);
|
if (additionalLength > 0) {
|
||||||
|
inputStream.read(additionalLength);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return 30 + additionalLength;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue