Add an abstract BasePDFStreamReader class, that all the old IPDFStreamReader implementations inherit from

Given that there's no less than *five* different, but very similar, implementations this helps reduce code duplication and simplifies maintenance.

Also, remove the `rangeChunkSize` not defined checks in all the relevant stream-constructor implementations.
Note how the API, since some time, always validates *and* provides that parameter when creating a `BasePDFStreamReader`-instance.
This commit is contained in:
Jonas Jenwald 2026-01-30 08:01:55 +01:00
parent 4a8fb4dde1
commit 54d8c5e7b4
7 changed files with 184 additions and 269 deletions

View File

@ -13,7 +13,10 @@
* limitations under the License.
*/
import { BasePDFStream } from "../shared/base_pdf_stream.js";
import {
BasePDFStream,
BasePDFStreamReader,
} from "../shared/base_pdf_stream.js";
class PDFWorkerStream extends BasePDFStream {
constructor(source) {
@ -21,42 +24,23 @@ class PDFWorkerStream extends BasePDFStream {
}
}
/** @implements {IPDFStreamReader} */
class PDFWorkerStreamReader {
constructor(stream) {
const { msgHandler } = stream._source;
this.onProgress = null;
class PDFWorkerStreamReader extends BasePDFStreamReader {
_reader = null;
this._contentLength = null;
this._isRangeSupported = false;
this._isStreamingSupported = false;
constructor(stream) {
super(stream);
const { msgHandler } = stream._source;
const readableStream = msgHandler.sendWithStream("GetReader");
this._reader = readableStream.getReader();
this._headersReady = msgHandler
.sendWithPromise("ReaderHeadersReady")
.then(data => {
this._isStreamingSupported = data.isStreamingSupported;
this._isRangeSupported = data.isRangeSupported;
this._contentLength = data.contentLength;
});
}
msgHandler.sendWithPromise("ReaderHeadersReady").then(data => {
this._contentLength = data.contentLength;
this._isStreamingSupported = data.isStreamingSupported;
this._isRangeSupported = data.isRangeSupported;
get headersReady() {
return this._headersReady;
}
get contentLength() {
return this._contentLength;
}
get isStreamingSupported() {
return this._isStreamingSupported;
}
get isRangeSupported() {
return this._isRangeSupported;
this._headersCapability.resolve();
}, this._headersCapability.reject);
}
async read() {

View File

@ -14,6 +14,10 @@
*/
import { AbortException, warn } from "../shared/util.js";
import {
BasePDFStream,
BasePDFStreamReader,
} from "../shared/base_pdf_stream.js";
import {
createHeaders,
createResponseError,
@ -22,7 +26,6 @@ import {
validateRangeRequestCapabilities,
validateResponseStatus,
} from "./network_utils.js";
import { BasePDFStream } from "../shared/base_pdf_stream.js";
if (typeof PDFJSDev !== "undefined" && PDFJSDev.test("MOZCENTRAL")) {
throw new Error(
@ -30,15 +33,15 @@ if (typeof PDFJSDev !== "undefined" && PDFJSDev.test("MOZCENTRAL")) {
);
}
function createFetchOptions(headers, withCredentials, abortController) {
return {
function fetchUrl(url, headers, withCredentials, abortController) {
return fetch(url, {
method: "GET",
headers,
signal: abortController.signal,
mode: "cors",
credentials: withCredentials ? "include" : "same-origin",
redirect: "follow",
};
});
}
function getArrayBuffer(val) {
@ -62,34 +65,29 @@ class PDFFetchStream extends BasePDFStream {
}
}
/** @implements {IPDFStreamReader} */
class PDFFetchStreamReader {
constructor(stream) {
this._stream = stream;
this._reader = null;
this._loaded = 0;
this._filename = null;
const source = stream._source;
this._withCredentials = source.withCredentials || false;
this._contentLength = source.length;
this._headersCapability = Promise.withResolvers();
this._disableRange = source.disableRange || false;
this._rangeChunkSize = source.rangeChunkSize;
if (!this._rangeChunkSize && !this._disableRange) {
this._disableRange = true;
}
class PDFFetchStreamReader extends BasePDFStreamReader {
_abortController = new AbortController();
this._abortController = new AbortController();
this._isStreamingSupported = !source.disableStream;
this._isRangeSupported = !source.disableRange;
_reader = null;
constructor(stream) {
super(stream);
const {
disableRange,
disableStream,
length,
rangeChunkSize,
url,
withCredentials,
} = stream._source;
this._contentLength = length;
this._isStreamingSupported = !disableStream;
this._isRangeSupported = !disableRange;
// Always create a copy of the headers.
const headers = new Headers(stream.headers);
const url = source.url;
fetch(
url,
createFetchOptions(headers, this._withCredentials, this._abortController)
)
fetchUrl(url, headers, withCredentials, this._abortController)
.then(response => {
stream._responseOrigin = getResponseOrigin(response.url);
@ -97,7 +95,6 @@ class PDFFetchStreamReader {
throw createResponseError(response.status, url);
}
this._reader = response.body.getReader();
this._headersCapability.resolve();
const responseHeaders = response.headers;
@ -105,8 +102,8 @@ class PDFFetchStreamReader {
validateRangeRequestCapabilities({
responseHeaders,
isHttp: stream.isHttp,
rangeChunkSize: this._rangeChunkSize,
disableRange: this._disableRange,
rangeChunkSize,
disableRange,
});
this._isRangeSupported = allowRangeRequests;
@ -120,30 +117,10 @@ class PDFFetchStreamReader {
if (!this._isStreamingSupported && this._isRangeSupported) {
this.cancel(new AbortException("Streaming is disabled."));
}
this._headersCapability.resolve();
})
.catch(this._headersCapability.reject);
this.onProgress = null;
}
get headersReady() {
return this._headersCapability.promise;
}
get filename() {
return this._filename;
}
get contentLength() {
return this._contentLength;
}
get isRangeSupported() {
return this._isRangeSupported;
}
get isStreamingSupported() {
return this._isStreamingSupported;
}
async read() {
@ -182,10 +159,7 @@ class PDFFetchStreamRangeReader {
headers.append("Range", `bytes=${begin}-${end - 1}`);
const url = source.url;
fetch(
url,
createFetchOptions(headers, this._withCredentials, this._abortController)
)
fetchUrl(url, headers, this._withCredentials, this._abortController)
.then(response => {
const responseOrigin = getResponseOrigin(response.url);

View File

@ -14,6 +14,10 @@
*/
import { assert, stringToBytes, warn } from "../shared/util.js";
import {
BasePDFStream,
BasePDFStreamReader,
} from "../shared/base_pdf_stream.js";
import {
createHeaders,
createResponseError,
@ -21,7 +25,6 @@ import {
getResponseOrigin,
validateRangeRequestCapabilities,
} from "./network_utils.js";
import { BasePDFStream } from "../shared/base_pdf_stream.js";
if (typeof PDFJSDev !== "undefined" && PDFJSDev.test("MOZCENTRAL")) {
throw new Error(
@ -42,7 +45,7 @@ class PDFNetworkStream extends BasePDFStream {
_responseOrigin = null;
constructor(source) {
super(source, PDFNetworkStreamFullReader, PDFNetworkStreamRangeReader);
super(source, PDFNetworkStreamReader, PDFNetworkStreamRangeReader);
this.url = source.url;
this.isHttp = /^https?:/i.test(this.url);
this.headers = createHeaders(this.isHttp, source.httpHeaders);
@ -166,40 +169,34 @@ class PDFNetworkStream extends BasePDFStream {
}
}
/** @implements {IPDFStreamReader} */
class PDFNetworkStreamFullReader {
class PDFNetworkStreamReader extends BasePDFStreamReader {
_cachedChunks = [];
_done = false;
_requests = [];
_storedError = null;
constructor(stream) {
this._stream = stream;
const { disableRange, length, rangeChunkSize } = stream._source;
super(stream);
const { length } = stream._source;
this._contentLength = length;
// Note that `XMLHttpRequest` doesn't support streaming, and range requests
// will be enabled (if supported) in `this.#onHeadersReceived` below.
this._fullRequestXhr = stream._request({
onHeadersReceived: this._onHeadersReceived.bind(this),
onDone: this._onDone.bind(this),
onError: this._onError.bind(this),
onProgress: this._onProgress.bind(this),
onHeadersReceived: this.#onHeadersReceived.bind(this),
onDone: this.#onDone.bind(this),
onError: this.#onError.bind(this),
onProgress: this.#onProgress.bind(this),
});
this._headersCapability = Promise.withResolvers();
this._disableRange = disableRange || false;
this._contentLength = length; // Optional
this._rangeChunkSize = rangeChunkSize;
if (!this._rangeChunkSize && !this._disableRange) {
this._disableRange = true;
}
this._isStreamingSupported = false;
this._isRangeSupported = false;
this._cachedChunks = [];
this._requests = [];
this._done = false;
this._storedError = undefined;
this._filename = null;
this.onProgress = null;
}
_onHeadersReceived() {
#onHeadersReceived() {
const stream = this._stream;
const { disableRange, rangeChunkSize } = stream._source;
const fullRequestXhr = this._fullRequestXhr;
stream._responseOrigin = getResponseOrigin(fullRequestXhr.responseURL);
@ -222,8 +219,8 @@ class PDFNetworkStreamFullReader {
validateRangeRequestCapabilities({
responseHeaders,
isHttp: stream.isHttp,
rangeChunkSize: this._rangeChunkSize,
disableRange: this._disableRange,
rangeChunkSize,
disableRange,
});
if (allowRangeRequests) {
@ -245,10 +242,10 @@ class PDFNetworkStreamFullReader {
this._headersCapability.resolve();
}
_onDone(chunk) {
#onDone(chunk) {
if (this._requests.length > 0) {
const requestCapability = this._requests.shift();
requestCapability.resolve({ value: chunk, done: false });
const capability = this._requests.shift();
capability.resolve({ value: chunk, done: false });
} else {
this._cachedChunks.push(chunk);
}
@ -256,49 +253,29 @@ class PDFNetworkStreamFullReader {
if (this._cachedChunks.length > 0) {
return;
}
for (const requestCapability of this._requests) {
requestCapability.resolve({ value: undefined, done: true });
for (const capability of this._requests) {
capability.resolve({ value: undefined, done: true });
}
this._requests.length = 0;
}
_onError(status) {
#onError(status) {
this._storedError = createResponseError(status, this._stream.url);
this._headersCapability.reject(this._storedError);
for (const requestCapability of this._requests) {
requestCapability.reject(this._storedError);
for (const capability of this._requests) {
capability.reject(this._storedError);
}
this._requests.length = 0;
this._cachedChunks.length = 0;
}
_onProgress(evt) {
#onProgress(evt) {
this.onProgress?.({
loaded: evt.loaded,
total: evt.lengthComputable ? evt.total : this._contentLength,
});
}
get filename() {
return this._filename;
}
get isRangeSupported() {
return this._isRangeSupported;
}
get isStreamingSupported() {
return this._isStreamingSupported;
}
get contentLength() {
return this._contentLength;
}
get headersReady() {
return this._headersCapability.promise;
}
async read() {
await this._headersCapability.promise;
@ -312,16 +289,16 @@ class PDFNetworkStreamFullReader {
if (this._done) {
return { value: undefined, done: true };
}
const requestCapability = Promise.withResolvers();
this._requests.push(requestCapability);
return requestCapability.promise;
const capability = Promise.withResolvers();
this._requests.push(capability);
return capability.promise;
}
cancel(reason) {
this._done = true;
this._headersCapability.reject(reason);
for (const requestCapability of this._requests) {
requestCapability.resolve({ value: undefined, done: true });
for (const capability of this._requests) {
capability.resolve({ value: undefined, done: true });
}
this._requests.length = 0;

View File

@ -15,7 +15,10 @@
/* globals process */
import { AbortException, assert, warn } from "../shared/util.js";
import { BasePDFStream } from "../shared/base_pdf_stream.js";
import {
BasePDFStream,
BasePDFStreamReader,
} from "../shared/base_pdf_stream.js";
import { createResponseError } from "./network_utils.js";
if (typeof PDFJSDev !== "undefined" && PDFJSDev.test("MOZCENTRAL")) {
@ -63,7 +66,7 @@ function getArrayBuffer(val) {
class PDFNodeStream extends BasePDFStream {
constructor(source) {
super(source, PDFNodeStreamFsFullReader, PDFNodeStreamFsRangeReader);
super(source, PDFNodeStreamReader, PDFNodeStreamFsRangeReader);
this.url = parseUrlOrPath(source.url);
assert(
this.url.protocol === "file:",
@ -72,26 +75,17 @@ class PDFNodeStream extends BasePDFStream {
}
}
class PDFNodeStreamFsFullReader {
_headersCapability = Promise.withResolvers();
class PDFNodeStreamReader extends BasePDFStreamReader {
_reader = null;
constructor(stream) {
this.onProgress = null;
const source = stream._source;
this._contentLength = source.length; // optional
this._loaded = 0;
this._filename = null;
super(stream);
const { disableRange, disableStream, length, rangeChunkSize } =
stream._source;
this._disableRange = source.disableRange || false;
this._rangeChunkSize = source.rangeChunkSize;
if (!this._rangeChunkSize && !this._disableRange) {
this._disableRange = true;
}
this._isStreamingSupported = !source.disableStream;
this._isRangeSupported = !source.disableRange;
this._contentLength = length;
this._isStreamingSupported = !disableStream;
this._isRangeSupported = !disableRange;
const url = stream.url;
const fs = process.getBuiltinModule("fs");
@ -104,7 +98,7 @@ class PDFNodeStreamFsFullReader {
this._reader = readableStream.getReader();
const { size } = stat;
if (size <= 2 * this._rangeChunkSize) {
if (size <= 2 * rangeChunkSize) {
// The file size is smaller than the size of two chunks, so it doesn't
// make any sense to abort the request and retry with a range request.
this._isRangeSupported = false;
@ -128,26 +122,6 @@ class PDFNodeStreamFsFullReader {
});
}
get headersReady() {
return this._headersCapability.promise;
}
get filename() {
return this._filename;
}
get contentLength() {
return this._contentLength;
}
get isRangeSupported() {
return this._isRangeSupported;
}
get isStreamingSupported() {
return this._isStreamingSupported;
}
async read() {
await this._headersCapability.promise;
const { value, done } = await this._reader.read();

View File

@ -13,12 +13,14 @@
* limitations under the License.
*/
/** @typedef {import("../interfaces").IPDFStreamReader} IPDFStreamReader */
// eslint-disable-next-line max-len
/** @typedef {import("../interfaces").IPDFStreamRangeReader} IPDFStreamRangeReader */
import {
BasePDFStream,
BasePDFStreamReader,
} from "../shared/base_pdf_stream.js";
import { assert } from "../shared/util.js";
import { BasePDFStream } from "../shared/base_pdf_stream.js";
import { isPdfFile } from "./display_utils.js";
function getArrayBuffer(val) {
@ -30,7 +32,7 @@ function getArrayBuffer(val) {
}
class PDFDataTransportStream extends BasePDFStream {
_pdfDataRangeTransport = null;
_progressiveDone = false;
_queuedChunks = [];
@ -40,22 +42,14 @@ class PDFDataTransportStream extends BasePDFStream {
PDFDataTransportStreamReader,
PDFDataTransportStreamRangeReader
);
const { pdfDataRangeTransport, disableRange, disableStream } = source;
const { length, initialData, progressiveDone, contentDispositionFilename } =
pdfDataRangeTransport;
this._progressiveDone = progressiveDone;
this._contentDispositionFilename = contentDispositionFilename;
const { pdfDataRangeTransport } = source;
const { initialData, progressiveDone } = pdfDataRangeTransport;
if (initialData?.length > 0) {
const buffer = getArrayBuffer(initialData);
this._queuedChunks.push(buffer);
}
this._pdfDataRangeTransport = pdfDataRangeTransport;
this._isStreamingSupported = !disableStream;
this._isRangeSupported = !disableRange;
this._contentLength = length;
this._progressiveDone = progressiveDone;
pdfDataRangeTransport.addRangeListener((begin, chunk) => {
this.#onReceiveData(begin, chunk);
@ -113,7 +107,7 @@ class PDFDataTransportStream extends BasePDFStream {
if (reader) {
reader.onDone = () => this._rangeReaders.delete(reader);
this._pdfDataRangeTransport.requestDataRange(begin, end);
this._source.pdfDataRangeTransport.requestDataRange(begin, end);
}
return reader;
}
@ -121,27 +115,37 @@ class PDFDataTransportStream extends BasePDFStream {
cancelAllRequests(reason) {
super.cancelAllRequests(reason);
this._pdfDataRangeTransport.abort();
this._source.pdfDataRangeTransport.abort();
}
}
/** @implements {IPDFStreamReader} */
class PDFDataTransportStreamReader {
class PDFDataTransportStreamReader extends BasePDFStreamReader {
_done = false;
_queuedChunks = null;
_requests = [];
constructor(stream) {
this._stream = stream;
this._done = stream._progressiveDone || false;
this._filename = isPdfFile(stream._contentDispositionFilename)
? stream._contentDispositionFilename
: null;
super(stream);
const { pdfDataRangeTransport, disableRange, disableStream } =
stream._source;
const { length, contentDispositionFilename } = pdfDataRangeTransport;
this._queuedChunks = stream._queuedChunks || [];
this._loaded = 0;
for (const chunk of this._queuedChunks) {
this._loaded += chunk.byteLength;
}
this._requests = [];
this._headersReady = Promise.resolve();
this._done = stream._progressiveDone;
this.onProgress = null;
this._contentLength = length;
this._isStreamingSupported = !disableStream;
this._isRangeSupported = !disableRange;
if (isPdfFile(contentDispositionFilename)) {
this._filename = contentDispositionFilename;
}
this._headersCapability.resolve();
}
_enqueue(chunk) {
@ -149,34 +153,14 @@ class PDFDataTransportStreamReader {
return; // Ignore new data.
}
if (this._requests.length > 0) {
const requestCapability = this._requests.shift();
requestCapability.resolve({ value: chunk, done: false });
const capability = this._requests.shift();
capability.resolve({ value: chunk, done: false });
} else {
this._queuedChunks.push(chunk);
}
this._loaded += chunk.byteLength;
}
get headersReady() {
return this._headersReady;
}
get filename() {
return this._filename;
}
get isRangeSupported() {
return this._stream._isRangeSupported;
}
get isStreamingSupported() {
return this._stream._isStreamingSupported;
}
get contentLength() {
return this._stream._contentLength;
}
async read() {
if (this._queuedChunks.length > 0) {
const chunk = this._queuedChunks.shift();
@ -185,24 +169,21 @@ class PDFDataTransportStreamReader {
if (this._done) {
return { value: undefined, done: true };
}
const requestCapability = Promise.withResolvers();
this._requests.push(requestCapability);
return requestCapability.promise;
const capability = Promise.withResolvers();
this._requests.push(capability);
return capability.promise;
}
cancel(reason) {
this._done = true;
for (const requestCapability of this._requests) {
requestCapability.resolve({ value: undefined, done: true });
for (const capability of this._requests) {
capability.resolve({ value: undefined, done: true });
}
this._requests.length = 0;
}
progressiveDone() {
if (this._done) {
return;
}
this._done = true;
this._done ||= true;
}
}

View File

@ -94,18 +94,38 @@ class BasePDFStream {
/**
* Interface for a PDF binary data reader.
*
* @interface
*/
class IPDFStreamReader {
constructor() {
/**
* Sets or gets the progress callback. The callback can be useful when the
* isStreamingSupported property of the object is defined as false.
* The callback is called with one parameter: an object with the loaded and
* total properties.
*/
this.onProgress = null;
class BasePDFStreamReader {
/**
* Sets or gets the progress callback. The callback can be useful when the
* isStreamingSupported property of the object is defined as false.
* The callback is called with one parameter: an object with the loaded and
* total properties.
*/
onProgress = null;
_contentLength = 0;
_filename = null;
_headersCapability = Promise.withResolvers();
_isRangeSupported = false;
_isStreamingSupported = false;
_loaded = 0;
_stream = null;
constructor(stream) {
if (
(typeof PDFJSDev === "undefined" || PDFJSDev.test("TESTING")) &&
this.constructor === BasePDFStreamReader
) {
unreachable("Cannot initialize BasePDFStreamReader.");
}
this._stream = stream;
}
/**
@ -114,7 +134,7 @@ class IPDFStreamReader {
* @type {Promise}
*/
get headersReady() {
return Promise.resolve();
return this._headersCapability.promise;
}
/**
@ -124,7 +144,7 @@ class IPDFStreamReader {
* header is missing/invalid.
*/
get filename() {
return null;
return this._filename;
}
/**
@ -133,7 +153,7 @@ class IPDFStreamReader {
* @type {number} The data length (or 0 if unknown).
*/
get contentLength() {
return 0;
return this._contentLength;
}
/**
@ -143,7 +163,7 @@ class IPDFStreamReader {
* @type {boolean}
*/
get isRangeSupported() {
return false;
return this._isRangeSupported;
}
/**
@ -152,7 +172,7 @@ class IPDFStreamReader {
* @type {boolean}
*/
get isStreamingSupported() {
return false;
return this._isStreamingSupported;
}
/**
@ -163,13 +183,17 @@ class IPDFStreamReader {
* set to true.
* @returns {Promise}
*/
async read() {}
async read() {
unreachable("Abstract method `read` called");
}
/**
* Cancels all pending read requests and closes the stream.
* @param {Object} reason
*/
cancel(reason) {}
cancel(reason) {
unreachable("Abstract method `cancel` called");
}
}
/**
@ -195,4 +219,4 @@ class IPDFStreamRangeReader {
cancel(reason) {}
}
export { BasePDFStream, IPDFStreamRangeReader, IPDFStreamReader };
export { BasePDFStream, BasePDFStreamReader, IPDFStreamRangeReader };

View File

@ -35,6 +35,7 @@ describe("fetch_stream", function () {
it("read with streaming", async function () {
const stream = new PDFFetchStream({
url: getPdfUrl(),
rangeChunkSize: 32768,
disableStream: false,
disableRange: true,
});