From 007d729f1476f7f1ea34731ba9bd2becb702117e Mon Sep 17 00:00:00 2001 From: Luca Casonato Date: Sat, 30 Sep 2023 14:48:45 +0900 Subject: [PATCH] TransformStream cleanup using transformer.cancel Add a "cancel" hook to "Transformer". This allows users to perform resource cleanup when the readable side of the TransformStream is cancelled, or the writable side is aborted. To preserve existing behavior, when the readable side is cancelled with a reason, the writable side is always immediately aborted with that same reason. The same is true in the reverse case. This means that the status of both sides is always either "closed", "erroring", or "erroring" when the "cancel" hook is called. "flush" and "cancel" are never both called. As per existing behavior, when the writable side is closed the "flush" hook is called. If the readable side is cancelled while a promise returned from "flush" is still pending, "cancel" is not called. In this scenario the readable side ends up in the "errored" state, while the writable side ends up in the "closed" state. --- index.bs | 156 +++++++++++++++--- .../lib/Transformer.webidl | 2 + .../lib/abstract-ops/transform-streams.js | 117 +++++++++++-- reference-implementation/web-platform-tests | 2 +- 4 files changed, 234 insertions(+), 43 deletions(-) diff --git a/index.bs b/index.bs index fd4c34d2c..e05b6fd85 100644 --- a/index.bs +++ b/index.bs @@ -5501,6 +5501,7 @@ dictionary Transformer { TransformerStartCallback start; TransformerTransformCallback transform; TransformerFlushCallback flush; + TransformerCancelCallback cancel; any readableType; any writableType; }; @@ -5508,6 +5509,7 @@ dictionary Transformer { callback TransformerStartCallback = any (TransformStreamDefaultController controller); callback TransformerFlushCallback = Promise (TransformStreamDefaultController controller); callback TransformerTransformCallback = Promise (any chunk, TransformStreamDefaultController controller); +callback TransformerCancelCallback = Promise (any reason);
@@ -5570,6 +5572,25 @@ callback TransformerTransformCallback = Promise (any chunk, Transform {{Transformer/flush|flush()}}; the stream is already in the process of successfully closing down, and terminating it would be counterproductive.) +
cancel(reason)
+
+

A function called when the [=readable side=] is cancelled, or when the [=writable side=] is + aborted. + +

Typically this is used to clean up underlying transformer resources when the stream is aborted + or cancelled. + +

If the cancellation process is asynchronous, the function can return a promise to signal + success or failure; the result will be communicated to the caller of + {{WritableStream/abort()|stream.writable.abort()}} or + {{ReadableStream/cancel()|stream.readable.cancel()}}. Throwing an exception is treated the same + as returning a rejected promise. + +

(Note that there is no need to call + {{TransformStreamDefaultController/terminate()|controller.terminate()}} inside + {{Transformer/cancel|cancel()}}; the stream is already in the process of cancelling/aborting, and + terminating it would be counterproductive.) +

readableType

This property is reserved for future use, so any attempts to supply a value will throw an @@ -5583,8 +5604,8 @@ callback TransformerTransformCallback = Promise (any chunk, Transform The controller object passed to {{Transformer/start|start()}}, {{Transformer/transform|transform()}}, and {{Transformer/flush|flush()}} is an instance of -{{TransformStreamDefaultController}}, and has the ability to enqueue [=chunks=] to the [=readable -side=], or to terminate or error the stream. +{{TransformStreamDefaultController}}, and has the ability to enqueue [=chunks=] to the +[=readable side=], or to terminate or error the stream.

Constructor and properties

@@ -5738,6 +5759,16 @@ the following table: Internal Slot Description (non-normative) + + \[[cancelAlgorithm]] + A promise-returning algorithm, taking one argument (the reason for + cancellation), which communicates a requested cancellation to the [=transformer=] + + \[[finishPromise]] + A promise which resolves on completion of either the + [=TransformStreamDefaultController/[[cancelAlgorithm]]=] or the + [=TransformStreamDefaultController/[[flushAlgorithm]]=]. If this field is unpopulated (that is, + undefined), then neither of those algorithms have been [=invoked=] yet \[[flushAlgorithm]] A promise-returning algorithm which communicates a requested close to @@ -5831,8 +5862,7 @@ The following abstract operations operate on {{TransformStream}} instances at a 1. Let |pullAlgorithm| be the following steps: 1. Return ! [$TransformStreamDefaultSourcePullAlgorithm$](|stream|). 1. Let |cancelAlgorithm| be the following steps, taking a |reason| argument: - 1. Perform ! [$TransformStreamErrorWritableAndUnblockWrite$](|stream|, |reason|). - 1. Return [=a promise resolved with=] undefined. + 1. Return ! [$TransformStreamDefaultSourceCancelAlgorithm$](|stream|, |reason|). 1. Set |stream|.[=TransformStream/[[readable]]=] to ! [$CreateReadableStream$](|startAlgorithm|, |pullAlgorithm|, |cancelAlgorithm|, |readableHighWaterMark|, |readableSizeAlgorithm|). 1. Set |stream|.[=TransformStream/[[backpressure]]=] and @@ -5866,12 +5896,7 @@ The following abstract operations operate on {{TransformStream}} instances at a 1. Perform ! [$TransformStreamDefaultControllerClearAlgorithms$](|stream|.[=TransformStream/[[controller]]=]). 1. Perform ! [$WritableStreamDefaultControllerErrorIfNeeded$](|stream|.[=TransformStream/[[writable]]=].[=WritableStream/[[controller]]=], |e|). - 1. If |stream|.[=TransformStream/[[backpressure]]=] is true, perform ! [$TransformStreamSetBackpressure$](|stream|, - false). - -

The [$TransformStreamDefaultSinkWriteAlgorithm$] abstract operation could be - waiting for the promise stored in the [=TransformStream/[[backpressureChangePromise]]=] slot to - resolve. The call to [$TransformStreamSetBackpressure$] ensures that the promise always resolves. + 1. Perform ! [$TransformStreamUnblockWrite$](|stream|).

@@ -5886,6 +5911,19 @@ The following abstract operations operate on {{TransformStream}} instances at a 1. Set |stream|.[=TransformStream/[[backpressure]]=] to |backpressure|.
+
+ TransformStreamUnblockWrite(|stream|) performs the + following steps: + + 1. If |stream|.[=TransformStream/[[backpressure]]=] is true, perform ! [$TransformStreamSetBackpressure$](|stream|, + false). + +

The [$TransformStreamDefaultSinkWriteAlgorithm$] abstract operation could be + waiting for the promise stored in the [=TransformStream/[[backpressureChangePromise]]=] slot to + resolve. The call to [$TransformStreamSetBackpressure$] ensures that the promise always resolves. +

+

Default controllers

The following abstract operations support the implementaiton of the @@ -5894,7 +5932,8 @@ The following abstract operations support the implementaiton of the
SetUpTransformStreamDefaultController(|stream|, - |controller|, |transformAlgorithm|, |flushAlgorithm|) performs the following steps: + |controller|, |transformAlgorithm|, |flushAlgorithm|, |cancelAlgorithm|) performs the + following steps: 1. Assert: |stream| [=implements=] {{TransformStream}}. 1. Assert: |stream|.[=TransformStream/[[controller]]=] is undefined. @@ -5903,6 +5942,7 @@ The following abstract operations support the implementaiton of the 1. Set |controller|.[=TransformStreamDefaultController/[[transformAlgorithm]]=] to |transformAlgorithm|. 1. Set |controller|.[=TransformStreamDefaultController/[[flushAlgorithm]]=] to |flushAlgorithm|. + 1. Set |controller|.[=TransformStreamDefaultController/[[cancelAlgorithm]]=] to |cancelAlgorithm|.
@@ -5916,6 +5956,7 @@ The following abstract operations support the implementaiton of the 1. If |result| is an abrupt completion, return [=a promise rejected with=] |result|.\[[Value]]. 1. Otherwise, return [=a promise resolved with=] undefined. 1. Let |flushAlgorithm| be an algorithm which returns [=a promise resolved with=] undefined. + 1. Let |cancelAlgorithm| be an algorithm which returns [=a promise resolved with=] undefined. 1. If |transformerDict|["{{Transformer/transform}}"] [=map/exists=], set |transformAlgorithm| to an algorithm which takes an argument |chunk| and returns the result of [=invoking=] |transformerDict|["{{Transformer/transform}}"] with argument list « |chunk|, @@ -5923,8 +5964,12 @@ The following abstract operations support the implementaiton of the 1. If |transformerDict|["{{Transformer/flush}}"] [=map/exists=], set |flushAlgorithm| to an algorithm which returns the result of [=invoking=] |transformerDict|["{{Transformer/flush}}"] with argument list « |controller| » and [=callback this value=] |transformer|. + 1. If |transformerDict|["{{Transformer/cancel}}"] [=map/exists=], set |cancelAlgorithm| to an + algorithm which takes an argument |reason| and returns the result of [=invoking=] + |transformerDict|["{{Transformer/cancel}}"] with argument list « |reason| » and + [=callback this value=] |transformer|. 1. Perform ! [$SetUpTransformStreamDefaultController$](|stream|, |controller|, - |transformAlgorithm|, |flushAlgorithm|). + |transformAlgorithm|, |flushAlgorithm|, |cancelAlgorithm|).
@@ -5943,6 +5988,7 @@ The following abstract operations support the implementaiton of the 1. Set |controller|.[=TransformStreamDefaultController/[[transformAlgorithm]]=] to undefined. 1. Set |controller|.[=TransformStreamDefaultController/[[flushAlgorithm]]=] to undefined. + 1. Set |controller|.[=TransformStreamDefaultController/[[cancelAlgorithm]]=] to undefined.
@@ -6033,8 +6079,26 @@ side=] of [=transform streams=]. id="transform-stream-default-sink-abort-algorithm">TransformStreamDefaultSinkAbortAlgorithm(|stream|, |reason|) performs the following steps: - 1. Perform ! [$TransformStreamError$](|stream|, |reason|). - 1. Return [=a promise resolved with=] undefined. + 1. Let |controller| be |stream|.[=TransformStream/[[controller]]=]. + 1. If |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] is not undefined, return + |controller|.[=TransformStreamDefaultController/[[finishPromise]]=]. + 1. Let |readable| be |stream|.[=TransformStream/[[readable]]=]. + 1. Let |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] be a new promise. + 1. Let |cancelPromise| be the result of performing + |controller|.[=TransformStreamDefaultController/[[cancelAlgorithm]]=], passing |reason|. + 1. Perform ! [$TransformStreamDefaultControllerClearAlgorithms$](|controller|). + 1. [=React=] to |cancelPromise|: + 1. If |cancelPromise| was fulfilled, then: + 1. If |readable|.[=ReadableStream/[[state]]=] is "`errored`", [=reject=] + |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] with + |readable|.[=ReadableStream/[[storedError]]=]. + 1. Otherwise: + 1. Perform ! [$ReadableStreamDefaultControllerError$](|readable|.[=ReadableStream/[[controller]]=], |reason|). + 1. [=Resolve=] |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] with undefined. + 1. If |cancelPromise| was rejected with reason |r|, then: + 1. Perform ! [$ReadableStreamDefaultControllerError$](|readable|.[=ReadableStream/[[controller]]=], |r|). + 1. [=Reject=] |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] with |r|. + 1. Return |controller|.[=TransformStreamDefaultController/[[finishPromise]]=].
@@ -6042,20 +6106,26 @@ side=] of [=transform streams=]. id="transform-stream-default-sink-close-algorithm">TransformStreamDefaultSinkCloseAlgorithm(|stream|) performs the following steps: - 1. Let |readable| be |stream|.[=TransformStream/[[readable]]=]. 1. Let |controller| be |stream|.[=TransformStream/[[controller]]=]. + 1. If |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] is not undefined, return + |controller|.[=TransformStreamDefaultController/[[finishPromise]]=]. + 1. Let |readable| be |stream|.[=TransformStream/[[readable]]=]. + 1. Let |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] be a new promise. 1. Let |flushPromise| be the result of performing |controller|.[=TransformStreamDefaultController/[[flushAlgorithm]]=]. 1. Perform ! [$TransformStreamDefaultControllerClearAlgorithms$](|controller|). - 1. Return the result of [=reacting=] to |flushPromise|: + 1. [=React=] to |flushPromise|: 1. If |flushPromise| was fulfilled, then: - 1. If |readable|.[=ReadableStream/[[state]]=] is "`errored`", throw + 1. If |readable|.[=ReadableStream/[[state]]=] is "`errored`", [=reject=] + |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] with |readable|.[=ReadableStream/[[storedError]]=]. - 1. Perform ! - [$ReadableStreamDefaultControllerClose$](|readable|.[=ReadableStream/[[controller]]=]). + 1. Otherwise: + 1. Perform ! [$ReadableStreamDefaultControllerClose$](|readable|.[=ReadableStream/[[controller]]=]). + 1. [=Resolve=] |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] with undefined. 1. If |flushPromise| was rejected with reason |r|, then: - 1. Perform ! [$TransformStreamError$](|stream|, |r|). - 1. Throw |readable|.[=ReadableStream/[[storedError]]=]. + 1. Perform ! [$ReadableStreamDefaultControllerError$](|readable|.[=ReadableStream/[[controller]]=], |r|). + 1. [=Reject=] |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] with |r|. + 1. Return |controller|.[=TransformStreamDefaultController/[[finishPromise]]=].

Default sources

@@ -6063,6 +6133,35 @@ side=] of [=transform streams=]. The following abstract operation is used to implement the [=underlying source=] for the [=readable side=] of [=transform streams=]. +
+ TransformStreamDefaultSourceCancelAlgorithm(|stream|, + |reason|) performs the following steps: + + 1. Let |controller| be |stream|.[=TransformStream/[[controller]]=]. + 1. If |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] is not undefined, return + |controller|.[=TransformStreamDefaultController/[[finishPromise]]=]. + 1. Let |writable| be |stream|.[=TransformStream/[[writable]]=]. + 1. Let |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] be a new promise. + 1. Let |cancelPromise| be the result of performing + |controller|.[=TransformStreamDefaultController/[[cancelAlgorithm]]=], passing |reason|. + 1. Perform ! [$TransformStreamDefaultControllerClearAlgorithms$](|controller|). + 1. [=React=] to |cancelPromise|: + 1. If |cancelPromise| was fulfilled, then: + 1. If |writable|.[=WritableStream/[[state]]=] is "`errored`", [=reject=] + |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] with + |writable|.[=WritableStream/[[storedError]]=]. + 1. Otherwise: + 1. Perform ! [$WritableStreamDefaultControllerErrorIfNeeded$](|writable|.[=WritableStream/[[controller]]=], |reason|). + 1. Perform ! [$TransformStreamUnblockWrite$](|stream|). + 1. [=Resolve=] |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] with undefined. + 1. If |cancelPromise| was rejected with reason |r|, then: + 1. Perform ! [$WritableStreamDefaultControllerErrorIfNeeded$](|writable|.[=WritableStream/[[controller]]=], |r|). + 1. Perform ! [$TransformStreamUnblockWrite$](|stream|). + 1. [=Reject=] |controller|.[=TransformStreamDefaultController/[[finishPromise]]=] with |r|. + 1. Return |controller|.[=TransformStreamDefaultController/[[finishPromise]]=]. +
+
TransformStreamDefaultSourcePullAlgorithm(|stream|) @@ -7118,9 +7217,10 @@ reason.
To set up a newly-[=new|created-via-Web IDL=] {{TransformStream}} |stream| given an algorithm transformAlgorithm and an optional algorithm flushAlgorithm, perform the following steps. - |transformAlgorithm| and, if given, |flushAlgorithm|, may return a promise. + for="TransformStream/set up">transformAlgorithm, an optional algorithm flushAlgorithm, and an optional algorithm cancelAlgorithm, perform the following steps. + |transformAlgorithm| and, if given, |flushAlgorithm| and |cancelAlgorithm|, may return a promise. 1. Let |writableHighWaterMark| be 1. 1. Let |writableSizeAlgorithm| be an algorithm that returns 1. @@ -7136,12 +7236,18 @@ reason. null otherwise. If this throws an exception |e|, return [=a promise rejected with=] |e|. 1. If |result| is a {{Promise}}, then return |result|. 1. Return [=a promise resolved with=] undefined. + 1. Let |cancelAlgorithmWrapper| be an algorithm that runs these steps given a value |reason|: + 1. Let |result| be the result of running |cancelAlgorithm| given |reason|, if |cancelAlgorithm| + was given, or null otherwise. If this throws an exception |e|, return + [=a promise rejected with=] |e|. + 1. If |result| is a {{Promise}}, then return |result|. + 1. Return [=a promise resolved with=] undefined. 1. Let |startPromise| be [=a promise resolved with=] undefined. 1. Perform ! [$InitializeTransformStream$](|stream|, |startPromise|, |writableHighWaterMark|, |writableSizeAlgorithm|, |readableHighWaterMark|, |readableSizeAlgorithm|). 1. Let |controller| be a [=new=] {{TransformStreamDefaultController}}. 1. Perform ! [$SetUpTransformStreamDefaultController$](|stream|, |controller|, - |transformAlgorithmWrapper|, |flushAlgorithmWrapper|). + |transformAlgorithmWrapper|, |flushAlgorithmWrapper|, |cancelAlgorithmWrapper|). Other specifications should be careful when constructing their [=TransformStream/set up/transformAlgorithm=] to avoid [=in parallel=] reads from the given diff --git a/reference-implementation/lib/Transformer.webidl b/reference-implementation/lib/Transformer.webidl index eefea2b0d..792b8c6c5 100644 --- a/reference-implementation/lib/Transformer.webidl +++ b/reference-implementation/lib/Transformer.webidl @@ -2,6 +2,7 @@ dictionary Transformer { TransformerStartCallback start; TransformerTransformCallback transform; TransformerFlushCallback flush; + TransformerCancelCallback cancel; any readableType; any writableType; }; @@ -9,3 +10,4 @@ dictionary Transformer { callback TransformerStartCallback = any (TransformStreamDefaultController controller); callback TransformerFlushCallback = Promise (TransformStreamDefaultController controller); callback TransformerTransformCallback = Promise (any chunk, TransformStreamDefaultController controller); +callback TransformerCancelCallback = Promise (any reason); diff --git a/reference-implementation/lib/abstract-ops/transform-streams.js b/reference-implementation/lib/abstract-ops/transform-streams.js index 8e3f5fcc3..4edc5f10b 100644 --- a/reference-implementation/lib/abstract-ops/transform-streams.js +++ b/reference-implementation/lib/abstract-ops/transform-streams.js @@ -2,8 +2,8 @@ const assert = require('assert'); const verbose = require('debug')('streams:transform-stream:verbose'); -const { promiseResolvedWith, promiseRejectedWith, newPromise, resolvePromise, transformPromiseWith } = - require('../helpers/webidl.js'); +const { promiseResolvedWith, promiseRejectedWith, newPromise, resolvePromise, rejectPromise, uponPromise, + transformPromiseWith } = require('../helpers/webidl.js'); const { CreateReadableStream, ReadableStreamDefaultControllerClose, ReadableStreamDefaultControllerEnqueue, ReadableStreamDefaultControllerError, ReadableStreamDefaultControllerHasBackpressure, ReadableStreamDefaultControllerCanCloseOrEnqueue } = require('./readable-streams.js'); @@ -51,8 +51,7 @@ function InitializeTransformStream( } function cancelAlgorithm(reason) { - TransformStreamErrorWritableAndUnblockWrite(stream, reason); - return promiseResolvedWith(undefined); + return TransformStreamDefaultSourceCancelAlgorithm(stream, reason); } stream._readable = CreateReadableStream( @@ -77,6 +76,10 @@ function TransformStreamError(stream, e) { function TransformStreamErrorWritableAndUnblockWrite(stream, e) { TransformStreamDefaultControllerClearAlgorithms(stream._controller); WritableStreamDefaultControllerErrorIfNeeded(stream._writable._controller, e); + TransformStreamUnblockWrite(stream); +} + +function TransformStreamUnblockWrite(stream) { if (stream._backpressure === true) { // Pretend that pull() was called to permit any pending write() calls to complete. TransformStreamSetBackpressure() // cannot be called from enqueue() or pull() once the ReadableStream is errored, so this will will be the final time @@ -102,7 +105,8 @@ function TransformStreamSetBackpressure(stream, backpressure) { // Default controllers -function SetUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm) { +function SetUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm, + cancelAlgorithm) { assert(TransformStream.isImpl(stream)); assert(stream._controller === undefined); @@ -111,6 +115,7 @@ function SetUpTransformStreamDefaultController(stream, controller, transformAlgo controller._transformAlgorithm = transformAlgorithm; controller._flushAlgorithm = flushAlgorithm; + controller._cancelAlgorithm = cancelAlgorithm; } function SetUpTransformStreamDefaultControllerFromTransformer(stream, transformer, transformerDict) { @@ -126,6 +131,7 @@ function SetUpTransformStreamDefaultControllerFromTransformer(stream, transforme }; let flushAlgorithm = () => promiseResolvedWith(undefined); + let cancelAlgorithm = () => promiseResolvedWith(undefined); if ('transform' in transformerDict) { transformAlgorithm = chunk => transformerDict.transform.call(transformer, chunk, controller); @@ -133,13 +139,17 @@ function SetUpTransformStreamDefaultControllerFromTransformer(stream, transforme if ('flush' in transformerDict) { flushAlgorithm = () => transformerDict.flush.call(transformer, controller); } + if ('cancel' in transformerDict) { + cancelAlgorithm = reason => transformerDict.cancel.call(transformer, reason); + } - SetUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm); + SetUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm, cancelAlgorithm); } function TransformStreamDefaultControllerClearAlgorithms(controller) { controller._transformAlgorithm = undefined; controller._flushAlgorithm = undefined; + controller._cancelAlgorithm = undefined; } function TransformStreamDefaultControllerEnqueue(controller, chunk) { @@ -221,32 +231,69 @@ function TransformStreamDefaultSinkWriteAlgorithm(stream, chunk) { } function TransformStreamDefaultSinkAbortAlgorithm(stream, reason) { - // abort() is not called synchronously, so it is possible for abort() to be called when the stream is already - // errored. - TransformStreamError(stream, reason); - return promiseResolvedWith(undefined); + verbose('TransformStreamDefaultSinkAbortAlgorithm()'); + + const controller = stream._controller; + if (controller._finishPromise !== undefined) { + return controller._finishPromise; + } + + // stream._readable cannot change after construction, so caching it across a call to user code is safe. + const readable = stream._readable; + + // Assign the _finishPromise now so that if _cancelAlgorithm calls readable.cancel() internally, + // we don't run the _cancelAlgorithm again. + controller._finishPromise = newPromise(); + + const cancelPromise = controller._cancelAlgorithm(reason); + TransformStreamDefaultControllerClearAlgorithms(controller); + + uponPromise(cancelPromise, () => { + if (readable._state === 'errored') { + rejectPromise(controller._finishPromise, readable._storedError); + } else { + ReadableStreamDefaultControllerError(readable._controller, reason); + resolvePromise(controller._finishPromise); + } + }, r => { + ReadableStreamDefaultControllerError(readable._controller, r); + rejectPromise(controller._finishPromise, r); + }); + + return controller._finishPromise; } function TransformStreamDefaultSinkCloseAlgorithm(stream) { verbose('TransformStreamDefaultSinkCloseAlgorithm()'); + const controller = stream._controller; + if (controller._finishPromise !== undefined) { + return controller._finishPromise; + } + // stream._readable cannot change after construction, so caching it across a call to user code is safe. const readable = stream._readable; - const controller = stream._controller; + // Assign the _finishPromise now so that if _flushAlgorithm calls readable.cancel() internally, + // we don't also run the _cancelAlgorithm. + controller._finishPromise = newPromise(); + const flushPromise = controller._flushAlgorithm(); TransformStreamDefaultControllerClearAlgorithms(controller); - // Return a promise that is fulfilled with undefined on success. - return transformPromiseWith(flushPromise, () => { + uponPromise(flushPromise, () => { if (readable._state === 'errored') { - throw readable._storedError; + rejectPromise(controller._finishPromise, readable._storedError); + } else { + ReadableStreamDefaultControllerClose(readable._controller); + resolvePromise(controller._finishPromise); } - ReadableStreamDefaultControllerClose(readable._controller); }, r => { - TransformStreamError(stream, r); - throw readable._storedError; + ReadableStreamDefaultControllerError(readable._controller, r); + rejectPromise(controller._finishPromise, r); }); + + return controller._finishPromise; } // Default sources @@ -264,3 +311,39 @@ function TransformStreamDefaultSourcePullAlgorithm(stream) { // Prevent the next pull() call until there is backpressure. return stream._backpressureChangePromise; } + +function TransformStreamDefaultSourceCancelAlgorithm(stream, reason) { + verbose('TransformStreamDefaultSourceCancelAlgorithm()'); + + const controller = stream._controller; + if (controller._finishPromise !== undefined) { + return controller._finishPromise; + } + + // stream._writable cannot change after construction, so caching it across a call to user code is safe. + const writable = stream._writable; + + // Assign the _finishPromise now so that if _flushAlgorithm calls writable.abort() or + // writable.cancel() internally, we don't run the _cancelAlgorithm again, or also run the + // _flushAlgorithm. + controller._finishPromise = newPromise(); + + const cancelPromise = controller._cancelAlgorithm(reason); + TransformStreamDefaultControllerClearAlgorithms(controller); + + uponPromise(cancelPromise, () => { + if (writable._state === 'errored') { + rejectPromise(controller._finishPromise, writable._storedError); + } else { + WritableStreamDefaultControllerErrorIfNeeded(writable._controller, reason); + TransformStreamUnblockWrite(stream); + resolvePromise(controller._finishPromise); + } + }, r => { + WritableStreamDefaultControllerErrorIfNeeded(writable._controller, r); + TransformStreamUnblockWrite(stream); + rejectPromise(controller._finishPromise, r); + }); + + return controller._finishPromise; +} diff --git a/reference-implementation/web-platform-tests b/reference-implementation/web-platform-tests index 517e945bb..a8872d92b 160000 --- a/reference-implementation/web-platform-tests +++ b/reference-implementation/web-platform-tests @@ -1 +1 @@ -Subproject commit 517e945bbfaf903f37a35c11700eb96662efbdd3 +Subproject commit a8872d92b147fc87200eb0c14fe7a4a9e7cd4f73