From 663d4cd6e7635f27d5a0ae4bf01b9204046f8f48 Mon Sep 17 00:00:00 2001 From: Jonas Jenwald Date: Sun, 25 Jan 2026 12:34:47 +0100 Subject: [PATCH] Use standard `ReadableStream`s in the `src/display/node_stream.js` code Thanks to newer Node.js functionality, see https://nodejs.org/api/stream.html#streamreadabletowebstreamreadable-options, we can use standard `ReadableStream`s which help to significantly shorten and simplify the code. For older Node.js versions we use the `node-readable-to-web-readable-stream` package, see https://www.npmjs.com/package/node-readable-to-web-readable-stream, to get the same functionality. --- gulpfile.mjs | 1 + package-lock.json | 8 ++ package.json | 1 + src/display/node_stream.js | 224 ++++++++++++++----------------------- 4 files changed, 97 insertions(+), 137 deletions(-) diff --git a/gulpfile.mjs b/gulpfile.mjs index 6942909c3..3ad321ee3 100644 --- a/gulpfile.mjs +++ b/gulpfile.mjs @@ -2366,6 +2366,7 @@ function packageJson() { license: DIST_LICENSE, optionalDependencies: { "@napi-rs/canvas": "^0.1.88", + "node-readable-to-web-readable-stream": "^0.4.2", }, browser: { canvas: false, diff --git a/package-lock.json b/package-lock.json index 51ccbdc82..2d1d409c7 100644 --- a/package-lock.json +++ b/package-lock.json @@ -44,6 +44,7 @@ "jstransformer-nunjucks": "^1.2.0", "metalsmith": "^2.6.3", "metalsmith-html-relative": "^2.0.9", + "node-readable-to-web-readable-stream": "^0.4.2", "ordered-read-streams": "^2.0.0", "pngjs": "^7.0.0", "postcss": "^8.5.6", @@ -9373,6 +9374,13 @@ "node": ">= 0.4.0" } }, + "node_modules/node-readable-to-web-readable-stream": { + "version": "0.4.2", + "resolved": "https://registry.npmjs.org/node-readable-to-web-readable-stream/-/node-readable-to-web-readable-stream-0.4.2.tgz", + "integrity": "sha512-/cMZNI34v//jUTrI+UIo4ieHAB5EZRY/+7OmXZgBxaWBMcW2tGdceIw06RFxWxrKZ5Jp3sI2i5TsRo+CBhtVLQ==", + "dev": true, + "license": "MIT" + }, "node_modules/node-releases": { "version": "2.0.27", "resolved": "https://registry.npmjs.org/node-releases/-/node-releases-2.0.27.tgz", diff --git a/package.json b/package.json index 1f8daf412..5575db327 100644 --- a/package.json +++ b/package.json @@ -39,6 +39,7 @@ "jstransformer-nunjucks": "^1.2.0", "metalsmith": "^2.6.3", "metalsmith-html-relative": "^2.0.9", + "node-readable-to-web-readable-stream": "^0.4.2", "ordered-read-streams": "^2.0.0", "pngjs": "^7.0.0", "postcss": "^8.5.6", diff --git a/src/display/node_stream.js b/src/display/node_stream.js index e42a1b9e9..e2f3b9cc6 100644 --- a/src/display/node_stream.js +++ b/src/display/node_stream.js @@ -14,7 +14,7 @@ */ /* globals process */ -import { AbortException, assert } from "../shared/util.js"; +import { AbortException, assert, warn } from "../shared/util.js"; import { createResponseError } from "./network_utils.js"; if (typeof PDFJSDev !== "undefined" && PDFJSDev.test("MOZCENTRAL")) { @@ -33,6 +33,33 @@ function parseUrlOrPath(sourceUrl) { return new URL(url.pathToFileURL(sourceUrl)); } +function getReadableStream(readStream) { + const { Readable } = process.getBuiltinModule("stream"); + + if (typeof Readable.toWeb === "function") { + // See https://nodejs.org/api/stream.html#streamreadabletowebstreamreadable-options + return Readable.toWeb(readStream); + } + // Fallback to support Node.js versions older than `24.0.0` and `22.17.0`. + const require = process + .getBuiltinModule("module") + .createRequire(import.meta.url); + + const polyfill = require("node-readable-to-web-readable-stream"); + return polyfill.makeDefaultReadableStreamFromNodeReadable(readStream); +} + +function getArrayBuffer(val) { + if (val instanceof Uint8Array) { + return val.buffer; + } + if (val instanceof ArrayBuffer) { + return val; + } + warn(`getArrayBuffer - unexpected data format: ${val}`); + return new Uint8Array(val).buffer; +} + class PDFNodeStream { constructor(source) { this.source = source; @@ -59,11 +86,11 @@ class PDFNodeStream { return this._fullRequestReader; } - getRangeReader(start, end) { + getRangeReader(begin, end) { if (end <= this._progressiveDataLength) { return null; } - const rangeReader = new PDFNodeStreamFsRangeReader(this, start, end); + const rangeReader = new PDFNodeStreamFsRangeReader(this, begin, end); this._rangeRequestReaders.push(rangeReader); return rangeReader; } @@ -78,10 +105,11 @@ class PDFNodeStream { } class PDFNodeStreamFsFullReader { + _headersCapability = Promise.withResolvers(); + + _reader = null; + constructor(stream) { - this._url = stream.url; - this._done = false; - this._storedError = null; this.onProgress = null; const source = stream.source; this._contentLength = source.length; // optional @@ -97,13 +125,16 @@ class PDFNodeStreamFsFullReader { this._isStreamingSupported = !source.disableStream; this._isRangeSupported = !source.disableRange; - this._readableStream = null; - this._readCapability = Promise.withResolvers(); - this._headersCapability = Promise.withResolvers(); - + const url = stream.url; const fs = process.getBuiltinModule("fs"); - fs.promises.lstat(this._url).then( - stat => { + fs.promises + .lstat(url) + .then(stat => { + const readStream = fs.createReadStream(url); + const readableStream = getReadableStream(readStream); + + this._reader = readableStream.getReader(); + const { size } = stat; if (size <= 2 * this._rangeChunkSize) { // The file size is smaller than the size of two chunks, so it doesn't @@ -113,17 +144,20 @@ class PDFNodeStreamFsFullReader { // Setting right content length. this._contentLength = size; - this._setReadableStream(fs.createReadStream(this._url)); - this._headersCapability.resolve(); - }, - error => { - if (error.code === "ENOENT") { - error = createResponseError(/* status = */ 0, this._url.href); + // We need to stop reading when range is supported and streaming is + // disabled. + if (!this._isStreamingSupported && this._isRangeSupported) { + this.cancel(new AbortException("Streaming is disabled.")); + } + + this._headersCapability.resolve(); + }) + .catch(error => { + if (error.code === "ENOENT") { + error = createResponseError(/* status = */ 0, url.href); } - this._storedError = error; this._headersCapability.reject(error); - } - ); + }); } get headersReady() { @@ -147,91 +181,51 @@ class PDFNodeStreamFsFullReader { } async read() { - await this._readCapability.promise; - if (this._done) { - return { value: undefined, done: true }; + await this._headersCapability.promise; + const { value, done } = await this._reader.read(); + if (done) { + return { value, done }; } - if (this._storedError) { - throw this._storedError; - } - - const chunk = this._readableStream.read(); - if (chunk === null) { - this._readCapability = Promise.withResolvers(); - return this.read(); - } - this._loaded += chunk.length; + this._loaded += value.length; this.onProgress?.({ loaded: this._loaded, total: this._contentLength, }); - // Ensure that `read()` method returns ArrayBuffer. - const buffer = new Uint8Array(chunk).buffer; - return { value: buffer, done: false }; + return { value: getArrayBuffer(value), done: false }; } cancel(reason) { - // Call `this._error()` method when cancel is called - // before _readableStream is set. - if (!this._readableStream) { - this._error(reason); - return; - } - this._readableStream.destroy(reason); - } - - _error(reason) { - this._storedError = reason; - this._readCapability.resolve(); - } - - _setReadableStream(readableStream) { - this._readableStream = readableStream; - readableStream.on("readable", () => { - this._readCapability.resolve(); - }); - - readableStream.on("end", () => { - // Destroy readable to minimize resource usage. - readableStream.destroy(); - this._done = true; - this._readCapability.resolve(); - }); - - readableStream.on("error", reason => { - this._error(reason); - }); - - // We need to stop reading when range is supported and streaming is - // disabled. - if (!this._isStreamingSupported && this._isRangeSupported) { - this._error(new AbortException("streaming is disabled")); - } - - // Destroy ReadableStream if already in errored state. - if (this._storedError) { - this._readableStream.destroy(this._storedError); - } + this._reader?.cancel(reason); } } class PDFNodeStreamFsRangeReader { - constructor(stream, start, end) { - this._url = stream.url; - this._done = false; - this._storedError = null; + _readCapability = Promise.withResolvers(); + + _reader = null; + + constructor(stream, begin, end) { this.onProgress = null; this._loaded = 0; - this._readableStream = null; - this._readCapability = Promise.withResolvers(); const source = stream.source; this._isStreamingSupported = !source.disableStream; + const url = stream.url; const fs = process.getBuiltinModule("fs"); - this._setReadableStream( - fs.createReadStream(this._url, { start, end: end - 1 }) - ); + try { + const readStream = fs.createReadStream(url, { + start: begin, + end: end - 1, + }); + const readableStream = getReadableStream(readStream); + + this._reader = readableStream.getReader(); + + this._readCapability.resolve(); + } catch (error) { + this._readCapability.reject(error); + } } get isStreamingSupported() { @@ -240,62 +234,18 @@ class PDFNodeStreamFsRangeReader { async read() { await this._readCapability.promise; - if (this._done) { - return { value: undefined, done: true }; + const { value, done } = await this._reader.read(); + if (done) { + return { value, done }; } - if (this._storedError) { - throw this._storedError; - } - - const chunk = this._readableStream.read(); - if (chunk === null) { - this._readCapability = Promise.withResolvers(); - return this.read(); - } - this._loaded += chunk.length; + this._loaded += value.length; this.onProgress?.({ loaded: this._loaded }); - // Ensure that `read()` method returns ArrayBuffer. - const buffer = new Uint8Array(chunk).buffer; - return { value: buffer, done: false }; + return { value: getArrayBuffer(value), done: false }; } cancel(reason) { - // Call `this._error()` method when cancel is called - // before _readableStream is set. - if (!this._readableStream) { - this._error(reason); - return; - } - this._readableStream.destroy(reason); - } - - _error(reason) { - this._storedError = reason; - this._readCapability.resolve(); - } - - _setReadableStream(readableStream) { - this._readableStream = readableStream; - readableStream.on("readable", () => { - this._readCapability.resolve(); - }); - - readableStream.on("end", () => { - // Destroy readableStream to minimize resource usage. - readableStream.destroy(); - this._done = true; - this._readCapability.resolve(); - }); - - readableStream.on("error", reason => { - this._error(reason); - }); - - // Destroy readableStream if already in errored state. - if (this._storedError) { - this._readableStream.destroy(this._storedError); - } + this._reader?.cancel(reason); } }