From 07de050588915e3b81d157598bc5476450953ceb Mon Sep 17 00:00:00 2001 From: C-3PO Date: Thu, 5 Jul 2018 22:58:41 +0200 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20Wait=20with=20reading=20local=20?= =?UTF-8?q?file=20header=20until=20stream=20is=20ready?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/ssn/getPatch.ts | 4 ++-- src/ssn/streams/getFileFromDisks.ts | 10 +++++----- src/ssn/streams/readLocalFileHeader.ts | 17 ++++++++++++++++- 3 files changed, 23 insertions(+), 8 deletions(-) diff --git a/src/ssn/getPatch.ts b/src/ssn/getPatch.ts index 7b96eff..a99f379 100644 --- a/src/ssn/getPatch.ts +++ b/src/ssn/getPatch.ts @@ -38,7 +38,7 @@ export default async function getPatch(product: Product, from: number, to: numbe //Extract newly added files fileEntries.filter((file) => file.diffType === SsnDiffType.NewFile).forEach(async (file) => { - const fileStream = getFileFromDisks(diskFilenames, { diskStart: file.diskNumberStart, offset: file.offset, storedSize: file.compressedSize }); + const fileStream = await getFileFromDisks(diskFilenames, { diskStart: file.diskNumberStart, offset: file.offset, storedSize: file.compressedSize }); const fileContents = extractFileStream(file, fileStream); console.debug(await streamToArrayBuffer(fileContents)); //TODO: need to write to disk @@ -46,7 +46,7 @@ export default async function getPatch(product: Product, from: number, to: numbe //Extract changed files fileEntries.filter((file) => file.diffType === SsnDiffType.Changed).forEach(async (file) => { - const fileStream = getFileFromDisks(diskFilenames, { diskStart: file.diskNumberStart, offset: file.offset, storedSize: file.compressedSize }); + const fileStream = await getFileFromDisks(diskFilenames, { diskStart: file.diskNumberStart, offset: file.offset, storedSize: file.compressedSize }); const fileContents = extractFileStream(file, fileStream); console.debug(await streamToArrayBuffer(fileContents)); //TODO: need to apply diffing, then write to disk diff --git a/src/ssn/streams/getFileFromDisks.ts b/src/ssn/streams/getFileFromDisks.ts index e3f6b5c..6a949c6 100644 --- a/src/ssn/streams/getFileFromDisks.ts +++ b/src/ssn/streams/getFileFromDisks.ts @@ -16,7 +16,7 @@ function getStream(disks: string[], index: number, offset: number, length: numbe } /** Takes a list of ReadableStreams (the disks), as well as the offset and length, and returns a stream for just one file. */ -export default function getFileFromDisks(disks: string[], { diskStart, offset, storedSize }: IGetFileFromDisksOptions): stream.Readable { +export default async function getFileFromDisks(disks: string[], { diskStart, offset, storedSize }: IGetFileFromDisksOptions): Promise { let curDiskIndex = diskStart; let curDisk = getStream(disks, diskStart, offset); let localFileHeaderLength = 0; @@ -56,6 +56,9 @@ export default function getFileFromDisks(disks: string[], { diskStart, offset, s //TODO } }; + const onError = (error: any) => { + console.error(error); + }; const onEnd = () => { curDiskIndex += 1; //End if we are at end of file or end of disks @@ -69,16 +72,13 @@ export default function getFileFromDisks(disks: string[], { diskStart, offset, s curDisk.on('error', onError); } }; - const onError = (error: any) => { - console.error(error); - }; curDisk.on('data', onData); curDisk.on('end', onEnd); curDisk.on('error', onError); //Read local file header - localFileHeaderLength = readLocalFileHeader(outputStream); + localFileHeaderLength = await readLocalFileHeader(outputStream); //TODO: now that local file header has been read, restrict length of stream to storedSize diff --git a/src/ssn/streams/readLocalFileHeader.ts b/src/ssn/streams/readLocalFileHeader.ts index 35d2488..adc8eb5 100644 --- a/src/ssn/streams/readLocalFileHeader.ts +++ b/src/ssn/streams/readLocalFileHeader.ts @@ -1,10 +1,24 @@ import * as stream from 'stream'; +/** Returns a promise that resolves as soon as the given stream has the given number of bytes ready for reading. */ +function waitReadableLength(inputStream: stream.Readable, minLength: number): Promise { + return new Promise((resolve) => { + const interval = setInterval(() => { + if (inputStream.readableLength >= minLength) { + clearInterval(interval); + resolve(); + } + }, 100); + }); +} + /** Reads the local file header, which is included before * each stored file, and advances the stream accordingly. * Returns length of the local file header. */ -export default function readLocalFileHeader(inputStream: stream.Readable): number { +export default async function readLocalFileHeader(inputStream: stream.Readable): Promise { + //TODO: need to wait until + await waitReadableLength(inputStream, 30); const localFileHeader: Buffer = inputStream.read(30); //Local file header signature @@ -21,6 +35,7 @@ export default function readLocalFileHeader(inputStream: stream.Readable): numbe //skip local file name and extra field const additionalLength = localFilenameSize + localExtraSize; if (additionalLength > 0) { + await waitReadableLength(inputStream, additionalLength); inputStream.read(additionalLength); }