🐛 Wait with reading local file header until stream is ready

This commit is contained in:
C-3PO 2018-07-05 22:58:41 +02:00
parent 86ba51731c
commit 07de050588
Signed by: c3po
GPG key ID: 62993C4BB4D86F24
3 changed files with 23 additions and 8 deletions

View file

@ -38,7 +38,7 @@ export default async function getPatch(product: Product, from: number, to: numbe
//Extract newly added files //Extract newly added files
fileEntries.filter((file) => file.diffType === SsnDiffType.NewFile).forEach(async (file) => { fileEntries.filter((file) => file.diffType === SsnDiffType.NewFile).forEach(async (file) => {
const fileStream = getFileFromDisks(diskFilenames, { diskStart: file.diskNumberStart, offset: file.offset, storedSize: file.compressedSize }); const fileStream = await getFileFromDisks(diskFilenames, { diskStart: file.diskNumberStart, offset: file.offset, storedSize: file.compressedSize });
const fileContents = extractFileStream(file, fileStream); const fileContents = extractFileStream(file, fileStream);
console.debug(await streamToArrayBuffer(fileContents)); console.debug(await streamToArrayBuffer(fileContents));
//TODO: need to write to disk //TODO: need to write to disk
@ -46,7 +46,7 @@ export default async function getPatch(product: Product, from: number, to: numbe
//Extract changed files //Extract changed files
fileEntries.filter((file) => file.diffType === SsnDiffType.Changed).forEach(async (file) => { fileEntries.filter((file) => file.diffType === SsnDiffType.Changed).forEach(async (file) => {
const fileStream = getFileFromDisks(diskFilenames, { diskStart: file.diskNumberStart, offset: file.offset, storedSize: file.compressedSize }); const fileStream = await getFileFromDisks(diskFilenames, { diskStart: file.diskNumberStart, offset: file.offset, storedSize: file.compressedSize });
const fileContents = extractFileStream(file, fileStream); const fileContents = extractFileStream(file, fileStream);
console.debug(await streamToArrayBuffer(fileContents)); console.debug(await streamToArrayBuffer(fileContents));
//TODO: need to apply diffing, then write to disk //TODO: need to apply diffing, then write to disk

View file

@ -16,7 +16,7 @@ function getStream(disks: string[], index: number, offset: number, length: numbe
} }
/** Takes a list of ReadableStreams (the disks), as well as the offset and length, and returns a stream for just one file. */ /** Takes a list of ReadableStreams (the disks), as well as the offset and length, and returns a stream for just one file. */
export default function getFileFromDisks(disks: string[], { diskStart, offset, storedSize }: IGetFileFromDisksOptions): stream.Readable { export default async function getFileFromDisks(disks: string[], { diskStart, offset, storedSize }: IGetFileFromDisksOptions): Promise<stream.Readable> {
let curDiskIndex = diskStart; let curDiskIndex = diskStart;
let curDisk = getStream(disks, diskStart, offset); let curDisk = getStream(disks, diskStart, offset);
let localFileHeaderLength = 0; let localFileHeaderLength = 0;
@ -56,6 +56,9 @@ export default function getFileFromDisks(disks: string[], { diskStart, offset, s
//TODO //TODO
} }
}; };
const onError = (error: any) => {
console.error(error);
};
const onEnd = () => { const onEnd = () => {
curDiskIndex += 1; curDiskIndex += 1;
//End if we are at end of file or end of disks //End if we are at end of file or end of disks
@ -69,16 +72,13 @@ export default function getFileFromDisks(disks: string[], { diskStart, offset, s
curDisk.on('error', onError); curDisk.on('error', onError);
} }
}; };
const onError = (error: any) => {
console.error(error);
};
curDisk.on('data', onData); curDisk.on('data', onData);
curDisk.on('end', onEnd); curDisk.on('end', onEnd);
curDisk.on('error', onError); curDisk.on('error', onError);
//Read local file header //Read local file header
localFileHeaderLength = readLocalFileHeader(outputStream); localFileHeaderLength = await readLocalFileHeader(outputStream);
//TODO: now that local file header has been read, restrict length of stream to storedSize //TODO: now that local file header has been read, restrict length of stream to storedSize

View file

@ -1,10 +1,24 @@
import * as stream from 'stream'; import * as stream from 'stream';
/** Returns a promise that resolves as soon as the given stream has the given number of bytes ready for reading. */
function waitReadableLength(inputStream: stream.Readable, minLength: number): Promise<void> {
return new Promise((resolve) => {
const interval = setInterval(() => {
if (inputStream.readableLength >= minLength) {
clearInterval(interval);
resolve();
}
}, 100);
});
}
/** Reads the local file header, which is included before /** Reads the local file header, which is included before
* each stored file, and advances the stream accordingly. * each stored file, and advances the stream accordingly.
* Returns length of the local file header. * Returns length of the local file header.
*/ */
export default function readLocalFileHeader(inputStream: stream.Readable): number { export default async function readLocalFileHeader(inputStream: stream.Readable): Promise<number> {
//TODO: need to wait until
await waitReadableLength(inputStream, 30);
const localFileHeader: Buffer = inputStream.read(30); const localFileHeader: Buffer = inputStream.read(30);
//Local file header signature //Local file header signature
@ -21,6 +35,7 @@ export default function readLocalFileHeader(inputStream: stream.Readable): numbe
//skip local file name and extra field //skip local file name and extra field
const additionalLength = localFilenameSize + localExtraSize; const additionalLength = localFilenameSize + localExtraSize;
if (additionalLength > 0) { if (additionalLength > 0) {
await waitReadableLength(inputStream, additionalLength);
inputStream.read(additionalLength); inputStream.read(additionalLength);
} }