1. Introduction
This section is non-normative.
Large swathes of the web platform are built on streaming data: that is, data that is created, processed, and consumed in an incremental fashion, without ever reading all of it into memory. The Streams Standard provides a common set of APIs for creating and interfacing with such streaming data, embodied in readable streams, writable streams, and transform streams.
These APIs have been designed to efficiently map to low-level I/O primitives, including specializations for byte streams where appropriate. They allow easy composition of multiple streams into pipe chains, or can be used directly via readers and writers. Finally, they are designed to automatically provide backpressure and queuing.
This standard provides the base stream primitives which other parts of the web platform can use to expose their
streaming data. For example, [FETCH] exposes Response
bodies as ReadableStream
instances. More generally, the
platform is full of streaming abstractions waiting to be expressed as streams: multimedia streams, file streams,
inter-global communication, and more benefit from being able to process data incrementally instead of buffering it all
into memory and processing it in one go. By providing the foundation for these streams to be exposed to developers, the
Streams Standard enables use cases like:
- Video effects: piping a readable video stream through a transform stream that applies effects in real time.
- Decompression: piping a file stream through a transform stream that selectively decompresses files from a .tgz archive, turning them into img elements as the user scrolls through an image gallery.
- Image decoding: piping an HTTP response stream through a transform stream that decodes bytes into bitmap data, and then through another transform that translates bitmaps into PNGs. If installed inside the fetch hook of a service worker, this would allow developers to transparently polyfill new image formats. [SERVICE-WORKERS]
Web developers can also use the APIs described here to create their own streams, with the same APIs as those provided by the platform. Other developers can then transparently compose platform-provided streams with those supplied by libraries. In this way, the APIs described here provide a unifying abstraction for all streams, encouraging an ecosystem to grow around these shared and composable interfaces.
2. Model
A chunk is a single piece of data that is written to or read from a stream. It can be of any type;
streams can even contain chunks of different types. A chunk will often not be the most atomic unit of data for a given
stream; for example, a byte stream might contain chunks consisting of 16 KiB Uint8Arrays, instead of single bytes.
2.1. Readable streams
A readable stream represents a source of data, from which you can read. In other words, data comes out of a readable stream. Concretely, a readable stream is an instance of the ReadableStream
class.
Although a readable stream can be created with arbitrary behavior, most readable streams wrap a lower-level I/O source, called the underlying source. There are two types of underlying source: push sources and pull sources.
Push sources push data at you, whether or not you are listening for it. They may also provide a mechanism for pausing and resuming the flow of data. An example push source is a TCP socket, where data is constantly being pushed from the OS level, at a rate that can be controlled by changing the TCP window size.
Pull sources require you to request data from them. The data may be available synchronously, e.g. if it is held by the operating system’s in-memory buffers, or asynchronously, e.g. if it has to be read from disk. An example pull source is a file handle, where you seek to specific locations and read specific amounts.
Readable streams are designed to wrap both types of sources behind a single, unified interface. For web
developer–created streams, the implementation details of a source are provided by an
object with certain methods and properties that is passed to the ReadableStream()
constructor.
Chunks are enqueued into the stream by the stream’s underlying source. They can then be read one at a
time via the stream’s public interface, in particular by using a readable stream reader acquired using the
stream’s getReader()
method.
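For example, a web developer–created stream that enqueues a fixed set of chunks, and a consumer that reads one of them, might be sketched as follows (the chunk values here are arbitrary placeholders):

const readableStream = new ReadableStream({
  start(controller) {
    // The underlying source enqueues its chunks and then closes the stream.
    controller.enqueue("hello");
    controller.enqueue("world");
    controller.close();
  }
});

// A consumer acquires a reader and reads the chunks one at a time.
const reader = readableStream.getReader();
reader.read().then(({ value, done }) => {
  console.log(value, done); // "hello", false
});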
Code that reads from a readable stream using its public interface is known as a consumer.
Consumers also have the ability to cancel a readable stream, using its cancel()
method. This indicates that the consumer has lost interest in the stream, and will
immediately close the stream, throw away any queued chunks, and execute any cancellation mechanism of the underlying source.
Consumers can also tee a readable stream using its tee()
method. This will lock the stream, making it no longer directly usable; however, it will
create two new streams, called branches, which can be consumed
independently.
For streams representing bytes, an extended version of the readable stream is provided to handle bytes
efficiently, in particular by minimizing copies. The underlying source for such a readable stream is called
an underlying byte source. A readable stream whose underlying source is an underlying byte source is
sometimes called a readable byte stream. Consumers of a readable byte stream can acquire a BYOB reader using the stream’s getReader()
method.
2.2. Writable streams
A writable stream represents a destination for data, into which you can write. In other words, data
goes in to a writable stream. Concretely, a writable stream is an instance of the WritableStream
class.
Analogously to readable streams, most writable streams wrap a lower-level I/O sink, called the underlying sink. Writable streams work to abstract away some of the complexity of the underlying sink, by queuing subsequent writes and only delivering them to the underlying sink one by one.
Chunks are written to the stream via its public interface, and are passed one at a time to the stream’s underlying sink. For web developer-created streams, the implementation details of the sink are provided by an object with certain methods that is passed to the WritableStream()
constructor.
Code that writes into a writable stream using its public interface is known as a producer.
Producers also have the ability to abort a writable stream, using its abort()
method. This indicates that the producer believes something has gone wrong, and that future
writes should be discontinued. It puts the stream in an errored state, even without a signal from the underlying
sink, and it discards all writes in the stream’s internal queue.
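For example, a writable stream whose underlying sink simply logs each chunk, together with a producer that writes to it, might be sketched as follows (assuming the stream’s getWriter() method, which vends the writable stream writer described in §2.6 Locking):

const writableStream = new WritableStream({
  write(chunk) {
    // The underlying sink receives chunks one at a time, in order.
    console.log("sink received:", chunk);
  }
});

// A producer acquires a writer and writes chunks; each write() returns a
// promise that fulfills once the stream has accepted the chunk.
const writer = writableStream.getWriter();
writer.write("a");
writer.write("b");
writer.close();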
2.3. Transform streams
A transform stream consists of a pair of streams: a writable stream, known as its writable side, and a readable stream, known as its readable side. In a manner specific to the transform stream in question, writes to the writable side result in new data being made available for reading from the readable side.
Concretely, any object with a writable
property and a readable
property can serve as a
transform stream. However, the standard TransformStream
class makes it much easier to create such a pair that is
properly entangled. It wraps a transformer, which defines algorithms for the specific transformation to be
performed. For web developer–created streams, the implementation details of a transformer are provided by an object with certain methods and properties that is passed to the TransformStream()
constructor.
An identity transform stream is a type of transform stream which forwards all chunks written to its writable side to its readable side, without any changes. This can be useful in a variety of scenarios. By default, the TransformStream
constructor will create an identity transform stream when no transform() method is present on the transformer object.
Some examples of potential transform streams include:
- A GZIP compressor, to which uncompressed bytes are written and from which compressed bytes are read;
- A video decoder, to which encoded bytes are written and from which uncompressed video frames are read;
- A text decoder, to which bytes are written and from which strings are read;
- A CSV-to-JSON converter, to which strings representing lines of a CSV file are written and from which corresponding JavaScript objects are read.
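As a minimal sketch, a transform stream that upper-cases each string chunk written to it could be built with the TransformStream() constructor, assuming the transformer’s transform(chunk, controller) method, the controller’s enqueue() method, and the getWriter()/getReader() methods used to drive the two sides, all defined later in this standard:

const upperCaser = new TransformStream({
  transform(chunk, controller) {
    // Each chunk written to the writable side is transformed and made
    // available on the readable side.
    controller.enqueue(chunk.toUpperCase());
  }
});

const writer = upperCaser.writable.getWriter();
const reader = upperCaser.readable.getReader();

writer.write("streams");
reader.read().then(({ value }) => console.log(value)); // "STREAMS"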
2.4. Pipe chains and backpressure
Streams are primarily used by piping them to each other. A readable stream can be piped directly to a
writable stream, using its pipeTo()
method, or it can be piped through one or more transform streams
first, using its pipeThrough()
method.
A set of streams piped together in this way is referred to as a pipe chain. In a pipe chain, the original source is the underlying source of the first readable stream in the chain; the ultimate sink is the underlying sink of the final writable stream in the chain.
Once a pipe chain is constructed, it will propagate signals regarding how fast chunks should flow through it. If any step in the chain cannot yet accept chunks, it propagates a signal backwards through the pipe chain, until eventually the original source is told to stop producing chunks so fast. This process of normalizing flow from the original source according to how fast the chain can process chunks is called backpressure.
Concretely, the original source is given the controller.desiredSize
(or byteController.desiredSize
) value, and can then adjust its rate of data
flow accordingly. This value is derived from the writer.desiredSize
corresponding to the ultimate sink, which gets updated as the ultimate sink finishes writing chunks. The pipeTo()
method used to construct the chain automatically ensures this information propagates back
through the pipe chain.
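For example, a pull-based underlying source can rely on pull() only being called while the desired size is positive, or consult controller.desiredSize directly; a sketch, where produceChunk() is a hypothetical stand-in for the original source:

const pacedStream = new ReadableStream({
  pull(controller) {
    // pull() is invoked only while the queue's desired size is positive,
    // so chunk production is automatically paced by the consumer.
    controller.enqueue(produceChunk()); // produceChunk() is hypothetical
  }
}, { highWaterMark: 4 });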
When teeing a readable stream, the backpressure signals from its two branches will aggregate, such that if neither branch is read from, a backpressure signal will be sent to the underlying source of the original stream.
Piping locks the readable and writable streams, preventing them from being manipulated for the duration of the pipe operation. This allows the implementation to perform important optimizations, such as directly shuttling data from the underlying source to the underlying sink while bypassing many of the intermediate queues.
2.5. Internal queues and queuing strategies
Both readable and writable streams maintain internal queues, which they use for similar purposes. In the case of a readable stream, the internal queue contains chunks that have been enqueued by the underlying source, but not yet read by the consumer. In the case of a writable stream, the internal queue contains chunks which have been written to the stream by the producer, but not yet processed and acknowledged by the underlying sink.
A queuing strategy is an object that determines how a stream should signal backpressure based on the state of its internal queue. The queuing strategy assigns a size to each chunk, and compares the total size of all chunks in the queue to a specified number, known as the high water mark. The resulting difference, high water mark minus total size, is used to determine the desired size to fill the stream’s queue.
For readable streams, an underlying source can use this desired size as a backpressure signal, slowing down chunk generation so as to try to keep the desired size above or at zero. For writable streams, a producer can behave similarly, avoiding writes that would cause the desired size to go negative.
Concretely, a queuing strategy for web developer–created streams is given by any JavaScript object
with a highWaterMark
property. For byte streams the highWaterMark
always has units of bytes. For other streams the default unit is chunks, but a size()
function can be included in the strategy object which returns the size
for a given chunk. This permits the highWaterMark
to be specified in
arbitrary floating-point units.
In JavaScript, such a strategy could be written manually as { highWaterMark: 3, size() { return 1; } }, or using the built-in CountQueuingStrategy class, as new CountQueuingStrategy({ highWaterMark: 3 }).
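Such a strategy object is passed as the second argument to the stream constructors. For example, the following sketch gives a readable stream of strings a high water mark measured in string length rather than in chunks:

const stringStream = new ReadableStream({
  start(controller) {
    controller.enqueue("some text");
  }
}, {
  // The queue is considered full once the total length of queued strings
  // reaches 1024.
  highWaterMark: 1024,
  size(chunk) { return chunk.length; }
});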
2.6. Locking
A readable stream reader, or simply reader, is an object that allows
direct reading of chunks from a readable stream. Without a reader, a consumer can only perform
high-level operations on the readable stream: canceling the stream, or piping the readable stream to a writable stream. A reader is acquired via the stream’s getReader()
method.
A readable byte stream has the ability to vend two types of readers: default readers and BYOB
readers. BYOB ("bring your own buffer") readers allow reading into a developer-supplied buffer, thus minimizing
copies. A non-byte readable stream can only vend default readers. Default readers are instances of the ReadableStreamDefaultReader
class, while BYOB readers are instances of ReadableStreamBYOBReader
.
Similarly, a writable stream writer, or simply writer, is an object that
allows direct writing of chunks to a writable stream. Without a writer, a producer can only perform
the high-level operations of aborting the stream or piping a readable stream
to the writable stream. Writers are represented by the WritableStreamDefaultWriter
class.
Under the covers, these high-level operations actually use a reader or writer themselves.
A given readable or writable stream only has at most one reader or writer at a time. We say in this case the stream is locked, and that the reader or writer is active. This state can be determined using the readableStream.locked
or writableStream.locked
properties.
A reader or writer also has the capability to release
its lock, which makes it no longer active, and allows further readers or writers to be acquired. This is done via
the defaultReader.releaseLock()
, byobReader.releaseLock()
, or writer.releaseLock()
method, as appropriate.
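A minimal sketch of this life cycle, using the APIs named above:

const readableStream = new ReadableStream();

const reader = readableStream.getReader();
console.log(readableStream.locked); // true: the stream is locked to this reader

// While the stream is locked, acquiring another reader via getReader()
// would throw a TypeError.

reader.releaseLock();
console.log(readableStream.locked); // false: a new reader can now be acquired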
3. Readable streams
3.1. Using readable streams
The simplest way to consume a readable stream is to pipe it to a writable stream:

readableStream.pipeTo(writableStream)
  .then(() => console.log("All data successfully written!"))
  .catch(e => console.error("Something went wrong!", e));

The destination can also be a writable stream created on the fly, whose underlying sink reacts to each chunk:

readableStream.pipeTo(new WritableStream({
  write(chunk) {
    console.log("Chunk received", chunk);
  },
  close() {
    console.log("All data successfully read!");
  },
  abort(e) {
    console.error("Something went wrong!", e);
  }
}));
By returning promises from your write()
implementation, you can signal backpressure to the
readable stream.
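For example, a sink that takes time to process each chunk can return a promise from write(); a sketch, where processChunk() is a hypothetical asynchronous operation:

readableStream.pipeTo(new WritableStream({
  write(chunk) {
    // The stream waits for the returned promise to fulfill before it
    // considers this chunk written, which slows down the pipe accordingly.
    return processChunk(chunk); // processChunk() is hypothetical
  }
}));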
A stream can also be consumed chunk-by-chunk, by acquiring a reader with getReader() and repeatedly calling its read() method to get successive chunks. For example, this code logs the next chunk in the stream, if available:
const reader = readableStream.getReader();

reader.read().then(
  ({ value, done }) => {
    if (done) {
      console.log("The stream was already closed!");
    } else {
      console.log(value);
    }
  },
  e => console.error("The stream became errored and cannot be read from!", e)
);
This more manual method of reading a stream is mainly useful for library authors building new high-level operations on streams, beyond the provided ones of piping and teeing.
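Because the stream is also async iterable (via getIterator() and [@@asyncIterator], described below), the same chunk-by-chunk consumption can be written with a for await...of loop; a sketch:

async function logChunks(readableStream) {
  // Each iteration reads one chunk; the loop ends when the stream closes.
  for await (const chunk of readableStream) {
    console.log(chunk);
  }
}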
For readable byte streams, a BYOB reader allows reading into developer-supplied buffers. For example, this code reads the first 1024 bytes of the stream into a single ArrayBuffer, which is reused across reads:

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1024);
readInto(startingAB)
  .then(buffer => console.log("The first 1024 bytes:", buffer))
  .catch(e => console.error("Something went wrong!", e));

function readInto(buffer, offset = 0) {
  if (offset === buffer.byteLength) {
    return Promise.resolve(buffer);
  }

  const view = new Uint8Array(buffer, offset, buffer.byteLength - offset);
  return reader.read(view).then(({ value: newView }) => {
    return readInto(newView.buffer, offset + newView.byteLength);
  });
}
An important thing to note here is that the final buffer
value is different from the startingAB
, but it (and all intermediate buffers) shares the same backing memory allocation. At each
step, the buffer is transferred to a new ArrayBuffer
object. The newView
is a new Uint8Array
, with that ArrayBuffer
object as its buffer
property,
the offset that bytes were written to as its byteOffset
property, and the number of bytes that were
written as its byteLength
property.
3.2. Class ReadableStream
The ReadableStream
class is a concrete instance of the general readable stream concept. It is
adaptable to any chunk type, and maintains an internal queue to keep track of data supplied by the underlying
source but not yet read by any consumer.
3.2.1. Class definition
This section is non-normative.
If one were to write the ReadableStream
class in something close to the syntax of [ECMASCRIPT], it would look
like
class ReadableStream {
  constructor(underlyingSource = {}, strategy = {})

  get locked()

  cancel(reason)
  getIterator({ preventCancel } = {})
  getReader({ mode } = {})
  pipeThrough({ writable, readable },
              { preventClose, preventAbort, preventCancel, signal } = {})
  pipeTo(dest, { preventClose, preventAbort, preventCancel, signal } = {})
  tee()

  [@@asyncIterator]({ preventCancel } = {})
}
3.2.2. Internal slots
Instances of ReadableStream
are created with the internal slots described in the following table:
Internal Slot | Description (non-normative)
---|---
[[disturbed]] | A boolean flag set to true when the stream has been read from or canceled
[[readableStreamController]] | A ReadableStreamDefaultController or ReadableByteStreamController created with the ability to control the state and queue of this stream; also used for the IsReadableStream brand check
[[reader]] | A ReadableStreamDefaultReader or ReadableStreamBYOBReader instance, if the stream is locked to a reader, or undefined if it is not
[[state]] | A string containing the stream’s current state, used internally; one of "readable", "closed", or "errored"
[[storedError]] | A value indicating how the stream failed, to be given as a failure reason or exception when trying to operate on an errored stream
3.2.3. new ReadableStream(underlyingSource = {}, strategy = {})
The underlyingSource argument represents the underlying source, as described in §3.2.4 Underlying source API.
The strategy argument represents the stream’s queuing strategy, as described in §6.1.1 The queuing strategy API. If it is not provided, the default behavior will be the same as a CountQueuingStrategy with a high water mark of 1.
- Perform ! InitializeReadableStream(this).
- Let size be ? GetV(strategy, "size").
- Let highWaterMark be ? GetV(strategy, "highWaterMark").
- Let type be ? GetV(underlyingSource, "type").
- Let typeString be ? ToString(type).
- If typeString is "bytes",
  - If size is not undefined, throw a RangeError exception.
  - If highWaterMark is undefined, let highWaterMark be 0.
  - Set highWaterMark to ? ValidateAndNormalizeHighWaterMark(highWaterMark).
  - Perform ? SetUpReadableByteStreamControllerFromUnderlyingSource(this, underlyingSource, highWaterMark).
- Otherwise, if type is undefined,
  - Let sizeAlgorithm be ? MakeSizeAlgorithmFromSizeFunction(size).
  - If highWaterMark is undefined, let highWaterMark be 1.
  - Set highWaterMark to ? ValidateAndNormalizeHighWaterMark(highWaterMark).
  - Perform ? SetUpReadableStreamDefaultControllerFromUnderlyingSource(this, underlyingSource, highWaterMark, sizeAlgorithm).
- Otherwise, throw a RangeError exception.
3.2.4. Underlying source API
This section is non-normative.
The ReadableStream()
constructor accepts as its first argument a JavaScript object representing the underlying
source. Such objects can contain any of the following properties:
start(controller)
-
A function that is called immediately during creation of the
ReadableStream
Typically this is used to adapt a push source by setting up relevant event listeners, as in the example of §8.1 A readable stream with an underlying push source (no backpressure support), or to acquire access to a pull source, as in §8.4 A readable stream with an underlying pull source.
If this setup process is asynchronous, it can return a promise to signal success or failure; a rejected promise will error the stream. Any thrown exceptions will be re-thrown by the
ReadableStream()
constructor. pull(controller)
-
A function that is called whenever the stream’s internal queue of chunks becomes not full, i.e. whenever the queue’s desired size becomes positive. Generally, it will be called repeatedly until the queue reaches its high water mark (i.e. until the desired size becomes non-positive).
For push sources, this can be used to resume a paused flow, as in §8.2 A readable stream with an underlying push source and backpressure support. For pull sources, it is used to acquire new chunks to enqueue into the stream, as in §8.4 A readable stream with an underlying pull source.
This function will not be called until start() successfully completes. Additionally, it will only be called repeatedly if it enqueues at least one chunk or fulfills a BYOB request; a no-op pull() implementation will not be continually called.
If the function returns a promise, then it will not be called again until that promise fulfills. (If the promise rejects, the stream will become errored.) This is mainly used in the case of pull sources, where the promise returned represents the process of acquiring a new chunk. Throwing an exception is treated the same as returning a rejected promise.
cancel(reason)
-
A function that is called whenever the consumer cancels the stream, via stream.cancel(), defaultReader.cancel(), or byobReader.cancel(). It takes as its argument the same value as was passed to those methods by the consumer.
Readable streams can additionally be canceled under certain conditions during piping; see the definition of the pipeTo() method for more details.
For all streams, this is generally used to release access to the underlying resource; see for example §8.1 A readable stream with an underlying push source (no backpressure support).
If the shutdown process is asynchronous, it can return a promise to signal success or failure; the result will be communicated via the return value of the cancel() method that was called. Additionally, a rejected promise will error the stream, instead of letting it close. Throwing an exception is treated the same as returning a rejected promise.
type (byte streams only)
-
Can be set to "bytes" to signal that the constructed ReadableStream is a readable byte stream. This ensures that the resulting ReadableStream will successfully be able to vend BYOB readers via its getReader() method. It also affects the controller argument passed to the start() and pull() methods; see below.
For an example of how to set up a readable byte stream, including using the different controller interface, see §8.3 A readable byte stream with an underlying push source (no backpressure support).
Setting any value other than "bytes" or undefined will cause the ReadableStream() constructor to throw an exception.
autoAllocateChunkSize (byte streams only)
-
Can be set to a positive integer to cause the implementation to automatically allocate buffers for the underlying source code to write into. In this case, when a consumer is using a default reader, the stream implementation will automatically allocate an ArrayBuffer of the given size, so that controller.byobRequest is always present, as if the consumer was using a BYOB reader.
This is generally used to cut down on the amount of code needed to handle consumers that use default readers, as can be seen by comparing §8.3 A readable byte stream with an underlying push source (no backpressure support) without auto-allocation to §8.5 A readable byte stream with an underlying pull source with auto-allocation.
The type of the controller argument passed to the start() and pull() methods depends on the value of the type option. If type is set to undefined (including via omission), controller will be a ReadableStreamDefaultController. If it’s set to "bytes", controller will be a ReadableByteStreamController.
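Putting these pieces together, a complete underlying source for a pull source might be sketched as follows, where openResource(), readFromResource(), and closeResource() are hypothetical asynchronous operations standing in for real I/O:

let resource;

const rs = new ReadableStream({
  async start() {
    // Acquire access to the pull source before any chunks are requested.
    resource = await openResource(); // hypothetical
  },
  async pull(controller) {
    // Called only while the queue's desired size is positive; the returned
    // promise prevents another pull() until this chunk has been acquired.
    const chunk = await readFromResource(resource); // hypothetical
    if (chunk === null) {
      controller.close();
    } else {
      controller.enqueue(chunk);
    }
  },
  cancel() {
    // Release the underlying resource when the consumer loses interest.
    return closeResource(resource); // hypothetical
  }
});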
3.2.5. Properties of the ReadableStream
prototype
3.2.5.1. get locked
The locked getter returns whether or not the readable stream is locked to a reader.
- If ! IsReadableStream(this) is false, throw a TypeError exception.
- Return ! IsReadableStreamLocked(this).
3.2.5.2. cancel(reason)
The cancel method cancels the stream, signaling a loss of interest in the stream by a consumer. The supplied reason argument will be given to the underlying source’s cancel() method, which might or might not use it.
- If ! IsReadableStream(this) is false, return a promise rejected with a TypeError exception.
- If ! IsReadableStreamLocked(this) is true, return a promise rejected with a TypeError exception.
- Return ! ReadableStreamCancel(this, reason).
3.2.5.3. getIterator({ preventCancel } = {})
The getIterator method returns an async iterator which can be used to consume the stream. The return() method of this iterator object will, by default, cancel the stream; it will also release the reader.
- If ! IsReadableStream(this) is false, throw a TypeError exception.
- Let reader be ? AcquireReadableStreamDefaultReader(this).
- Let iterator be ! ObjectCreate(ReadableStreamAsyncIteratorPrototype).
- Set iterator.[[asyncIteratorReader]] to reader.
- Set iterator.[[preventCancel]] to ! ToBoolean(preventCancel).
- Return iterator.
3.2.5.4. getReader({ mode } = {})
The getReader method creates a reader of the type specified by the mode option and locks the stream to the new reader. While the stream is locked, no other reader can be acquired until this one is released.
This functionality is especially useful for creating abstractions that desire the ability to consume a stream in its entirety. By getting a reader for the stream, you can ensure nobody else can interleave reads with yours or cancel the stream, which would interfere with your abstraction.
When mode is undefined, the getReader method creates a default reader (an instance of ReadableStreamDefaultReader). The reader provides the ability to directly read individual chunks from the stream via the reader’s read() method.
When mode
is "byob"
, the getReader
method creates a BYOB reader (an
instance of ReadableStreamBYOBReader
). This feature only works on readable byte streams, i.e. streams which
were constructed specifically with the ability to handle "bring your own buffer" reading. The reader provides the
ability to directly read individual chunks from the stream via the reader’s read()
method, into developer-supplied buffers, allowing more precise control over allocation.
- If ! IsReadableStream(this) is false, throw a TypeError exception.
- If mode is undefined, return ? AcquireReadableStreamDefaultReader(this).
- Set mode to ? ToString(mode).
- If mode is "byob", return ? AcquireReadableStreamBYOBReader(this).
- Throw a RangeError exception.
For example, the following function uses a reader to accumulate the entire contents of a readable stream into memory, as an array of chunks:

function readAllChunks(readableStream) {
  const reader = readableStream.getReader();
  const chunks = [];

  return pump();

  function pump() {
    return reader.read().then(({ value, done }) => {
      if (done) {
        return chunks;
      }

      chunks.push(value);
      return pump();
    });
  }
}
Note how the first thing it does is obtain a reader, and from then on it uses the reader exclusively. This ensures that no other consumer can interfere with the stream, either by reading chunks or by canceling the stream.
3.2.5.5. pipeThrough({ writable, readable }, { preventClose, preventAbort, preventCancel, signal } = {})
The pipeThrough method provides a convenient, chainable way of piping this readable stream through a transform stream (or any other { writable, readable } pair). It simply pipes the stream into the writable side of the supplied pair, and returns the readable side for further use.
Piping a stream will lock it for the duration of the pipe, preventing any other consumer from acquiring a reader.
- If ! IsReadableStream(this) is false, throw a TypeError exception.
- If ! IsWritableStream(writable) is false, throw a TypeError exception.
- If ! IsReadableStream(readable) is false, throw a TypeError exception.
- Set preventClose to ! ToBoolean(preventClose), set preventAbort to ! ToBoolean(preventAbort), and set preventCancel to ! ToBoolean(preventCancel).
- If signal is not undefined, and signal is not an instance of the AbortSignal interface, throw a TypeError exception.
- If ! IsReadableStreamLocked(this) is true, throw a TypeError exception.
- If ! IsWritableStreamLocked(writable) is true, throw a TypeError exception.
- Let promise be ! ReadableStreamPipeTo(this, writable, preventClose, preventAbort, preventCancel, signal).
- Set promise.[[PromiseIsHandled]] to true.
- Return readable.
A typical example of constructing a pipe chain using pipeThrough(transform, options) would look like:

httpResponseBody
  .pipeThrough(decompressorTransform)
  .pipeThrough(ignoreNonImageFilesTransform)
  .pipeTo(mediaGallery);
3.2.5.6. pipeTo(dest, { preventClose, preventAbort, preventCancel, signal } = {})
The pipeTo method pipes this readable stream to a given writable stream. The way in which the piping process behaves under various error conditions can be customized with a number of passed options. It returns a promise that fulfills when the piping process completes successfully, or rejects if any errors were encountered.
Piping a stream will lock it for the duration of the pipe, preventing any other consumer from acquiring a reader.
Errors and closures of the source and destination streams propagate as follows:
-
An error in the source readable stream will abort the destination writable stream, unless
preventAbort
is truthy. The returned promise will be rejected with the source’s error, or with any error that occurs during aborting the destination. -
An error in the destination writable stream will cancel the source readable stream, unless
preventCancel
is truthy. The returned promise will be rejected with the destination’s error, or with any error that occurs during canceling the source. -
When the source readable stream closes, the destination writable stream will be closed, unless
preventClose
is true. The returned promise will be fulfilled once this process completes, unless an error is encountered while closing the destination, in which case it will be rejected with that error. -
If the destination writable stream starts out closed or closing, the source readable stream will be canceled, unless
preventCancel
is true. The returned promise will be rejected with an error indicating piping to a closed stream failed, or with any error that occurs during canceling the source.
The signal
option can be set to an AbortSignal
to allow aborting an ongoing pipe operation via the
corresponding AbortController
. In this case, the source readable stream will be canceled, and the destination writable stream aborted, unless
the respective options preventCancel
or preventAbort
are set.
- If ! IsReadableStream(this) is false, return a promise rejected with a TypeError exception.
- If ! IsWritableStream(dest) is false, return a promise rejected with a TypeError exception.
- Set preventClose to ! ToBoolean(preventClose), set preventAbort to ! ToBoolean(preventAbort), and set preventCancel to ! ToBoolean(preventCancel).
- If signal is not undefined, and signal is not an instance of the AbortSignal interface, return a promise rejected with a TypeError exception.
- If ! IsReadableStreamLocked(this) is true, return a promise rejected with a TypeError exception.
- If ! IsWritableStreamLocked(dest) is true, return a promise rejected with a TypeError exception.
- Return ! ReadableStreamPipeTo(this, dest, preventClose, preventAbort, preventCancel, signal).
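For example, an ongoing pipe can be stopped from the outside using an AbortController whose signal is passed as the signal option; a sketch:

const abortController = new AbortController();

readableStream.pipeTo(writableStream, { signal: abortController.signal })
  .catch(e => console.error("Piping was stopped:", e));

// Later, cancel the source and abort the destination:
abortController.abort();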
3.2.5.7. tee()
The tee method tees this readable stream, returning a two-element array containing the two resulting branches as new ReadableStream instances.
Teeing a stream will lock it, preventing any other consumer from acquiring a reader. To cancel the stream, cancel both of the resulting branches; a composite cancellation reason will then be propagated to the stream’s underlying source.
Note that the chunks seen in each branch will be the same object. If the chunks are not immutable, this could allow interference between the two branches.
- If ! IsReadableStream(this) is false, throw a TypeError exception.
- Let branches be ? ReadableStreamTee(this, false).
- Return ! CreateArrayFromList(branches).
For example, given a writable stream cacheEntry representing an on-disk file, and another writable stream httpRequestBody representing an upload to a remote server, you could pipe the same readable stream to both destinations at once:

const [forLocal, forRemote] = readableStream.tee();

Promise.all([
  forLocal.pipeTo(cacheEntry),
  forRemote.pipeTo(httpRequestBody)
])
  .then(() => console.log("Saved the stream to the cache and also uploaded it!"))
  .catch(e => console.error("Either caching or uploading failed: ", e));
3.2.5.8. [@@asyncIterator]({ preventCancel } = {})
The @@asyncIterator
method is an alias of getIterator()
.
The initial value of the @@asyncIterator
method is the same function object as the initial value of the getIterator()
method.
3.3. ReadableStreamAsyncIteratorPrototype
ReadableStreamAsyncIteratorPrototype
is an ordinary object that is used by getIterator()
to
construct the objects it returns. Instances of ReadableStreamAsyncIteratorPrototype
implement the AsyncIterator
abstract interface from the JavaScript specification. [ECMASCRIPT]
The ReadableStreamAsyncIteratorPrototype
object must have its [[Prototype]] internal slot set to %AsyncIteratorPrototype%
.
3.3.1. Internal slots
Objects created by getIterator()
, using ReadableStreamAsyncIteratorPrototype
as their
prototype, are created with the internal slots described in the following table:
Internal Slot | Description (non-normative)
---|---
[[asyncIteratorReader]] | A ReadableStreamDefaultReader instance
[[preventCancel]] | A boolean value indicating if the stream will be canceled when the async iterator’s return() method is called
3.3.2. next()
- If ! IsReadableStreamAsyncIterator(this) is false, return a promise rejected with a TypeError exception.
- Let reader be this.[[asyncIteratorReader]].
- If reader.[[ownerReadableStream]] is undefined, return a promise rejected with a TypeError exception.
- Return the result of transforming ! ReadableStreamDefaultReaderRead(reader) with a fulfillment handler which takes the argument result and performs the following steps:
  - Assert: Type(result) is Object.
  - Let value be ? Get(result, "value").
  - Let done be ? Get(result, "done").
  - Assert: Type(done) is Boolean.
  - If done is true, perform ! ReadableStreamReaderGenericRelease(reader).
  - Return ! ReadableStreamCreateReadResult(value, done, true).
3.3.3. return( value )
- If ! IsReadableStreamAsyncIterator(this) is false, return a promise rejected with a TypeError exception.
- Let reader be this.[[asyncIteratorReader]].
- If reader.[[ownerReadableStream]] is undefined, return a promise rejected with a TypeError exception.
- If reader.[[readRequests]] is not empty, return a promise rejected with a TypeError exception.
- If this.[[preventCancel]] is false, then:
  - Let result be ! ReadableStreamReaderGenericCancel(reader, value).
  - Perform ! ReadableStreamReaderGenericRelease(reader).
  - Return the result of transforming result by a fulfillment handler that returns ! ReadableStreamCreateReadResult(value, true, true).
- Perform ! ReadableStreamReaderGenericRelease(reader).
- Return a promise resolved with ! ReadableStreamCreateReadResult(value, true, true).
3.4. General readable stream abstract operations
The following abstract operations, unlike most in this specification, are meant to be generally useful by other specifications, instead of just being part of the implementation of this spec’s classes.
3.4.1. AcquireReadableStreamBYOBReader ( stream ) throws
This abstract operation is meant to be called from other specifications that may wish to acquire a BYOB reader for a given stream.
- Return ? Construct(ReadableStreamBYOBReader, « stream »).
3.4.2. AcquireReadableStreamDefaultReader ( stream ) throws
This abstract operation is meant to be called from other specifications that may wish to acquire a default reader for a given stream.
- Return ? Construct(ReadableStreamDefaultReader, « stream »).
3.4.3. CreateReadableStream ( startAlgorithm, pullAlgorithm, cancelAlgorithm [, highWaterMark [, sizeAlgorithm ] ] ) throws
This abstract operation is meant to be called from other specifications that wish to create ReadableStream
instances. The pullAlgorithm and cancelAlgorithm algorithms must return
promises; if supplied, sizeAlgorithm must be an algorithm accepting chunk objects and returning a
number; and if supplied, highWaterMark must be a non-negative, non-NaN number.
- If highWaterMark was not passed, set it to 1.
- If sizeAlgorithm was not passed, set it to an algorithm that returns 1.
- Assert: ! IsNonNegativeNumber(highWaterMark) is true.
- Let stream be ObjectCreate(the original value of ReadableStream's prototype property).
- Perform ! InitializeReadableStream(stream).
- Let controller be ObjectCreate(the original value of ReadableStreamDefaultController's prototype property).
- Perform ? SetUpReadableStreamDefaultController(stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm).
- Return stream.
3.4.4. CreateReadableByteStream ( startAlgorithm, pullAlgorithm, cancelAlgorithm [, highWaterMark [, autoAllocateChunkSize ] ] ) throws
This abstract operation is meant to be called from other specifications that wish to create ReadableStream
instances of type "bytes". The pullAlgorithm and cancelAlgorithm algorithms must return
promises; if supplied, highWaterMark must be a non-negative, non-NaN number, and if supplied, autoAllocateChunkSize must be a positive integer.
- If highWaterMark was not passed, set it to 0.
- If autoAllocateChunkSize was not passed, set it to undefined.
- Assert: ! IsNonNegativeNumber(highWaterMark) is true.
- If autoAllocateChunkSize is not undefined,
  - Assert: ! IsInteger(autoAllocateChunkSize) is true.
  - Assert: autoAllocateChunkSize is positive.
- Let stream be ObjectCreate(the original value of ReadableStream's prototype property).
- Perform ! InitializeReadableStream(stream).
- Let controller be ObjectCreate(the original value of ReadableByteStreamController's prototype property).
- Perform ? SetUpReadableByteStreamController(stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, autoAllocateChunkSize).
- Return stream.
3.4.5. InitializeReadableStream ( stream ) nothrow
- Set stream.[[state]] to "readable".
- Set stream.[[reader]] and stream.[[storedError]] to undefined.
- Set stream.[[disturbed]] to false.
3.4.6. IsReadableStream ( x ) nothrow
- If Type(x) is not Object, return false.
- If x does not have a [[readableStreamController]] internal slot, return false.
- Return true.
3.4.7. IsReadableStreamDisturbed ( stream ) nothrow
This abstract operation is meant to be called from other specifications that may wish to query whether or not a readable stream has ever been read from or canceled.
- Assert: ! IsReadableStream(stream) is true.
- Return stream.[[disturbed]].
3.4.8. IsReadableStreamLocked ( stream ) nothrow
This abstract operation is meant to be called from other specifications that may wish to query whether or not a readable stream is locked to a reader.
- Assert: ! IsReadableStream(stream) is true.
- If stream.[[reader]] is undefined, return false.
- Return true.
3.4.9. IsReadableStreamAsyncIterator ( x ) nothrow
- If Type(x) is not Object, return false.
- If x does not have a [[asyncIteratorReader]] internal slot, return false.
- Return true.
3.4.10. ReadableStreamTee ( stream, cloneForBranch2 ) throws
This abstract operation is meant to be called from other specifications that may wish to tee a given readable stream.
The second argument, cloneForBranch2, governs whether or not the data from the original stream will be cloned (using HTML’s serializable objects framework) before appearing in the second of the returned branches. This is useful for scenarios where both branches are to be consumed in such a way that they might otherwise interfere with each other, such as by transferring their chunks. However, it does introduce a noticeable asymmetry between the two branches, and limits the possible chunks to serializable ones. [HTML]
In this standard, ReadableStreamTee is always called with cloneForBranch2 set to false (which is what the tee() method passes); other specifications may pass true.
- Assert: ! IsReadableStream(stream) is true.
- Assert: Type(cloneForBranch2) is Boolean.
- Let reader be ? AcquireReadableStreamDefaultReader(stream).
- Let closedOrErrored be false.
- Let canceled1 be false.
- Let canceled2 be false.
- Let reason1 be undefined.
- Let reason2 be undefined.
- Let branch1 be undefined.
- Let branch2 be undefined.
- Let cancelPromise be a new promise.
- Let pullAlgorithm be the following steps:
  - Return the result of transforming ! ReadableStreamDefaultReaderRead(reader) with a fulfillment handler which takes the argument result and performs the following steps:
    - Assert: Type(result) is Object.
    - Let value be ? Get(result, "value").
    - Let done be ? Get(result, "done").
    - Assert: Type(done) is Boolean.
    - If done is true and closedOrErrored is false,
      - If canceled1 is false,
        - Perform ! ReadableStreamDefaultControllerClose(branch1.[[readableStreamController]]).
      - If canceled2 is false,
        - Perform ! ReadableStreamDefaultControllerClose(branch2.[[readableStreamController]]).
      - Set closedOrErrored to true.
    - If closedOrErrored is true, return.
    - Let value1 and value2 be value.
    - If canceled2 is false and cloneForBranch2 is true, set value2 to ? StructuredDeserialize(? StructuredSerialize(value2), the current Realm Record).
    - If canceled1 is false, perform ? ReadableStreamDefaultControllerEnqueue(branch1.[[readableStreamController]], value1).
    - If canceled2 is false, perform ? ReadableStreamDefaultControllerEnqueue(branch2.[[readableStreamController]], value2).
- Let cancel1Algorithm be the following steps, taking a reason argument:
  - Set canceled1 to true.
  - Set reason1 to reason.
  - If canceled2 is true,
    - Let compositeReason be ! CreateArrayFromList(« reason1, reason2 »).
    - Let cancelResult be ! ReadableStreamCancel(stream, compositeReason).
    - Resolve cancelPromise with cancelResult.
  - Return cancelPromise.
- Let cancel2Algorithm be the following steps, taking a reason argument:
  - Set canceled2 to true.
  - Set reason2 to reason.
  - If canceled1 is true,
    - Let compositeReason be ! CreateArrayFromList(« reason1, reason2 »).
    - Let cancelResult be ! ReadableStreamCancel(stream, compositeReason).
    - Resolve cancelPromise with cancelResult.
  - Return cancelPromise.
- Let startAlgorithm be an algorithm that returns undefined.
- Set branch1 to ! CreateReadableStream(startAlgorithm, pullAlgorithm, cancel1Algorithm).
- Set branch2 to ! CreateReadableStream(startAlgorithm, pullAlgorithm, cancel2Algorithm).
- Upon rejection of reader.[[closedPromise]] with reason r,
  - If closedOrErrored is false, then:
    - Perform ! ReadableStreamDefaultControllerError(branch1.[[readableStreamController]], r).
    - Perform ! ReadableStreamDefaultControllerError(branch2.[[readableStreamController]], r).
    - Set closedOrErrored to true.
- Return « branch1, branch2 ».
3.4.11. ReadableStreamPipeTo ( source, dest, preventClose, preventAbort, preventCancel, signal ) nothrow
- Assert: ! IsReadableStream(source) is true.
- Assert: ! IsWritableStream(dest) is true.
- Assert: Type(preventClose) is Boolean, Type(preventAbort) is Boolean, and Type(preventCancel) is Boolean.
- Assert: signal is undefined or signal is an instance of the AbortSignal interface.
- Assert: ! IsReadableStreamLocked(source) is false.
- Assert: ! IsWritableStreamLocked(dest) is false.
- If ! IsReadableByteStreamController(source.[[readableStreamController]]) is true, let reader be either ! AcquireReadableStreamBYOBReader(source) or ! AcquireReadableStreamDefaultReader(source), at the user agent’s discretion.
- Otherwise, let reader be ! AcquireReadableStreamDefaultReader(source).
- Let writer be ! AcquireWritableStreamDefaultWriter(dest).
- Let shuttingDown be false.
- Let promise be a new promise.
- If signal is not undefined,
  - Let abortAlgorithm be the following steps:
    - Let error be a new "AbortError" DOMException.
    - Let actions be an empty ordered set.
    - If preventAbort is false, append the following action to actions:
      - If dest.[[state]] is "writable", return ! WritableStreamAbort(dest, error).
      - Otherwise, return a promise resolved with undefined.
    - If preventCancel is false, append the following action to actions:
      - If source.[[state]] is "readable", return ! ReadableStreamCancel(source, error).
      - Otherwise, return a promise resolved with undefined.
    - Shutdown with an action consisting of waiting for all of the actions in actions, and with error.
  - If signal’s aborted flag is set, perform abortAlgorithm and return promise.
  - Add abortAlgorithm to signal.
- In parallel but not really; see #905, using reader and writer, read all
chunks from source and write them to dest. Due to the locking provided by the reader and writer, the exact
manner in which this happens is not observable to author code, and so there is flexibility in how this is done. The
following constraints apply regardless of the exact algorithm used:
- Public API must not be used: while reading or writing, or performing any of the operations below, the JavaScript-modifiable reader, writer, and stream APIs (i.e. methods on the appropriate prototypes) must not be used. Instead, the streams must be manipulated directly.
- Backpressure must be enforced:
  - While WritableStreamDefaultWriterGetDesiredSize(writer) is ≤ 0 or is null, the user agent must not read from reader.
  - If reader is a BYOB reader, WritableStreamDefaultWriterGetDesiredSize(writer) should be used to determine the size of the chunks read from reader.
  - Reads or writes should not be delayed for reasons other than these backpressure signals.
    An implementation that waits for each write to successfully complete before proceeding to the next read/write operation violates this recommendation. In doing so, such an implementation makes the internal queue of dest useless, as it ensures dest always contains at most one queued chunk.
- Shutdown must stop activity: if shuttingDown becomes true, the user agent must not initiate further reads from reader, and must only perform writes of already-read chunks, as described below. In particular, the user agent must check the below conditions before performing any reads or writes, since they might lead to immediate shutdown.
- Error and close states must be propagated: the following conditions must be applied in order.
  - Errors must be propagated forward: if source.[[state]] is or becomes "errored", then
    - If preventAbort is false, shutdown with an action of ! WritableStreamAbort(dest, source.[[storedError]]) and with source.[[storedError]].
    - Otherwise, shutdown with source.[[storedError]].
  - Errors must be propagated backward: if dest.[[state]] is or becomes "errored", then
    - If preventCancel is false, shutdown with an action of ! ReadableStreamCancel(source, dest.[[storedError]]) and with dest.[[storedError]].
    - Otherwise, shutdown with dest.[[storedError]].
  - Closing must be propagated forward: if source.[[state]] is or becomes "closed", then
    - If preventClose is false, shutdown with an action of ! WritableStreamDefaultWriterCloseWithErrorPropagation(writer).
    - Otherwise, shutdown.
  - Closing must be propagated backward: if ! WritableStreamCloseQueuedOrInFlight(dest) is true or dest.[[state]] is "closed", then
    - Assert: no chunks have been read or written.
    - Let destClosed be a new TypeError.
    - If preventCancel is false, shutdown with an action of ! ReadableStreamCancel(source, destClosed) and with destClosed.
    - Otherwise, shutdown with destClosed.
- Shutdown with an action: if any of the above requirements ask to shutdown with an action action, optionally with an error originalError, then:
  - If shuttingDown is true, abort these substeps.
  - Set shuttingDown to true.
  - If dest.[[state]] is "writable" and ! WritableStreamCloseQueuedOrInFlight(dest) is false,
    - If any chunks have been read but not yet written, write them to dest.
    - Wait until every chunk that has been read has been written (i.e. the corresponding promises have settled).
  - Let p be the result of performing action.
  - Upon fulfillment of p, finalize, passing along originalError if it was given.
  - Upon rejection of p with reason newError, finalize with newError.
- Shutdown: if any of the above requirements or steps ask to shutdown, optionally with an error error, then:
  - If shuttingDown is true, abort these substeps.
  - Set shuttingDown to true.
  - If dest.[[state]] is "writable" and ! WritableStreamCloseQueuedOrInFlight(dest) is false,
    - If any chunks have been read but not yet written, write them to dest.
    - Wait until every chunk that has been read has been written (i.e. the corresponding promises have settled).
  - Finalize, passing along error if it was given.
- Finalize: both forms of shutdown will eventually ask to finalize, optionally with an error error, which means to perform the following steps:
  - Perform ! WritableStreamDefaultWriterRelease(writer).
  - Perform ! ReadableStreamReaderGenericRelease(reader).
  - If signal is not undefined, remove abortAlgorithm from signal.
  - If error was given, reject promise with error.
  - Otherwise, resolve promise with undefined.
- Return promise.
Various abstract operations performed here include object creation (often of promises), which usually would require specifying a realm for the created objects. However, because these objects are not exposed to author code, the realm used does not matter.
3.5. The interface between readable streams and controllers
In terms of specification factoring, the way that the ReadableStream
class encapsulates the behavior of
both simple readable streams and readable byte streams into a single class is by centralizing most of the
potentially-varying logic inside the two controller classes, ReadableStreamDefaultController
and ReadableByteStreamController
. Those classes define most of the stateful internal slots and abstract
operations for how a stream’s internal queue is managed and how it interfaces with its underlying source or underlying byte source.
Each controller class defines two internal methods, which are called by the ReadableStream
algorithms:
- [[CancelSteps]](reason)
- The controller’s steps that run in reaction to the stream being canceled, used to clean up the state stored in the controller and inform the underlying source.
- [[PullSteps]]()
- The controller’s steps that run when a default reader is read from, used to pull from the controller any queued chunks, or pull from the underlying source to get more chunks.
(These are defined as internal methods, instead of as abstract operations, so that they can be called polymorphically by
the ReadableStream
algorithms, without having to branch on which type of controller is present.)
The rest of this section concerns abstract operations that go in the other direction: they are used by the controller
implementations to affect their associated ReadableStream
object. This translates internal state changes of the
controller into developer-facing results visible through the ReadableStream
's public API.
3.5.1. ReadableStreamAddReadIntoRequest ( stream, forAuthorCode ) nothrow
- Assert: ! IsReadableStreamBYOBReader(stream.[[reader]]) is true.
- Assert: stream.[[state]] is "readable" or "closed".
- Let promise be a new promise.
- Let readIntoRequest be Record {[[promise]]: promise, [[forAuthorCode]]: forAuthorCode}.
- Append readIntoRequest as the last element of stream.[[reader]].[[readIntoRequests]].
- Return promise.
3.5.2. ReadableStreamAddReadRequest ( stream, forAuthorCode ) nothrow
- Assert: ! IsReadableStreamDefaultReader(stream.[[reader]]) is true.
- Assert: stream.[[state]] is "readable".
- Let promise be a new promise.
- Let readRequest be Record {[[promise]]: promise, [[forAuthorCode]]: forAuthorCode}.
- Append readRequest as the last element of stream.[[reader]].[[readRequests]].
- Return promise.
3.5.3. ReadableStreamCancel ( stream, reason ) nothrow
- Set stream.[[disturbed]] to true.
- If stream.[[state]] is "closed", return a promise resolved with undefined.
- If stream.[[state]] is "errored", return a promise rejected with stream.[[storedError]].
- Perform ! ReadableStreamClose(stream).
- Let sourceCancelPromise be ! stream.[[readableStreamController]].[[CancelSteps]](reason).
- Return the result of transforming sourceCancelPromise with a fulfillment handler that returns undefined.
3.5.4. ReadableStreamClose ( stream ) nothrow
- Assert: stream.[[state]] is "readable".
- Set stream.[[state]] to "closed".
- Let reader be stream.[[reader]].
- If reader is undefined, return.
- If ! IsReadableStreamDefaultReader(reader) is true,
  - Repeat for each readRequest that is an element of reader.[[readRequests]],
    - Resolve readRequest.[[promise]] with ! ReadableStreamCreateReadResult(undefined, true, readRequest.[[forAuthorCode]]).
  - Set reader.[[readRequests]] to an empty List.
- Resolve reader.[[closedPromise]] with undefined.
"closed"
, but stream.[[closeRequested]] is cancel(reason)
. In this case we allow the
controller’s close
method to be called and silently do nothing, since the cancelation was outside the
control of the underlying source. 3.5.5. ReadableStreamCreateReadResult ( value, done, forAuthorCode ) nothrow
When forAuthorCode is true, this abstract operation gives the same result as CreateIterResultObject(value, done). This provides the expected semantics when the object is to be returned from the defaultReader.read() or byobReader.read() methods.
However, resolving promises with such objects will unavoidably result in an access to Object.prototype.then. For internal use, particularly in pipeTo() and in other specifications, it is important that reads not be observable by author code, even if that author code has tampered with Object.prototype. For this reason, a false value of forAuthorCode results in an object with a null prototype, keeping such internal read results unobservable.
The underlying issue here is that reading from streams always uses promises for { value, done } objects, even in specifications. Although it is conceivable we could rephrase all of the internal algorithms to not use promises and not use JavaScript objects, and instead only package up the results into promise-for-{ value, done } when a read() method is called, this would be a large undertaking, which we have not done. See whatwg/infra#181 for more background on this subject.
- Let prototype be null.
- If forAuthorCode is true, set prototype to %ObjectPrototype%.
- Assert: Type(done) is Boolean.
- Let obj be ObjectCreate(prototype).
- Perform CreateDataProperty(obj, "value", value).
- Perform CreateDataProperty(obj, "done", done).
- Return obj.
3.5.6. ReadableStreamError ( stream, e ) nothrow
- Assert: ! IsReadableStream(stream) is true.
- Assert: stream.[[state]] is "readable".
- Set stream.[[state]] to "errored".
- Set stream.[[storedError]] to e.
- Let reader be stream.[[reader]].
- If reader is undefined, return.
- If ! IsReadableStreamDefaultReader(reader) is true,
  - Repeat for each readRequest that is an element of reader.[[readRequests]],
    - Reject readRequest.[[promise]] with e.
  - Set reader.[[readRequests]] to a new empty List.
- Otherwise,
  - Assert: ! IsReadableStreamBYOBReader(reader).
  - Repeat for each readIntoRequest that is an element of reader.[[readIntoRequests]],
    - Reject readIntoRequest.[[promise]] with e.
  - Set reader.[[readIntoRequests]] to a new empty List.
- Reject reader.[[closedPromise]] with e.
- Set reader.[[closedPromise]].[[PromiseIsHandled]] to true.
3.5.7. ReadableStreamFulfillReadIntoRequest ( stream, chunk, done ) nothrow
- Let reader be stream.[[reader]].
- Let readIntoRequest be the first element of reader.[[readIntoRequests]].
- Remove readIntoRequest from reader.[[readIntoRequests]], shifting all other elements downward (so that the second becomes the first, and so on).
- Resolve readIntoRequest.[[promise]] with !
ReadableStreamCreateReadResult (chunk, done, readIntoRequest.[[forAuthorCode]]).
3.5.8. ReadableStreamFulfillReadRequest ( stream, chunk, done ) nothrow
- Let reader be stream.[[reader]].
- Let readRequest be the first element of reader.[[readRequests]].
- Remove readRequest from reader.[[readRequests]], shifting all other elements downward (so that the second becomes the first, and so on).
- Resolve readRequest.[[promise]] with !
ReadableStreamCreateReadResult (chunk, done, readRequest.[[forAuthorCode]]).
3.5.9. ReadableStreamGetNumReadIntoRequests ( stream ) nothrow
- Return the number of elements in stream.[[reader]].[[readIntoRequests]].
3.5.10. ReadableStreamGetNumReadRequests ( stream ) nothrow
- Return the number of elements in stream.[[reader]].[[readRequests]].
3.5.11. ReadableStreamHasBYOBReader ( stream ) nothrow
- Let reader be stream.[[reader]].
- If reader is undefined, return false.
- If ! IsReadableStreamBYOBReader(reader) is false, return false.
- Return true.
3.5.12. ReadableStreamHasDefaultReader ( stream ) nothrow
- Let reader be stream.[[reader]].
- If reader is undefined, return false.
- If ! IsReadableStreamDefaultReader(reader) is false, return false.
- Return true.
3.6. Class ReadableStreamDefaultReader
The ReadableStreamDefaultReader
class represents a default reader designed to be vended by a ReadableStream
instance.
3.6.1. Class definition
This section is non-normative.
If one were to write the ReadableStreamDefaultReader
class in something close to the syntax of [ECMASCRIPT], it
would look like
class ReadableStreamDefaultReader {
  constructor(stream)
  get closed()
  cancel(reason)
  read()
  releaseLock()
}
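A non-normative usage sketch: acquiring a default reader and reading the stream to completion. The readAllChunks helper name is illustrative, not part of this standard.

async function readAllChunks(readableStream) {
  const reader = readableStream.getReader(); // vends a ReadableStreamDefaultReader
  const chunks = [];
  while (true) {
    const { value, done } = await reader.read();
    if (done) {
      break; // the stream is closed and its queue is drained
    }
    chunks.push(value);
  }
  reader.releaseLock(); // safe: no read() promise is pending at this point
  return chunks;
}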
3.6.2. Internal slots
Instances of ReadableStreamDefaultReader
are created with the internal slots described in the following table:
Internal Slot | Description (non-normative) |
---|---|
[[closedPromise]] | A promise returned by the reader’s closed getter |
[[ownerReadableStream]] | A ReadableStream instance that owns this reader |
[[readRequests]] | A List of promises returned by the reader’s read() method that have not yet been resolved, due to the consumer requesting chunks sooner than they are available; also used for the IsReadableStreamDefaultReader brand check |
3.6.3. new ReadableStreamDefaultReader(stream)
ReadableStreamDefaultReader
constructor is generally not meant to be used directly; instead, a
stream’s getReader()
method ought to be used. - If !
IsReadableStream (stream) isfalse , throw aTypeError exception. - If !
IsReadableStreamLocked (stream) istrue , throw aTypeError exception. - Perform !
ReadableStreamReaderGenericInitialize (this , stream). - Set
this .[[readRequests]] to a new emptyList .
3.6.4. Properties of the ReadableStreamDefaultReader
prototype
3.6.4.1. get closed
closed
getter returns a promise that will be fulfilled when the stream becomes closed, or rejected if
the stream ever errors or the reader’s lock is released before the stream finishes
closing. - If !
IsReadableStreamDefaultReader (this ) isfalse , return a promise rejected with aTypeError exception. - Return
this .[[closedPromise]].
3.6.4.2. cancel(reason)
cancel
method behaves the same as that for the
associated stream. - If !
IsReadableStreamDefaultReader (this ) isfalse , return a promise rejected with aTypeError exception. - If
this .[[ownerReadableStream]] isundefined , return a promise rejected with aTypeError exception. - Return !
ReadableStreamReaderGenericCancel (this , reason).
3.6.4.3. read()
read
method will return a promise that allows access to the next chunk from the stream’s
internal queue, if available.
- If the chunk does become available, the promise will be fulfilled with an object of the form
{ value: theChunk, done: false }
. - If the stream becomes closed, the promise will be fulfilled with an object of the form
{ value: undefined, done: true }
. - If the stream becomes errored, the promise will be rejected with the relevant error.
If reading a chunk causes the queue to become empty, more data will be pulled from the underlying source.
- If !
IsReadableStreamDefaultReader (this ) isfalse , return a promise rejected with aTypeError exception. - If
this .[[ownerReadableStream]] isundefined , return a promise rejected with aTypeError exception. - Return !
ReadableStreamDefaultReaderRead (this ,true ).
3.6.4.4. releaseLock()
releaseLock
method releases the reader’s lock on the corresponding
stream. After the lock is released, the reader is no longer active. If the associated
stream is errored when the lock is released, the reader will appear errored in the same way from now on; otherwise,
the reader will appear closed.
A reader’s lock cannot be released while it still has a pending read request, i.e., if a promise returned by the reader’s read() method has not yet been settled. Attempting to do so will throw a TypeError and leave the reader locked to the stream.
- If !
IsReadableStreamDefaultReader (this ) isfalse , throw aTypeError exception. - If
this .[[ownerReadableStream]] isundefined , return. - If
this .[[readRequests]] is not empty, throw aTypeError exception. - Perform !
ReadableStreamReaderGenericRelease (this ).
3.7. Class ReadableStreamBYOBReader
The ReadableStreamBYOBReader
class represents a BYOB reader designed to be vended by a ReadableStream
instance.
3.7.1. Class definition
This section is non-normative.
If one were to write the ReadableStreamBYOBReader
class in something close to the syntax of [ECMASCRIPT], it
would look like
class ReadableStreamBYOBReader {
  constructor(stream)
  get closed()
  cancel(reason)
  read(view)
  releaseLock()
}
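A non-normative usage sketch: filling a developer-supplied ArrayBuffer through a BYOB reader. The readInto helper name and the 1024-byte buffer size are illustrative assumptions.

async function readInto(readableByteStream) {
  const reader = readableByteStream.getReader({ mode: "byob" });
  let buffer = new ArrayBuffer(1024);
  let bytesRead = 0;
  while (bytesRead < buffer.byteLength) {
    const { value, done } = await reader.read(new Uint8Array(buffer, bytesRead));
    if (value !== undefined) {
      buffer = value.buffer;         // the previous buffer was transferred; adopt the new one
      bytesRead += value.byteLength;
    }
    if (done) {
      break;                         // the stream closed before the buffer was filled
    }
  }
  reader.releaseLock();
  return new Uint8Array(buffer, 0, bytesRead);
}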
3.7.2. Internal slots
Instances of ReadableStreamBYOBReader
are created with the internal slots described in the following table:
Internal Slot | Description (non-normative) |
---|---|
[[closedPromise]] | A promise returned by the reader’s closed getter |
[[ownerReadableStream]] | A ReadableStream instance that owns this reader |
[[readIntoRequests]] | A List of promises returned by the reader’s read(view) method that have not yet been resolved, due to the consumer requesting chunks sooner than they are available; also used for the IsReadableStreamBYOBReader brand check |
3.7.3. new ReadableStreamBYOBReader(stream)
ReadableStreamBYOBReader
constructor is generally not meant to be used directly; instead, a stream’s getReader()
method ought to be used. - If !
IsReadableStream (stream) isfalse , throw aTypeError exception. - If !
IsReadableByteStreamController (stream.[[readableStreamController]]) isfalse , throw aTypeError exception. - If !
IsReadableStreamLocked (stream) istrue , throw aTypeError exception. - Perform !
ReadableStreamReaderGenericInitialize (this , stream). - Set
this .[[readIntoRequests]] to a new emptyList .
3.7.4. Properties of the ReadableStreamBYOBReader
prototype
3.7.4.1. get closed
closed
getter returns a promise that will be fulfilled when the stream becomes closed, or rejected if
the stream ever errors or the reader’s lock is released before the stream finishes
closing. - If !
IsReadableStreamBYOBReader (this ) isfalse , return a promise rejected with aTypeError exception. - Return
this .[[closedPromise]].
3.7.4.2. cancel(reason)
cancel
method behaves the same as that for the
associated stream. - If !
IsReadableStreamBYOBReader (this ) isfalse , return a promise rejected with aTypeError exception. - If
this .[[ownerReadableStream]] isundefined , return a promise rejected with aTypeError exception. - Return !
ReadableStreamReaderGenericCancel (this , reason).
3.7.4.3. read(view)
read
method will write read bytes into view
and return a promise resolved with a
possibly transferred buffer as described below.
- If the chunk does become available, the promise will be fulfilled with an object of the form
{ value: theChunk, done: false }
. - If the stream becomes closed, the promise will be fulfilled with an object of the form
{ value: undefined, done: true }
. - If the stream becomes errored, the promise will be rejected with the relevant error.
If reading a chunk causes the queue to become empty, more data will be pulled from the underlying byte source.
- If !
IsReadableStreamBYOBReader (this ) isfalse , return a promise rejected with aTypeError exception. - If
this .[[ownerReadableStream]] isundefined , return a promise rejected with aTypeError exception. - If
Type (view) is not Object, return a promise rejected with aTypeError exception. - If view does not have a [[ViewedArrayBuffer]] internal slot, return a promise rejected with a
TypeError exception. - If !
IsDetachedBuffer (view.[[ViewedArrayBuffer]]) istrue , return a promise rejected with aTypeError exception. - If view.[[ByteLength]] is
0 , return a promise rejected with aTypeError exception. - Return !
ReadableStreamBYOBReaderRead (this , view,true ).
3.7.4.4. releaseLock()
releaseLock
method releases the reader’s lock on the corresponding
stream. After the lock is released, the reader is no longer active. If the associated
stream is errored when the lock is released, the reader will appear errored in the same way from now on; otherwise,
the reader will appear closed.
A reader’s lock cannot be released while it still has a pending read request, i.e., if a promise returned by the reader’s read(view) method has not yet been settled. Attempting to do so will throw a TypeError and leave the reader locked to the stream.
- If !
IsReadableStreamBYOBReader (this ) isfalse , throw aTypeError exception. - If
this .[[ownerReadableStream]] isundefined , return. - If
this .[[readIntoRequests]] is not empty, throw aTypeError exception. - Perform !
ReadableStreamReaderGenericRelease (this ).
3.8. Readable stream reader abstract operations
3.8.1. IsReadableStreamDefaultReader ( x ) nothrow
- If
Type (x) is not Object, returnfalse . - If x does not have a [[readRequests]] internal slot, return
false . - Return
true .
3.8.2. IsReadableStreamBYOBReader ( x ) nothrow
- If
Type (x) is not Object, returnfalse . - If x does not have a [[readIntoRequests]] internal slot, return
false . - Return
true .
3.8.3. ReadableStreamReaderGenericCancel ( reader, reason ) nothrow
- Let stream be reader.[[ownerReadableStream]].
- Assert: stream is not
undefined . - Return !
ReadableStreamCancel (stream, reason).
3.8.4. ReadableStreamReaderGenericInitialize ( reader, stream ) nothrow
- Set reader.[[ownerReadableStream]] to stream.
- Set stream.[[reader]] to reader.
- If stream.[[state]] is
"readable"
,- Set reader.[[closedPromise]] to a new promise.
- Otherwise, if stream.[[state]] is
"closed"
,- Set reader.[[closedPromise]] to a promise resolved with
undefined .
- Set reader.[[closedPromise]] to a promise resolved with
- Otherwise,
- Assert: stream.[[state]] is
"errored"
. - Set reader.[[closedPromise]] to a promise rejected with stream.[[storedError]].
- Set reader.[[closedPromise]].[[PromiseIsHandled]] to
true .
- Assert: stream.[[state]] is
3.8.5. ReadableStreamReaderGenericRelease ( reader ) nothrow
- Assert: reader.[[ownerReadableStream]] is not
undefined . - Assert: reader.[[ownerReadableStream]].[[reader]] is reader.
- If reader.[[ownerReadableStream]].[[state]] is
"readable"
, reject reader.[[closedPromise]] with aTypeError exception. - Otherwise, set reader.[[closedPromise]] to a promise rejected with a
TypeError exception. - Set reader.[[closedPromise]].[[PromiseIsHandled]] to
true . - Set reader.[[ownerReadableStream]].[[reader]] to
undefined . - Set reader.[[ownerReadableStream]] to
undefined .
3.8.6. ReadableStreamBYOBReaderRead ( reader, view [, forAuthorCode ] ) nothrow
- If forAuthorCode was not passed, set it to
false . - Let stream be reader.[[ownerReadableStream]].
- Assert: stream is not
undefined . - Set stream.[[disturbed]] to
true . - If stream.[[state]] is
"errored"
, return a promise rejected with stream.[[storedError]]. - Return !
ReadableByteStreamControllerPullInto (stream.[[readableStreamController]], view, forAuthorCode).
3.8.7. ReadableStreamDefaultReaderRead ( reader [, forAuthorCode ] ) nothrow
Other specifications ought to leave forAuthorCode as its default value of false, unless they are planning to directly expose the resulting { value, done } object to authors. See the note regarding ReadableStreamCreateReadResult for more information.
- If forAuthorCode was not passed, set it to
false . - Let stream be reader.[[ownerReadableStream]].
- Assert: stream is not
undefined . - Set stream.[[disturbed]] to
true . - If stream.[[state]] is
"closed"
, return a promise resolved with !ReadableStreamCreateReadResult (undefined ,true , forAuthorCode). - If stream.[[state]] is
"errored"
, return a promise rejected with stream.[[storedError]]. - Assert: stream.[[state]] is
"readable"
. - Return ! stream.[[readableStreamController]].[[PullSteps]](forAuthorCode).
3.9. Class ReadableStreamDefaultController
The ReadableStreamDefaultController
class has methods that allow control of a ReadableStream
's state and internal queue. When constructing a ReadableStream
that is not a readable byte stream, the underlying source is given a corresponding ReadableStreamDefaultController
instance to manipulate.
3.9.1. Class definition
This section is non-normative.
If one were to write the ReadableStreamDefaultController
class in something close to the syntax of [ECMASCRIPT],
it would look like
class ReadableStreamDefaultController {
  constructor() // always throws
  get desiredSize()
  close()
  enqueue(chunk)
  error(e)
}
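A non-normative sketch of how an underlying source receives and uses such a controller; the interval-based source below is purely illustrative.

const stream = new ReadableStream({
  start(controller) {
    let count = 0;
    const id = setInterval(() => {
      controller.enqueue(`chunk ${++count}`); // add a chunk to the internal queue
      if (count === 5) {
        clearInterval(id);
        controller.close(); // no more chunks after the queued ones are read
      }
    }, 1000);
  }
});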
3.9.2. Internal slots
Instances of ReadableStreamDefaultController
are created with the internal slots described in the following table:
Internal Slot | Description (non-normative) |
---|---|
[[cancelAlgorithm]] | A promise-returning algorithm, taking one argument (the cancel reason), which communicates a requested cancelation to the underlying source |
[[closeRequested]] | A boolean flag indicating whether the stream has been closed by its underlying source, but still has chunks in its internal queue that have not yet been read |
[[controlledReadableStream]] | The ReadableStream instance controlled |
[[pullAgain]] | A boolean flag set to true if the stream’s mechanisms requested a call to the underlying source’s pull() method to pull more data, but the pull could not yet be done since a previous call is still executing |
[[pullAlgorithm]] | A promise-returning algorithm that pulls data from the underlying source |
[[pulling]] | A boolean flag set to true while the underlying source’s pull() method is executing and has not yet fulfilled, used to prevent reentrant calls |
[[queue]] | A List representing the stream’s internal queue of chunks |
[[queueTotalSize]] | The total size of all the chunks stored in [[queue]] (see §6.2 Queue-with-sizes operations) |
[[started]] | A boolean flag indicating whether the underlying source has finished starting |
[[strategyHWM]] | A number supplied to the constructor as part of the stream’s queuing strategy, indicating the point at which the stream will apply backpressure to its underlying source |
[[strategySizeAlgorithm]] | An algorithm to calculate the size of enqueued chunks, as part of the stream’s queuing strategy |
3.9.3. new ReadableStreamDefaultController()
ReadableStreamDefaultController
constructor cannot be used directly; ReadableStreamDefaultController
instances are created automatically during ReadableStream
construction. - Throw a
TypeError .
3.9.4. Properties of the ReadableStreamDefaultController
prototype
3.9.4.1. get desiredSize
desiredSize
getter returns the desired size
to fill the controlled stream’s internal queue. It can be negative, if the queue is over-full. An underlying
source ought to use this information to determine when and how to apply backpressure. - If !
IsReadableStreamDefaultController (this ) isfalse , throw aTypeError exception. - Return !
ReadableStreamDefaultControllerGetDesiredSize (this ).
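For example, an underlying source wrapping a hypothetical push source (socket below, with illustrative pause() and resume() methods) might consult desiredSize like this:

const stream = new ReadableStream({
  start(controller) {
    socket.ondata = chunk => {
      controller.enqueue(chunk);
      if (controller.desiredSize <= 0) {
        socket.pause(); // the queue is at or above the high water mark; stop pushing
      }
    };
  },
  pull() {
    socket.resume(); // the consumer has caught up; resume the flow of data
  }
});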
3.9.4.2. close()
close
method will close the controlled readable stream. Consumers will still be able to read
any previously-enqueued chunks from the stream, but once those are read, the stream will become closed. - If !
IsReadableStreamDefaultController (this ) isfalse , throw aTypeError exception. - If !
ReadableStreamDefaultControllerCanCloseOrEnqueue (this ) isfalse , throw aTypeError exception. - Perform !
ReadableStreamDefaultControllerClose (this ).
3.9.4.3. enqueue(chunk)
enqueue
method will enqueue a given chunk in the controlled readable stream. - If !
IsReadableStreamDefaultController (this ) isfalse , throw aTypeError exception. - If !
ReadableStreamDefaultControllerCanCloseOrEnqueue (this ) isfalse , throw aTypeError exception. - Return ?
ReadableStreamDefaultControllerEnqueue (this , chunk).
3.9.4.4. error(e)
error
method will error the readable stream, making all future interactions with it fail with the
given error e
. - If !
IsReadableStreamDefaultController (this ) isfalse , throw aTypeError exception. - Perform !
ReadableStreamDefaultControllerError (this , e).
3.9.5. Readable stream default controller internal methods
The following are additional internal methods implemented by each ReadableStreamDefaultController
instance. The
readable stream implementation will polymorphically call to either these or their counterparts for BYOB controllers.
3.9.5.1. [[CancelSteps]](reason)
- Perform !
ResetQueue (this ). - Let result be the result of performing
this .[[cancelAlgorithm]], passing reason. - Perform !
ReadableStreamDefaultControllerClearAlgorithms (this ). - Return result.
3.9.5.2. [[PullSteps]]( forAuthorCode )
- Let stream be this.[[controlledReadableStream]].
- If this.[[queue]] is not empty,
  - Let chunk be ! DequeueValue(this).
  - If this.[[closeRequested]] is true and this.[[queue]] is empty,
    - Perform ! ReadableStreamDefaultControllerClearAlgorithms(this).
    - Perform ! ReadableStreamClose(stream).
  - Otherwise, perform ! ReadableStreamDefaultControllerCallPullIfNeeded(this).
  - Return a promise resolved with ! ReadableStreamCreateReadResult(chunk, false, forAuthorCode).
- Let pendingPromise be ! ReadableStreamAddReadRequest(stream, forAuthorCode).
- Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(this).
- Return pendingPromise.
3.10. Readable stream default controller abstract operations
3.10.1. IsReadableStreamDefaultController ( x ) nothrow
- If
Type (x) is not Object, returnfalse . - If x does not have a [[controlledReadableStream]] internal slot, return
false . - Return
true .
3.10.2. ReadableStreamDefaultControllerCallPullIfNeeded ( controller ) nothrow
- Let shouldPull be ! ReadableStreamDefaultControllerShouldCallPull(controller).
- If shouldPull is false, return.
- If controller.[[pulling]] is true,
  - Set controller.[[pullAgain]] to true.
  - Return.
- Assert: controller.[[pullAgain]] is false.
- Set controller.[[pulling]] to true.
- Let pullPromise be the result of performing controller.[[pullAlgorithm]].
- Upon fulfillment of pullPromise,
  - Set controller.[[pulling]] to false.
  - If controller.[[pullAgain]] is true,
    - Set controller.[[pullAgain]] to false.
    - Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(controller).
- Upon rejection of pullPromise with reason e,
  - Perform ! ReadableStreamDefaultControllerError(controller, e).
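A non-normative JavaScript sketch of the scheduling pattern above, using plain object properties in place of internal slots; shouldCallPull and errorController are illustrative stand-ins for the corresponding abstract operations:

function callPullIfNeeded(controller) {
  if (!shouldCallPull(controller)) return;
  if (controller.pulling) {
    controller.pullAgain = true; // remember that another pull was requested
    return;
  }
  controller.pulling = true;
  controller.pullAlgorithm().then(
    () => {
      controller.pulling = false;
      if (controller.pullAgain) {
        controller.pullAgain = false;
        callPullIfNeeded(controller); // perform the pull that was deferred
      }
    },
    e => errorController(controller, e)
  );
}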
3.10.3. ReadableStreamDefaultControllerShouldCallPull ( controller ) nothrow
- Let stream be controller.[[controlledReadableStream]].
- If !
ReadableStreamDefaultControllerCanCloseOrEnqueue (controller) isfalse , returnfalse . - If controller.[[started]] is
false , returnfalse . - If !
IsReadableStreamLocked (stream) istrue and !ReadableStreamGetNumReadRequests (stream) >0 , returntrue . - Let desiredSize be !
ReadableStreamDefaultControllerGetDesiredSize (controller). - Assert: desiredSize is not
null . - If desiredSize >
0 , returntrue . - Return
false .
3.10.4. ReadableStreamDefaultControllerClearAlgorithms ( controller ) nothrow
This abstract operation is called once the stream is closed or errored and the algorithms will not be executed any more.
By removing the algorithm references it permits the underlying source object to be garbage collected even if the ReadableStream
itself is still referenced.
The results of this algorithm are not currently observable, but could become so if JavaScript eventually adds weak references. But even without that factor, implementations will likely want to include similar steps.
- Set controller.[[pullAlgorithm]] to
undefined . - Set controller.[[cancelAlgorithm]] to
undefined . - Set controller.[[strategySizeAlgorithm]] to
undefined .
3.10.5. ReadableStreamDefaultControllerClose ( controller ) nothrow
This abstract operation can be called by other specifications that wish to close a readable stream, in the same way a developer-created stream would be closed by its associated controller object. Specifications should not do this to streams they did not create, and must ensure they have obeyed the preconditions (listed here as asserts).
- Let stream be controller.[[controlledReadableStream]].
- Assert: !
ReadableStreamDefaultControllerCanCloseOrEnqueue (controller) istrue . - Set controller.[[closeRequested]] to
true . - If controller.[[queue]] is empty,
- Perform !
ReadableStreamDefaultControllerClearAlgorithms (controller). - Perform !
ReadableStreamClose (stream).
- Perform !
3.10.6. ReadableStreamDefaultControllerEnqueue ( controller, chunk ) throws
This abstract operation can be called by other specifications that wish to enqueue chunks in a readable stream, in the same way a developer would enqueue chunks using the stream’s associated controller object. Specifications should not do this to streams they did not create, and must ensure they have obeyed the preconditions (listed here as asserts).
- Let stream be controller.[[controlledReadableStream]].
- Assert: !
ReadableStreamDefaultControllerCanCloseOrEnqueue (controller) istrue . - If !
IsReadableStreamLocked (stream) istrue and !ReadableStreamGetNumReadRequests (stream) >0 , perform !ReadableStreamFulfillReadRequest (stream, chunk,false ). - Otherwise,
- Let result be the result of performing controller.[[strategySizeAlgorithm]], passing in chunk, and interpreting the result as an ECMAScript completion value.
- If result is an
abrupt completion ,- Perform !
ReadableStreamDefaultControllerError (controller, result.[[Value]]). - Return result.
- Perform !
- Let chunkSize be result.[[Value]].
- Let enqueueResult be
EnqueueValueWithSize (controller, chunk, chunkSize). - If enqueueResult is an
abrupt completion ,- Perform !
ReadableStreamDefaultControllerError (controller, enqueueResult.[[Value]]). - Return enqueueResult.
- Perform !
- Perform !
ReadableStreamDefaultControllerCallPullIfNeeded (controller).
3.10.7. ReadableStreamDefaultControllerError ( controller, e ) nothrow
This abstract operation can be called by other specifications that wish to move a readable stream to an errored state, in the same way a developer would error a stream using its associated controller object. Specifications should not do this to streams they did not create.
- Let stream be controller.[[controlledReadableStream]].
- If stream.[[state]] is not
"readable"
, return. - Perform !
ResetQueue (controller). - Perform !
ReadableStreamDefaultControllerClearAlgorithms (controller). - Perform !
ReadableStreamError (stream, e).
3.10.8. ReadableStreamDefaultControllerGetDesiredSize ( controller ) nothrow
This abstract operation can be called by other specifications that wish to determine the desired size to fill this stream’s internal queue, similar to how a developer would consult
the desiredSize
property of the stream’s associated controller object.
Specifications should not use this on streams they did not create.
- Let stream be controller.[[controlledReadableStream]].
- Let state be stream.[[state]].
- If state is
"errored"
, returnnull . - If state is
"closed"
, return0 . - Return controller.[[strategyHWM]] − controller.[[queueTotalSize]].
3.10.9. ReadableStreamDefaultControllerHasBackpressure ( controller ) nothrow
This abstract operation is used in the implementation of TransformStream.
- If !
ReadableStreamDefaultControllerShouldCallPull (controller) istrue , returnfalse . - Otherwise, return
true .
3.10.10. ReadableStreamDefaultControllerCanCloseOrEnqueue ( controller ) nothrow
- Let state be controller.[[controlledReadableStream]].[[state]].
- If controller.[[closeRequested]] is
false and state is"readable"
, returntrue . - Otherwise, return
false .
"readable"
, happens when the stream is errored via error(e)
, or
when it is closed without its controller’s close
method ever being called: e.g., if the stream was closed
by a call to cancel(reason)
. 3.10.11. SetUpReadableStreamDefaultController(stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm ) throws
- Assert: stream.[[readableStreamController]] is
undefined . - Set controller.[[controlledReadableStream]] to stream.
- Set controller.[[queue]] and controller.[[queueTotalSize]] to
undefined , then perform !ResetQueue (controller). - Set controller.[[started]], controller.[[closeRequested]], controller.[[pullAgain]], and
controller.[[pulling]] to
false . - Set controller.[[strategySizeAlgorithm]] to sizeAlgorithm and controller.[[strategyHWM]] to highWaterMark.
- Set controller.[[pullAlgorithm]] to pullAlgorithm.
- Set controller.[[cancelAlgorithm]] to cancelAlgorithm.
- Set stream.[[readableStreamController]] to controller.
- Let startResult be the result of performing startAlgorithm. (This may throw an exception.)
- Let startPromise be a promise resolved with startResult.
- Upon fulfillment of startPromise,
- Set controller.[[started]] to
true . - Assert: controller.[[pulling]] is
false . - Assert: controller.[[pullAgain]] is
false . - Perform !
ReadableStreamDefaultControllerCallPullIfNeeded (controller).
- Set controller.[[started]] to
- Upon rejection of startPromise with reason r,
- Perform !
ReadableStreamDefaultControllerError (controller, r).
- Perform !
3.10.12. SetUpReadableStreamDefaultControllerFromUnderlyingSource(stream, underlyingSource, highWaterMark, sizeAlgorithm ) throws
- Assert: underlyingSource is not
undefined . - Let controller be
ObjectCreate (the original value ofReadableStreamDefaultController
'sprototype
property). - Let startAlgorithm be the following steps:
- Return ?
InvokeOrNoop (underlyingSource,"start"
, « controller »).
- Return ?
- Let pullAlgorithm be ?
CreateAlgorithmFromUnderlyingMethod (underlyingSource,"pull"
,0 , « controller »). - Let cancelAlgorithm be ?
CreateAlgorithmFromUnderlyingMethod (underlyingSource,"cancel"
,1 , « »). - Perform ?
SetUpReadableStreamDefaultController (stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm).
3.11. Class ReadableByteStreamController
The ReadableByteStreamController
class has methods that allow control of a ReadableStream
's state and internal queue. When constructing a ReadableStream
, the underlying byte source is given a
corresponding ReadableByteStreamController
instance to manipulate.
3.11.1. Class definition
This section is non-normative.
If one were to write the ReadableByteStreamController
class in something close to the syntax of [ECMASCRIPT], it
would look like
class ReadableByteStreamController {
  constructor() // always throws
  get byobRequest()
  get desiredSize()
  close()
  enqueue(chunk)
  error(e)
}
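A non-normative usage sketch: an underlying byte source that serves the outstanding BYOB request when one exists, and falls back to enqueuing otherwise. The readInto(view) helper, which writes bytes into view and returns the count written, is an illustrative assumption.

const byteStream = new ReadableStream({
  type: "bytes",
  autoAllocateChunkSize: 1024,
  async pull(controller) {
    const request = controller.byobRequest; // undefined when no pull-into request is pending
    if (request !== undefined) {
      const bytesWritten = await readInto(request.view);
      if (bytesWritten === 0) {
        controller.close();
        request.respond(0); // settle the outstanding request with no bytes
      } else {
        request.respond(bytesWritten);
      }
    } else {
      const view = new Uint8Array(1024);
      const bytesWritten = await readInto(view);
      if (bytesWritten === 0) {
        controller.close();
      } else {
        controller.enqueue(view.subarray(0, bytesWritten));
      }
    }
  }
});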
3.11.2. Internal slots
Instances of ReadableByteStreamController
are created with the internal slots described in the following table:
Internal Slot | Description (non-normative) |
---|---|
[[autoAllocateChunkSize]] | A positive integer, when the automatic buffer allocation feature is enabled. In that case, this value specifies the size of buffer to allocate. It is undefined otherwise |
[[byobRequest]] | A ReadableStreamBYOBRequest instance representing the current BYOB pull request |
[[cancelAlgorithm]] | A promise-returning algorithm, taking one argument (the cancel reason), which communicates a requested cancelation to the underlying source |
[[closeRequested]] | A boolean flag indicating whether the stream has been closed by its underlying byte source, but still has chunks in its internal queue that have not yet been read |
[[controlledReadableByteStream]] | The ReadableStream instance controlled |
[[pullAgain]] | A boolean flag set to true if the stream’s mechanisms requested a call to the underlying byte source’s pull() method to pull more data, but the pull could not yet be done since a previous call is still executing |
[[pullAlgorithm]] | A promise-returning algorithm that pulls data from the underlying source |
[[pulling]] | A boolean flag set to true while the underlying byte source’s pull() method is executing and has not yet fulfilled, used to prevent reentrant calls |
[[pendingPullIntos]] | A List of descriptors representing pending BYOB pull requests |
[[queue]] | A List representing the stream’s internal queue of chunks |
[[queueTotalSize]] | The total size (in bytes) of all the chunks stored in [[queue]] |
[[started]] | A boolean flag indicating whether the underlying source has finished starting |
[[strategyHWM]] | A number supplied to the constructor as part of the stream’s queuing strategy, indicating the point at which the stream will apply backpressure to its underlying byte source |
Although ReadableByteStreamController
instances have [[queue]] and [[queueTotalSize]] slots, we do not use
most of the abstract operations in §6.2 Queue-with-sizes operations on them, as the way in which we manipulate this queue is
rather different than the others in the spec. Instead, we update the two slots together manually.
This might be cleaned up in a future spec refactoring.
3.11.3. new ReadableByteStreamController()
ReadableByteStreamController
constructor cannot be used directly; ReadableByteStreamController
instances are created automatically during ReadableStream
construction. - Throw a
TypeError exception.
3.11.4. Properties of the ReadableByteStreamController
prototype
3.11.4.1. get byobRequest
byobRequest
getter returns the current BYOB pull request. - If !
IsReadableByteStreamController (this ) isfalse , throw aTypeError exception. - If
this .[[byobRequest]] isundefined andthis .[[pendingPullIntos]] is not empty,- Let firstDescriptor be the first element of
this .[[pendingPullIntos]]. - Let view be !
Construct (%Uint8Array%, « firstDescriptor.[[buffer]], firstDescriptor.[[byteOffset]] + firstDescriptor.[[bytesFilled]], firstDescriptor.[[byteLength]] − firstDescriptor.[[bytesFilled]] »). - Let byobRequest be
ObjectCreate (the original value ofReadableStreamBYOBRequest
'sprototype
property). - Perform !
SetUpReadableStreamBYOBRequest (byobRequest,this , view). - Set
this .[[byobRequest]] to byobRequest.
- Let firstDescriptor be the first element of
- Return
this .[[byobRequest]].
3.11.4.2. get desiredSize
desiredSize
getter returns the desired size
to fill the controlled stream’s internal queue. It can be negative, if the queue is over-full. An underlying
source ought to use this information to determine when and how to apply backpressure. - If !
IsReadableByteStreamController (this ) isfalse , throw aTypeError exception. - Return !
ReadableByteStreamControllerGetDesiredSize (this ).
3.11.4.3. close()
close
method will close the controlled readable stream. Consumers will still be able to read
any previously-enqueued chunks from the stream, but once those are read, the stream will become closed. - If !
IsReadableByteStreamController (this ) isfalse , throw aTypeError exception. - If
this .[[closeRequested]] istrue , throw aTypeError exception. - If
this .[[controlledReadableByteStream]].[[state]] is not"readable"
, throw aTypeError exception. - Perform ?
ReadableByteStreamControllerClose (this ).
3.11.4.4. enqueue(chunk)
enqueue
method will enqueue a given chunk in the controlled readable stream. - If !
IsReadableByteStreamController (this ) isfalse , throw aTypeError exception. - If
this .[[closeRequested]] istrue , throw aTypeError exception. - If
this .[[controlledReadableByteStream]].[[state]] is not"readable"
, throw aTypeError exception. - If
Type (chunk) is not Object, throw aTypeError exception. - If chunk does not have a [[ViewedArrayBuffer]] internal slot, throw a
TypeError exception. - If !
IsDetachedBuffer (chunk.[[ViewedArrayBuffer]]) istrue , throw aTypeError exception. - Return !
ReadableByteStreamControllerEnqueue (this , chunk).
3.11.4.5. error(e)
error
method will error the readable stream, making all future interactions with it fail with the
given error e
. - If !
IsReadableByteStreamController (this ) isfalse , throw aTypeError exception. - Perform !
ReadableByteStreamControllerError (this , e).
3.11.5. Readable stream BYOB controller internal methods
The following are additional internal methods implemented by each ReadableByteStreamController
instance. The
readable stream implementation will polymorphically call to either these or their counterparts for default controllers.
3.11.5.1. [[CancelSteps]](reason)
- If
this .[[pendingPullIntos]] is not empty,- Let firstDescriptor be the first element of
this .[[pendingPullIntos]]. - Set firstDescriptor.[[bytesFilled]] to
0 .
- Let firstDescriptor be the first element of
- Perform !
ResetQueue (this ). - Let result be the result of performing
this .[[cancelAlgorithm]], passing in reason. - Perform !
ReadableByteStreamControllerClearAlgorithms (this ). - Return result.
3.11.5.2. [[PullSteps]]( forAuthorCode )
- Let stream be
this .[[controlledReadableByteStream]]. - Assert: !
ReadableStreamHasDefaultReader (stream) istrue . - If
this .[[queueTotalSize]] >0 ,- Assert: !
ReadableStreamGetNumReadRequests (stream) is0 . - Let entry be the first element of
this .[[queue]]. - Remove entry from
this .[[queue]], shifting all other elements downward (so that the second becomes the first, and so on). - Set
this .[[queueTotalSize]] tothis .[[queueTotalSize]] − entry.[[byteLength]]. - Perform !
ReadableByteStreamControllerHandleQueueDrain (this ). - Let view be !
Construct (%Uint8Array%, « entry.[[buffer]], entry.[[byteOffset]], entry.[[byteLength]] »). - Return a promise resolved with !
ReadableStreamCreateReadResult (view,false , forAuthorCode).
- Assert: !
- Let autoAllocateChunkSize be
this .[[autoAllocateChunkSize]]. - If autoAllocateChunkSize is not
undefined ,- Let buffer be
Construct (%ArrayBuffer% , « autoAllocateChunkSize »). - If buffer is an
abrupt completion , return a promise rejected with buffer.[[Value]]. - Let pullIntoDescriptor be
Record {[[buffer]]: buffer.[[Value]], [[byteOffset]]:0 , [[byteLength]]: autoAllocateChunkSize, [[bytesFilled]]:0 , [[elementSize]]:1 , [[ctor]]: %Uint8Array%, [[readerType]]:"default"
}. - Append pullIntoDescriptor as the last element of
this .[[pendingPullIntos]].
- Let buffer be
- Let promise be !
ReadableStreamAddReadRequest (stream, forAuthorCode). - Perform !
ReadableByteStreamControllerCallPullIfNeeded (this ). - Return promise.
3.12. Class ReadableStreamBYOBRequest
The ReadableStreamBYOBRequest
class represents a pull into request in a ReadableByteStreamController
.
3.12.1. Class definition
This section is non-normative.
If one were to write the ReadableStreamBYOBRequest
class in something close to the syntax of [ECMASCRIPT], it
would look like
class ReadableStreamBYOBRequest {
  constructor(controller, view)
  get view()
  respond(bytesWritten)
  respondWithNewView(view)
}
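A non-normative sketch of how an underlying byte source typically consumes such a request; servePull stands in for the source’s pull() method, and fillSync(view) is an illustrative helper that writes bytes into view and returns the count written.

function servePull(controller) {
  const request = controller.byobRequest;      // assumed non-undefined here: a pull-into is pending
  const bytesWritten = fillSync(request.view); // write directly into the consumer-supplied region
  request.respond(bytesWritten);               // commit those bytes to the pending read
}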
3.12.2. Internal slots
Instances of ReadableStreamBYOBRequest
are created with the internal slots described in the following table:
Internal Slot | Description (non-normative) |
---|---|
[[associatedReadableByteStreamController]] | The parent ReadableByteStreamController instance
|
[[view]] | A typed array representing the destination region to which the controller can write generated data |
3.12.3. new ReadableStreamBYOBRequest()
- Throw a
TypeError exception.
3.12.4. Properties of the ReadableStreamBYOBRequest
prototype
3.12.4.1. get view
- If !
IsReadableStreamBYOBRequest (this ) isfalse , throw aTypeError exception. - Return
this .[[view]].
3.12.4.2. respond(bytesWritten)
- If !
IsReadableStreamBYOBRequest (this ) isfalse , throw aTypeError exception. - If
this .[[associatedReadableByteStreamController]] isundefined , throw aTypeError exception. - If !
IsDetachedBuffer (this .[[view]].[[ViewedArrayBuffer]]) istrue , throw aTypeError exception. - Return ?
ReadableByteStreamControllerRespond (this .[[associatedReadableByteStreamController]], bytesWritten).
3.12.4.3. respondWithNewView(view)
- If !
IsReadableStreamBYOBRequest (this ) isfalse , throw aTypeError exception. - If
this .[[associatedReadableByteStreamController]] isundefined , throw aTypeError exception. - If
Type (view) is not Object, throw aTypeError exception. - If view does not have a [[ViewedArrayBuffer]] internal slot, throw a
TypeError exception. - If !
IsDetachedBuffer (view.[[ViewedArrayBuffer]]) istrue , throw aTypeError exception. - Return ?
ReadableByteStreamControllerRespondWithNewView (this .[[associatedReadableByteStreamController]], view).
3.13. Readable stream BYOB controller abstract operations
3.13.1. IsReadableStreamBYOBRequest ( x ) nothrow
- If
Type (x) is not Object, returnfalse . - If x does not have an [[associatedReadableByteStreamController]] internal slot, return
false . - Return
true .
3.13.2. IsReadableByteStreamController ( x ) nothrow
- If
Type (x) is not Object, returnfalse . - If x does not have an [[controlledReadableByteStream]] internal slot, return
false . - Return
true .
3.13.3. ReadableByteStreamControllerCallPullIfNeeded ( controller ) nothrow
- Let shouldPull be ! ReadableByteStreamControllerShouldCallPull(controller).
- If shouldPull is false, return.
- If controller.[[pulling]] is true,
  - Set controller.[[pullAgain]] to true.
  - Return.
- Assert: controller.[[pullAgain]] is false.
- Set controller.[[pulling]] to true.
- Let pullPromise be the result of performing controller.[[pullAlgorithm]].
- Upon fulfillment of pullPromise,
  - Set controller.[[pulling]] to false.
  - If controller.[[pullAgain]] is true,
    - Set controller.[[pullAgain]] to false.
    - Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
- Upon rejection of pullPromise with reason e,
  - Perform ! ReadableByteStreamControllerError(controller, e).
3.13.4. ReadableByteStreamControllerClearAlgorithms ( controller ) throws
This abstract operation is called once the stream is closed or errored and the algorithms will not be executed any more.
By removing the algorithm references it permits the underlying source object to be garbage collected even if the ReadableStream
itself is still referenced.
The results of this algorithm are not currently observable, but could become so if JavaScript eventually adds weak references. But even without that factor, implementations will likely want to include similar steps.
- Set controller.[[pullAlgorithm]] to
undefined . - Set controller.[[cancelAlgorithm]] to
undefined .
3.13.5. ReadableByteStreamControllerClearPendingPullIntos ( controller ) nothrow
- Perform !
ReadableByteStreamControllerInvalidateBYOBRequest (controller). - Set controller.[[pendingPullIntos]] to a new empty
List .
3.13.6. ReadableByteStreamControllerClose ( controller ) throws
- Let stream be controller.[[controlledReadableByteStream]].
- Assert: controller.[[closeRequested]] is
false . - Assert: stream.[[state]] is
"readable"
. - If controller.[[queueTotalSize]] >
0 ,- Set controller.[[closeRequested]] to
true . - Return.
- Set controller.[[closeRequested]] to
- If controller.[[pendingPullIntos]] is not empty,
- Let firstPendingPullInto be the first element of controller.[[pendingPullIntos]].
- If firstPendingPullInto.[[bytesFilled]] >
0 ,- Let e be a new
TypeError exception. - Perform !
ReadableByteStreamControllerError (controller, e). - Throw e.
- Let e be a new
- Perform !
ReadableByteStreamControllerClearAlgorithms (controller). - Perform !
ReadableStreamClose (stream).
3.13.7. ReadableByteStreamControllerCommitPullIntoDescriptor ( stream, pullIntoDescriptor ) nothrow
- Assert: stream.[[state]] is not
"errored"
. - Let done be
false . - If stream.[[state]] is
"closed"
,- Assert: pullIntoDescriptor.[[bytesFilled]] is
0 . - Set done to
true .
- Assert: pullIntoDescriptor.[[bytesFilled]] is
- Let filledView be !
ReadableByteStreamControllerConvertPullIntoDescriptor (pullIntoDescriptor). - If pullIntoDescriptor.[[readerType]] is
"default"
,- Perform !
ReadableStreamFulfillReadRequest (stream, filledView, done).
- Perform !
- Otherwise,
- Assert: pullIntoDescriptor.[[readerType]] is
"byob"
. - Perform !
ReadableStreamFulfillReadIntoRequest (stream, filledView, done).
- Assert: pullIntoDescriptor.[[readerType]] is
3.13.8. ReadableByteStreamControllerConvertPullIntoDescriptor ( pullIntoDescriptor ) nothrow
- Let bytesFilled be pullIntoDescriptor.[[bytesFilled]].
- Let elementSize be pullIntoDescriptor.[[elementSize]].
- Assert: bytesFilled ≤ pullIntoDescriptor.[[byteLength]].
- Assert: bytesFilled mod elementSize is
0 . - Return !
Construct (pullIntoDescriptor.[[ctor]], « pullIntoDescriptor.[[buffer]], pullIntoDescriptor.[[byteOffset]], bytesFilled ÷ elementSize »).
3.13.9. ReadableByteStreamControllerEnqueue ( controller, chunk ) nothrow
- Let stream be controller.[[controlledReadableByteStream]].
- Assert: controller.[[closeRequested]] is
false . - Assert: stream.[[state]] is
"readable"
. - Let buffer be chunk.[[ViewedArrayBuffer]].
- Let byteOffset be chunk.[[ByteOffset]].
- Let byteLength be chunk.[[ByteLength]].
- Let transferredBuffer be !
TransferArrayBuffer (buffer). - If !
ReadableStreamHasDefaultReader (stream) istrue - If !
ReadableStreamGetNumReadRequests (stream) is0 ,- Perform !
ReadableByteStreamControllerEnqueueChunkToQueue (controller, transferredBuffer, byteOffset, byteLength).
- Perform !
- Otherwise,
- Assert: controller.[[queue]] is empty.
- Let transferredView be !
Construct (%Uint8Array%, « transferredBuffer, byteOffset, byteLength »). - Perform !
ReadableStreamFulfillReadRequest (stream, transferredView,false ).
- If !
- Otherwise, if !
ReadableStreamHasBYOBReader (stream) istrue ,- Perform !
ReadableByteStreamControllerEnqueueChunkToQueue (controller, transferredBuffer, byteOffset, byteLength). - Perform !
ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue (controller).
- Perform !
- Otherwise,
- Assert: !
IsReadableStreamLocked (stream) isfalse . - Perform !
ReadableByteStreamControllerEnqueueChunkToQueue (controller, transferredBuffer, byteOffset, byteLength).
- Assert: !
- Perform !
ReadableByteStreamControllerCallPullIfNeeded (controller).
3.13.10. ReadableByteStreamControllerEnqueueChunkToQueue ( controller, buffer, byteOffset, byteLength ) nothrow
- Append
Record {[[buffer]]: buffer, [[byteOffset]]: byteOffset, [[byteLength]]: byteLength} as the last element of controller.[[queue]]. - Add byteLength to controller.[[queueTotalSize]].
3.13.11. ReadableByteStreamControllerError ( controller, e ) nothrow
- Let stream be controller.[[controlledReadableByteStream]].
- If stream.[[state]] is not
"readable"
, return. - Perform !
ReadableByteStreamControllerClearPendingPullIntos (controller). - Perform !
ResetQueue (controller). - Perform !
ReadableByteStreamControllerClearAlgorithms (controller). - Perform !
ReadableStreamError (stream, e).
3.13.12. ReadableByteStreamControllerFillHeadPullIntoDescriptor ( controller, size, pullIntoDescriptor ) nothrow
- Assert: either controller.[[pendingPullIntos]] is empty, or the first element of controller.[[pendingPullIntos]] is pullIntoDescriptor.
- Perform !
ReadableByteStreamControllerInvalidateBYOBRequest (controller). - Set pullIntoDescriptor.[[bytesFilled]] to pullIntoDescriptor.[[bytesFilled]] + size.
3.13.13. ReadableByteStreamControllerFillPullIntoDescriptorFromQueue ( controller, pullIntoDescriptor ) nothrow
- Let elementSize be pullIntoDescriptor.[[elementSize]].
- Let currentAlignedBytes be pullIntoDescriptor.[[bytesFilled]] − (pullIntoDescriptor.[[bytesFilled]] mod elementSize).
- Let maxBytesToCopy be
min (controller.[[queueTotalSize]], pullIntoDescriptor.[[byteLength]] − pullIntoDescriptor.[[bytesFilled]]). - Let maxBytesFilled be pullIntoDescriptor.[[bytesFilled]] + maxBytesToCopy.
- Let maxAlignedBytes be maxBytesFilled − (maxBytesFilled mod elementSize).
- Let totalBytesToCopyRemaining be maxBytesToCopy.
- Let ready be
false . - If maxAlignedBytes > currentAlignedBytes,
- Set totalBytesToCopyRemaining to maxAlignedBytes − pullIntoDescriptor.[[bytesFilled]].
- Set ready to
true .
- Let queue be controller.[[queue]].
- Repeat the following steps while totalBytesToCopyRemaining >
0 ,- Let headOfQueue be the first element of queue.
- Let bytesToCopy be
min (totalBytesToCopyRemaining, headOfQueue.[[byteLength]]). - Let destStart be pullIntoDescriptor.[[byteOffset]] + pullIntoDescriptor.[[bytesFilled]].
- Perform !
CopyDataBlockBytes (pullIntoDescriptor.[[buffer]].[[ArrayBufferData]], destStart, headOfQueue.[[buffer]].[[ArrayBufferData]], headOfQueue.[[byteOffset]], bytesToCopy). - If headOfQueue.[[byteLength]] is bytesToCopy,
- Remove the first element of queue, shifting all other elements downward (so that the second becomes the first, and so on).
- Otherwise,
- Set headOfQueue.[[byteOffset]] to headOfQueue.[[byteOffset]] + bytesToCopy.
- Set headOfQueue.[[byteLength]] to headOfQueue.[[byteLength]] − bytesToCopy.
- Set controller.[[queueTotalSize]] to controller.[[queueTotalSize]] − bytesToCopy.
- Perform !
ReadableByteStreamControllerFillHeadPullIntoDescriptor (controller, bytesToCopy, pullIntoDescriptor). - Set totalBytesToCopyRemaining to totalBytesToCopyRemaining − bytesToCopy.
- If ready is
false ,- Assert: controller.[[queueTotalSize]] is
0 . - Assert: pullIntoDescriptor.[[bytesFilled]] >
0 . - Assert: pullIntoDescriptor.[[bytesFilled]] < pullIntoDescriptor.[[elementSize]].
- Assert: controller.[[queueTotalSize]] is
- Return ready.
3.13.14. ReadableByteStreamControllerGetDesiredSize ( controller ) nothrow
- Let stream be controller.[[controlledReadableByteStream]].
- Let state be stream.[[state]].
- If state is
"errored"
, returnnull . - If state is
"closed"
, return0 . - Return controller.[[strategyHWM]] − controller.[[queueTotalSize]].
3.13.15. ReadableByteStreamControllerHandleQueueDrain ( controller ) nothrow
- Assert: controller.[[controlledReadableByteStream]].[[state]] is
"readable"
. - If controller.[[queueTotalSize]] is
0 and controller.[[closeRequested]] istrue ,- Perform !
ReadableByteStreamControllerClearAlgorithms (controller). - Perform !
ReadableStreamClose (controller.[[controlledReadableByteStream]]).
- Perform !
- Otherwise,
- Perform !
ReadableByteStreamControllerCallPullIfNeeded (controller).
- Perform !
3.13.16. ReadableByteStreamControllerInvalidateBYOBRequest ( controller ) nothrow
- If controller.[[byobRequest]] is
undefined , return. - Set controller.[[byobRequest]].[[associatedReadableByteStreamController]] to
undefined . - Set controller.[[byobRequest]].[[view]] to
undefined . - Set controller.[[byobRequest]] to
undefined .
3.13.17. ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue ( controller ) nothrow
- Assert: controller.[[closeRequested]] is
false . - Repeat the following steps while controller.[[pendingPullIntos]] is not empty,
- If controller.[[queueTotalSize]] is
0 , return. - Let pullIntoDescriptor be the first element of controller.[[pendingPullIntos]].
- If !
ReadableByteStreamControllerFillPullIntoDescriptorFromQueue (controller, pullIntoDescriptor) istrue ,- Perform !
ReadableByteStreamControllerShiftPendingPullInto (controller). - Perform !
ReadableByteStreamControllerCommitPullIntoDescriptor (controller.[[controlledReadableByteStream]], pullIntoDescriptor).
- Perform !
- If controller.[[queueTotalSize]] is
3.13.18. ReadableByteStreamControllerPullInto ( controller, view, forAuthorCode ) nothrow
- Let stream be controller.[[controlledReadableByteStream]].
- Let elementSize be 1.
- Let ctor be
%DataView% . - If view has a [[TypedArrayName]] internal slot (i.e., it is not a
DataView
),- Set elementSize to the element size specified in the typed array constructors table for view.[[TypedArrayName]].
- Set ctor to the constructor specified in the typed array constructors table for view.[[TypedArrayName]].
- Let byteOffset be view.[[ByteOffset]].
- Let byteLength be view.[[ByteLength]].
- Let buffer be !
TransferArrayBuffer (view.[[ViewedArrayBuffer]]). - Let pullIntoDescriptor be
Record {[[buffer]]: buffer, [[byteOffset]]: byteOffset, [[byteLength]]: byteLength, [[bytesFilled]]:0 , [[elementSize]]: elementSize, [[ctor]]: ctor, [[readerType]]:"byob"
}. - If controller.[[pendingPullIntos]] is not empty,
- Append pullIntoDescriptor as the last element of controller.[[pendingPullIntos]].
- Return !
ReadableStreamAddReadIntoRequest (stream, forAuthorCode).
- If stream.[[state]] is
"closed"
,- Let emptyView be !
Construct (ctor, « pullIntoDescriptor.[[buffer]], pullIntoDescriptor.[[byteOffset]],0 »). - Return a promise resolved with !
ReadableStreamCreateReadResult (emptyView,true , forAuthorCode).
- Let emptyView be !
- If controller.[[queueTotalSize]] >
0 ,- If !
ReadableByteStreamControllerFillPullIntoDescriptorFromQueue (controller, pullIntoDescriptor) istrue ,- Let filledView be !
ReadableByteStreamControllerConvertPullIntoDescriptor (pullIntoDescriptor). - Perform !
ReadableByteStreamControllerHandleQueueDrain (controller). - Return a promise resolved with !
ReadableStreamCreateReadResult (filledView,false , forAuthorCode).
- Let filledView be !
- If controller.[[closeRequested]] is
true ,- Let e be a
TypeError exception. - Perform !
ReadableByteStreamControllerError (controller, e). - Return a promise rejected with e.
- Let e be a
- If !
- Append pullIntoDescriptor as the last element of controller.[[pendingPullIntos]].
- Let promise be !
ReadableStreamAddReadIntoRequest (stream, forAuthorCode). - Perform !
ReadableByteStreamControllerCallPullIfNeeded (controller). - Return promise.
3.13.19. ReadableByteStreamControllerRespond ( controller, bytesWritten ) throws
- Let bytesWritten be ?
ToNumber (bytesWritten). - If !
IsFiniteNonNegativeNumber (bytesWritten) isfalse ,- Throw a
RangeError exception.
- Throw a
- Assert: controller.[[pendingPullIntos]] is not empty.
- Perform ?
ReadableByteStreamControllerRespondInternal (controller, bytesWritten).
3.13.20. ReadableByteStreamControllerRespondInClosedState ( controller, firstDescriptor ) nothrow
- Set firstDescriptor.[[buffer]] to ! TransferArrayBuffer(firstDescriptor.[[buffer]]).
- Assert: firstDescriptor.[[bytesFilled]] is 0.
- Let stream be controller.[[controlledReadableByteStream]].
- If ! ReadableStreamHasBYOBReader(stream) is true,
  - Repeat the following steps while ! ReadableStreamGetNumReadIntoRequests(stream) > 0,
    - Let pullIntoDescriptor be ! ReadableByteStreamControllerShiftPendingPullInto(controller).
    - Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(stream, pullIntoDescriptor).
3.13.21. ReadableByteStreamControllerRespondInReadableState ( controller, bytesWritten, pullIntoDescriptor ) throws
- If pullIntoDescriptor.[[bytesFilled]] + bytesWritten > pullIntoDescriptor.[[byteLength]], throw a
RangeError exception. - Perform !
ReadableByteStreamControllerFillHeadPullIntoDescriptor (controller, bytesWritten, pullIntoDescriptor). - If pullIntoDescriptor.[[bytesFilled]] < pullIntoDescriptor.[[elementSize]], return.
- Perform !
ReadableByteStreamControllerShiftPendingPullInto (controller). - Let remainderSize be pullIntoDescriptor.[[bytesFilled]] mod pullIntoDescriptor.[[elementSize]].
- If remainderSize >
0 ,- Let end be pullIntoDescriptor.[[byteOffset]] + pullIntoDescriptor.[[bytesFilled]].
- Let remainder be ?
CloneArrayBuffer (pullIntoDescriptor.[[buffer]], end − remainderSize, remainderSize,%ArrayBuffer% ). - Perform !
ReadableByteStreamControllerEnqueueChunkToQueue (controller, remainder,0 , remainder.[[ByteLength]]).
- Set pullIntoDescriptor.[[buffer]] to !
TransferArrayBuffer (pullIntoDescriptor.[[buffer]]). - Set pullIntoDescriptor.[[bytesFilled]] to pullIntoDescriptor.[[bytesFilled]] − remainderSize.
- Perform !
ReadableByteStreamControllerCommitPullIntoDescriptor (controller.[[controlledReadableByteStream]], pullIntoDescriptor). - Perform !
ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue (controller).
3.13.22. ReadableByteStreamControllerRespondInternal ( controller, bytesWritten ) throws
- Let firstDescriptor be the first element of controller.[[pendingPullIntos]].
- Let stream be controller.[[controlledReadableByteStream]].
- If stream.[[state]] is
"closed"
,- If bytesWritten is not
0 , throw aTypeError exception. - Perform !
ReadableByteStreamControllerRespondInClosedState (controller, firstDescriptor).
- If bytesWritten is not
- Otherwise,
- Assert: stream.[[state]] is
"readable"
. - Perform ?
ReadableByteStreamControllerRespondInReadableState (controller, bytesWritten, firstDescriptor).
- Assert: stream.[[state]] is
- Perform !
ReadableByteStreamControllerCallPullIfNeeded (controller).
3.13.23. ReadableByteStreamControllerRespondWithNewView ( controller, view ) throws
- Assert: controller.[[pendingPullIntos]] is not empty.
- Let firstDescriptor be the first element of controller.[[pendingPullIntos]].
- If firstDescriptor.[[byteOffset]] + firstDescriptor.[[bytesFilled]] is not view.[[ByteOffset]], throw a
RangeError exception. - If firstDescriptor.[[byteLength]] is not view.[[ByteLength]], throw a
RangeError exception. - Set firstDescriptor.[[buffer]] to view.[[ViewedArrayBuffer]].
- Perform ?
ReadableByteStreamControllerRespondInternal (controller, view.[[ByteLength]]).
3.13.24. ReadableByteStreamControllerShiftPendingPullInto ( controller ) nothrow
- Let descriptor be the first element of controller.[[pendingPullIntos]].
- Remove descriptor from controller.[[pendingPullIntos]], shifting all other elements downward (so that the second becomes the first, and so on).
- Perform !
ReadableByteStreamControllerInvalidateBYOBRequest (controller). - Return descriptor.
3.13.25. ReadableByteStreamControllerShouldCallPull ( controller ) nothrow
- Let stream be controller.[[controlledReadableByteStream]].
- If stream.[[state]] is not
"readable"
, returnfalse . - If controller.[[closeRequested]] is
true , returnfalse . - If controller.[[started]] is
false , returnfalse . - If !
ReadableStreamHasDefaultReader (stream) istrue and !ReadableStreamGetNumReadRequests (stream) >0 , returntrue . - If !
ReadableStreamHasBYOBReader (stream) istrue and !ReadableStreamGetNumReadIntoRequests (stream) >0 , returntrue . - Let desiredSize be !
ReadableByteStreamControllerGetDesiredSize (controller). - Assert: desiredSize is not
null . - If desiredSize >
0 , returntrue . - Return
false .
3.13.26. SetUpReadableByteStreamController ( stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, autoAllocateChunkSize ) throws
- Assert: stream.[[readableStreamController]] is
undefined . - If autoAllocateChunkSize is not
undefined ,- Assert: !
IsInteger (autoAllocateChunkSize) istrue . - Assert: autoAllocateChunkSize is positive.
- Assert: !
- Set controller.[[controlledReadableByteStream]] to stream.
- Set controller.[[pullAgain]] and controller.[[pulling]] to
false . - Perform !
ReadableByteStreamControllerClearPendingPullIntos (controller). - Perform !
ResetQueue (controller). - Set controller.[[closeRequested]] and controller.[[started]] to
false . - Set controller.[[strategyHWM]] to ?
ValidateAndNormalizeHighWaterMark (highWaterMark). - Set controller.[[pullAlgorithm]] to pullAlgorithm.
- Set controller.[[cancelAlgorithm]] to cancelAlgorithm.
- Set controller.[[autoAllocateChunkSize]] to autoAllocateChunkSize.
- Set controller.[[pendingPullIntos]] to a new empty
List . - Set stream.[[readableStreamController]] to controller.
- Let startResult be the result of performing startAlgorithm.
- Let startPromise be a promise resolved with startResult.
- Upon fulfillment of startPromise,
- Set controller.[[started]] to
true . - Assert: controller.[[pulling]] is
false . - Assert: controller.[[pullAgain]] is
false . - Perform !
ReadableByteStreamControllerCallPullIfNeeded (controller).
- Set controller.[[started]] to
- Upon rejection of startPromise with reason r,
- Perform !
ReadableByteStreamControllerError (controller, r).
- Perform !
3.13.27. SetUpReadableByteStreamControllerFromUnderlyingSource ( stream, underlyingByteSource, highWaterMark ) throws
- Assert: underlyingByteSource is not
undefined . - Let controller be
ObjectCreate (the original value ofReadableByteStreamController
'sprototype
property). - Let startAlgorithm be the following steps:
- Return ?
InvokeOrNoop (underlyingByteSource,"start"
, « controller »).
- Return ?
- Let pullAlgorithm be ?
CreateAlgorithmFromUnderlyingMethod (underlyingByteSource,"pull"
,0 , « controller »). - Let cancelAlgorithm be ?
CreateAlgorithmFromUnderlyingMethod (underlyingByteSource,"cancel"
,1 , « »). - Let autoAllocateChunkSize be ?
GetV (underlyingByteSource,"autoAllocateChunkSize"
- If autoAllocateChunkSize is not undefined,
  - Set autoAllocateChunkSize to ? ToNumber(autoAllocateChunkSize).
  - If ! IsInteger(autoAllocateChunkSize) is false, or if autoAllocateChunkSize ≤ 0, throw a RangeError exception.
- Perform ?
SetUpReadableByteStreamController (stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, autoAllocateChunkSize).
3.13.28. SetUpReadableStreamBYOBRequest ( request, controller, view ) nothrow
- Assert: !
IsReadableByteStreamController (controller) istrue . - Assert:
Type (view) is Object. - Assert: view has a [[ViewedArrayBuffer]] internal slot.
- Assert: !
IsDetachedBuffer (view.[[ViewedArrayBuffer]]) isfalse . - Set request.[[associatedReadableByteStreamController]] to controller.
- Set request.[[view]] to view.
4. Writable streams
4.1. Using writable streams
readableStream.pipeTo(writableStream)
  .then(() => console.log("All data successfully written!"))
  .catch(e => console.error("Something went wrong!", e));
You can also write directly to a writable stream by acquiring a writer and using its write() and close() methods. Since writable streams queue any incoming writes, and take care internally to forward them to the underlying sink in sequence, you can indiscriminately write to a writable stream without much ceremony:
function writeArrayToStream(array, writableStream) {
  const writer = writableStream.getWriter();
  array.forEach(chunk => writer.write(chunk).catch(() => {}));
  return writer.close();
}

writeArrayToStream([1, 2, 3, 4, 5], writableStream)
  .then(() => console.log("All done!"))
  .catch(e => console.error("Error with the stream: " + e));
Note how we use .catch(() => {})
to suppress any rejections from the write()
method; we’ll be notified of any fatal errors via a rejection of the close()
method, and leaving them un-caught would cause potential unhandledrejection
events and console warnings.
In the above example we only paid attention to the success or failure of the entire stream, via the promise returned by the writer’s close() method. That promise will reject if anything goes wrong with the stream: initializing it, writing to it, or closing it. And it will fulfill once the stream is successfully closed. Often this is all you care about.
However, if you care about the success of writing a specific chunk, you can use the promise returned by the
writer’s write()
method:
writer.write("i am a chunk of data")
  .then(() => console.log("chunk successfully written!"))
  .catch(e => console.error(e));
What "success" means is up to a given stream instance (or more precisely, its underlying sink) to decide. For example, for a file stream it could simply mean that the OS has accepted the write, and not necessarily that the chunk has been flushed to disk. Some streams might not be able to give such a signal at all, in which case the returned promise will fulfill immediately.
The desiredSize and ready properties of writable stream writers allow producers to more precisely respond to flow control signals from the stream, to keep memory usage below the stream's specified high water mark. The following example writes an infinite sequence of random bytes to a stream, using desiredSize to determine how many bytes to generate at a given time, and using ready to wait for the backpressure to subside.
async function writeRandomBytesForever(writableStream) {
  const writer = writableStream.getWriter();

  while (true) {
    await writer.ready;

    const bytes = new Uint8Array(writer.desiredSize);
    crypto.getRandomValues(bytes);

    // Purposefully don't await; awaiting writer.ready is enough.
    writer.write(bytes).catch(() => {});
  }
}
writeRandomBytesForever(myWritableStream).catch(e => console.error("Something broke", e));
Note how we don't await the promise returned by write(); this would be redundant with awaiting the ready promise. Additionally, similar to a previous example, we use the .catch(() => {}) pattern on the promises returned by write(); in this case we'll be notified about any failures when awaiting the ready promise.
To see why it's still best not to await the promise returned by write(), consider a modification of the above example, where we continue to use the WritableStreamDefaultWriter interface directly, but we don't control how many bytes we have to write at a given time. In that case, the backpressure-respecting code looks the same:
async function writeSuppliedBytesForever(writableStream, getBytes) {
  const writer = writableStream.getWriter();

  while (true) {
    await writer.ready;

    const bytes = getBytes();
    writer.write(bytes).catch(() => {});
  }
}
Unlike the previous example, where the write() and ready promises were synchronized because we were always writing exactly writer.desiredSize bytes each time, in this case it's quite possible that the ready promise fulfills before the one returned by write() does. Remember, the ready promise fulfills when the desired size becomes positive, which might be before the write succeeds (especially in cases with a larger high water mark).

In other words, awaiting the return value of write() means you never queue up writes in the stream's internal queue, instead only executing a write after the previous one succeeds, which can result in low throughput.
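For contrast, here is a non-normative sketch of the lower-throughput alternative just described, using the same assumed writableStream and getBytes placeholders as the previous example:

// Lower throughput: awaiting write() serializes the writes, so the stream's
// internal queue never holds more than one chunk at a time.
async function writeSuppliedBytesSerially(writableStream, getBytes) {
  const writer = writableStream.getWriter();

  while (true) {
    const bytes = getBytes();
    await writer.write(bytes); // waits for the underlying sink to accept each chunk
  }
}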
4.2. Class WritableStream
4.2.1. Class definition
This section is non-normative.
If one were to write the WritableStream
class in something close to the syntax of [ECMASCRIPT], it would look
like
class WritableStream {
constructor( underlyingSink = {}, strategy = {})
get locked()
abort( reason)
getWriter()
}
4.2.2. Internal slots
Instances of WritableStream
are created with the internal slots described in the following table:
Internal Slot | Description (non-normative)
---|---
[[backpressure]] | The backpressure signal set by the controller
[[closeRequest]] | The promise returned from the writer's close() method
[[inFlightWriteRequest]] | A slot set to the promise for the current in-flight write operation while the underlying sink's write algorithm is executing and has not yet fulfilled, used to prevent reentrant calls
[[inFlightCloseRequest]] | A slot set to the promise for the current in-flight close operation while the underlying sink's close algorithm is executing and has not yet fulfilled, used to prevent the abort() method from interrupting close
[[pendingAbortRequest]] | A Record containing the promise returned from abort(), the reason passed to abort(), and a flag recording whether the stream was already erroring when abort() was called; set while an abort is pending
[[state]] | A string containing the stream's current state, used internally; one of "writable", "closed", "erroring", or "errored"
[[storedError]] | A value indicating how the stream failed, to be given as a failure reason or exception when trying to operate on the stream while in the "errored" state
[[writableStreamController]] | A WritableStreamDefaultController created with the ability to control the state and queue of this stream; also used for the IsWritableStream brand check
[[writer]] | A WritableStreamDefaultWriter instance, if the stream is locked to a writer, or undefined if it is not
[[writeRequests]] | A List of promises representing the stream's internal queue of write requests not yet processed by the underlying sink

The [[inFlightCloseRequest]] slot and [[closeRequest]] slot are mutually exclusive. Similarly, no element will be removed from [[writeRequests]] while [[inFlightWriteRequest]] is not undefined.
4.2.3. new WritableStream(underlyingSink = {}, strategy = {})
The underlyingSink argument represents the underlying sink, as described in §4.2.4 Underlying sink API.

The strategy argument represents the stream's queuing strategy, as described in §6.1.1 The queuing strategy API. If it is not provided, the default behavior will be the same as a CountQueuingStrategy with a high water mark of 1.

- Perform ! InitializeWritableStream(this).
- Let size be ? GetV(strategy, "size").
- Let highWaterMark be ? GetV(strategy, "highWaterMark").
- Let type be ? GetV(underlyingSink, "type").
- If type is not undefined, throw a RangeError exception.
  This is to allow us to add new potential types in the future, without backward-compatibility concerns.
- Let sizeAlgorithm be ? MakeSizeAlgorithmFromSizeFunction(size).
- If highWaterMark is undefined, let highWaterMark be 1.
- Set highWaterMark to ? ValidateAndNormalizeHighWaterMark(highWaterMark).
- Perform ? SetUpWritableStreamDefaultControllerFromUnderlyingSink(this, underlyingSink, highWaterMark, sizeAlgorithm).
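A non-normative sketch of the strategy argument in use; the sink shown is an assumed placeholder that simply discards Uint8Array chunks, and backpressure is applied once roughly 1 KiB of data is queued:

// Assumed placeholder sink that simply discards each chunk.
const discardingSink = {
  write(chunk) {
    console.log("discarding a chunk of", chunk.byteLength, "bytes");
  }
};

// Chunks are measured by byteLength, so the desired size goes non-positive
// (and the writer's ready promise stays pending) once about 1 KiB is queued.
const byteCountedStream = new WritableStream(
  discardingSink,
  new ByteLengthQueuingStrategy({ highWaterMark: 1024 })
);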
4.2.4. Underlying sink API
This section is non-normative.
The WritableStream()
constructor accepts as its first argument a JavaScript object representing the underlying
sink. Such objects can contain any of the following properties:
start(controller)
-
A function that is called immediately during creation of the
WritableStream
.Typically this is used to acquire access to the underlying sink resource being represented.
If this setup process is asynchronous, it can return a promise to signal success or failure; a rejected promise will error the stream. Any thrown exceptions will be re-thrown by the
WritableStream()
constructor. write(chunk, controller)
-
A function that is called when a new chunk of data is ready to be written to the underlying sink. The stream implementation guarantees that this function will be called only after previous writes have succeeded, and never before
start()
has succeeded or afterclose()
orabort()
have been called.This function is used to actually send the data to the resource presented by the underlying sink, for example by calling a lower-level API.
If the process of writing data is asynchronous, and communicates success or failure signals back to its user, then this function can return a promise to signal success or failure. This promise return value will be communicated back to the caller of
writer.write()
, so they can monitor that individual write. Throwing an exception is treated the same as returning a rejected promise.Note that such signals are not always available; compare e.g. §8.6 A writable stream with no backpressure or success signals with §8.7 A writable stream with backpressure and success signals. In such cases, it’s best to not return anything.
The promise potentially returned by this function also governs whether the given chunk counts as written for the purposes of computing the desired size to fill the stream's internal queue. That is, during the time it takes the promise to settle,
writer.desiredSize
will stay at its previous value, only increasing to signal the desire for more chunks once the write succeeds. close()
-
A function that is called after the producer signals, via
writer.close()
, that they are done writing chunks to the stream, and subsequently all queued-up writes have successfully completed.This function can perform any actions necessary to finalize or flush writes to the underlying sink, and release access to any held resources.
If the shutdown process is asynchronous, the function can return a promise to signal success or failure; the result will be communicated via the return value of the called
writer.close()
method. Additionally, a rejected promise will error the stream, instead of letting it close successfully. Throwing an exception is treated the same as returning a rejected promise. abort(reason)
-
A function that is called after the producer signals, via
stream.abort()
orwriter.abort()
, that they wish to abort the stream. It takes as its argument the same value as was passed to those methods by the producer.Writable streams can additionally be aborted under certain conditions during piping; see the definition of the
pipeTo()
method for more details.This function can clean up any held resources, much like
close()
, but perhaps with some custom handling.If the shutdown process is asynchronous, the function can return a promise to signal success or failure; the result will be communicated via the return value of the called
abort()
method. Throwing an exception is treated the same as returning a rejected promise. Regardless, the stream will be errored with a newTypeError
indicating that it was aborted.
The controller
argument passed to start()
and write()
is an
instance of WritableStreamDefaultController
, and has the ability to error the stream. This is mainly used for
bridging the gap with non-promise-based APIs, as seen for example in §8.6 A writable stream with no backpressure or success signals.
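To make the shape of the underlying sink API concrete, here is a non-normative sketch; sendToServer() is an assumed, hypothetical promise-returning function, and only the method names and their roles come from the description above:

const loggingSink = {
  start(controller) {
    // Typically used to acquire the underlying resource; omitted here.
  },
  write(chunk, controller) {
    // Assumed placeholder: sendToServer() returns a promise, which the
    // stream uses to decide when this chunk counts as written.
    return sendToServer(chunk);
  },
  close() {
    console.log("all queued chunks were written; shutting down");
  },
  abort(reason) {
    console.log("the producer aborted the stream:", reason);
  }
};

const writableStream = new WritableStream(loggingSink);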
4.2.5. Properties of the WritableStream
prototype
4.2.5.1. get locked
The locked getter returns whether or not the writable stream is locked to a writer.

- If ! IsWritableStream(this) is false, throw a TypeError exception.
- Return ! IsWritableStreamLocked(this).
4.2.5.2. abort(reason)
The abort method aborts the stream, signaling that the producer can no longer successfully write to the stream and it is to be immediately moved to an errored state, with any queued-up writes discarded. This will also execute any abort mechanism of the underlying sink.

- If ! IsWritableStream(this) is false, return a promise rejected with a TypeError exception.
- If ! IsWritableStreamLocked(this) is true, return a promise rejected with a TypeError exception.
- Return ! WritableStreamAbort(this, reason).
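A brief non-normative usage sketch, with writableStream and the abort reason assumed for illustration:

writableStream.abort(new Error("upload cancelled by the user"))
  .then(() => console.log("stream aborted; queued writes were discarded"))
  .catch(e => console.error("abort failed (for example, the stream was locked)", e));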
4.2.5.3. getWriter()
The getWriter method creates a writer (an instance of WritableStreamDefaultWriter) and locks the stream to the new writer. While the stream is locked, no other writer can be acquired until this one is released.

This functionality is especially useful for creating abstractions that desire the ability to write to a stream without interruption or interleaving. By getting a writer for the stream, you can ensure nobody else can write at the same time, which would cause the resulting written data to be unpredictable and probably useless.

- If ! IsWritableStream(this) is false, throw a TypeError exception.
- Return ? AcquireWritableStreamDefaultWriter(this).
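The following non-normative sketch illustrates the exclusivity that getWriter() provides; writableStream is an assumed placeholder:

const writer = writableStream.getWriter();
console.log(writableStream.locked); // true

try {
  writableStream.getWriter(); // the stream is already locked to the first writer
} catch (e) {
  console.log("second getWriter() failed:", e.constructor.name); // "TypeError"
}

writer.write("exclusive chunk").catch(() => {});
writer.releaseLock();
console.log(writableStream.locked); // false: a new writer could now be acquired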
4.3. General writable stream abstract operations
The following abstract operations, unlike most in this specification, are meant to be generally useful by other specifications, instead of just being part of the implementation of this spec’s classes.
4.3.1. AcquireWritableStreamDefaultWriter ( stream ) throws
- Return ?
Construct (WritableStreamDefaultWriter
, « stream »).
4.3.2. CreateWritableStream ( startAlgorithm, writeAlgorithm, closeAlgorithm, abortAlgorithm [, highWaterMark [, sizeAlgorithm ] ] ) throws
This abstract operation is meant to be called from other specifications that wish to create WritableStream
instances. The writeAlgorithm, closeAlgorithm and abortAlgorithm algorithms must return
promises; if supplied, sizeAlgorithm must be an algorithm accepting chunk objects and returning a
number; and if supplied, highWaterMark must be a non-negative, non-NaN number.
- If highWaterMark was not passed, set it to
1 . - If sizeAlgorithm was not passed, set it to an algorithm that returns
1 . - Assert: !
IsNonNegativeNumber (highWaterMark) istrue . - Let stream be
ObjectCreate (the original value ofWritableStream
'sprototype
property). - Perform !
InitializeWritableStream (stream). - Let controller be
ObjectCreate (the original value ofWritableStreamDefaultController
'sprototype
property). - Perform ?
SetUpWritableStreamDefaultController (stream, controller, startAlgorithm, writeAlgorithm, closeAlgorithm, abortAlgorithm, highWaterMark, sizeAlgorithm). - Return stream.
4.3.3. InitializeWritableStream ( stream ) nothrow
- Set stream.[[state]] to
"writable"
. - Set stream.[[storedError]], stream.[[writer]], stream.[[writableStreamController]],
stream.[[inFlightWriteRequest]], stream.[[closeRequest]], stream.[[inFlightCloseRequest]] and
stream.[[pendingAbortRequest]] to
undefined . - Set stream.[[writeRequests]] to a new empty
List . - Set stream.[[backpressure]] to
false .
4.3.4. IsWritableStream ( x ) nothrow
- If
Type (x) is not Object, returnfalse . - If x does not have a [[writableStreamController]] internal slot, return
false . - Return
true .
4.3.5. IsWritableStreamLocked ( stream ) nothrow
This abstract operation is meant to be called from other specifications that may wish to query whether or not a writable stream is locked to a writer.
- Assert: !
IsWritableStream (stream) istrue . - If stream.[[writer]] is
undefined , returnfalse . - Return
true .
4.3.6. WritableStreamAbort ( stream, reason ) nothrow
- Let state be stream.[[state]].
- If state is
"closed"
or"errored"
, return a promise resolved withundefined . - If stream.[[pendingAbortRequest]] is not
undefined , return stream.[[pendingAbortRequest]].[[promise]]. - Assert: state is
"writable"
or"erroring"
. - Let wasAlreadyErroring be
false . - If state is
"erroring"
,- Set wasAlreadyErroring to
true . - Set reason to
undefined .
- Set wasAlreadyErroring to
- Let promise be a new promise.
- Set stream.[[pendingAbortRequest]] to
Record {[[promise]]: promise, [[reason]]: reason, [[wasAlreadyErroring]]: wasAlreadyErroring}. - If wasAlreadyErroring is
false , perform !WritableStreamStartErroring (stream, reason). - Return promise.
4.4. Writable stream abstract operations used by controllers
To allow future flexibility to add different writable stream behaviors (similar to the distinction between default
readable streams and readable byte streams), much of the internal state of a writable stream is
encapsulated by the WritableStreamDefaultController
class.
The abstract operations in this section are interfaces that are used by the controller implementation to affect its
associated WritableStream
object, translating the controller’s internal state changes into developer-facing results
visible through the WritableStream
's public API.
4.4.1. WritableStreamAddWriteRequest ( stream ) nothrow
- Assert: !
IsWritableStreamLocked (stream) istrue . - Assert: stream.[[state]] is
"writable"
. - Let promise be a new promise.
- Append promise as the last element of stream.[[writeRequests]].
- Return promise.
4.4.2. WritableStreamDealWithRejection ( stream, error ) nothrow
- Let state be stream.[[state]].
- If state is
"writable"
,- Perform !
WritableStreamStartErroring (stream, error). - Return.
- Perform !
- Assert: state is
"erroring"
. - Perform !
WritableStreamFinishErroring (stream).
4.4.3. WritableStreamStartErroring ( stream, reason ) nothrow
- Assert: stream.[[storedError]] is
undefined . - Assert: stream.[[state]] is
"writable"
. - Let controller be stream.[[writableStreamController]].
- Assert: controller is not
undefined . - Set stream.[[state]] to
"erroring"
. - Set stream.[[storedError]] to reason.
- Let writer be stream.[[writer]].
- If writer is not
undefined , perform !WritableStreamDefaultWriterEnsureReadyPromiseRejected (writer, reason). - If !
WritableStreamHasOperationMarkedInFlight (stream) isfalse and controller.[[started]] istrue , perform !WritableStreamFinishErroring (stream).
4.4.4. WritableStreamFinishErroring ( stream ) nothrow
- Assert: stream.[[state]] is
"erroring"
. - Assert: !
WritableStreamHasOperationMarkedInFlight (stream) isfalse . - Set stream.[[state]] to
"errored"
. - Perform ! stream.[[writableStreamController]].[[ErrorSteps]]().
- Let storedError be stream.[[storedError]].
- Repeat for each writeRequest that is an element of stream.[[writeRequests]],
- Reject writeRequest with storedError.
- Set stream.[[writeRequests]] to an empty
List . - If stream.[[pendingAbortRequest]] is
undefined ,- Perform !
WritableStreamRejectCloseAndClosedPromiseIfNeeded (stream). - Return.
- Perform !
- Let abortRequest be stream.[[pendingAbortRequest]].
- Set stream.[[pendingAbortRequest]] to
undefined . - If abortRequest.[[wasAlreadyErroring]] is
true ,- Reject abortRequest.[[promise]] with storedError.
- Perform !
WritableStreamRejectCloseAndClosedPromiseIfNeeded (stream). - Return.
- Let promise be ! stream.[[writableStreamController]].[[AbortSteps]](abortRequest.[[reason]]).
- Upon fulfillment of promise,
- Resolve abortRequest.[[promise]] with
undefined . - Perform !
WritableStreamRejectCloseAndClosedPromiseIfNeeded (stream).
- Resolve abortRequest.[[promise]] with
- Upon rejection of promise with reason reason,
- Reject abortRequest.[[promise]] with reason.
- Perform !
WritableStreamRejectCloseAndClosedPromiseIfNeeded (stream).
4.4.5. WritableStreamFinishInFlightWrite ( stream ) nothrow
- Assert: stream.[[inFlightWriteRequest]] is not
undefined . - Resolve stream.[[inFlightWriteRequest]] with
undefined . - Set stream.[[inFlightWriteRequest]] to
undefined .
4.4.6. WritableStreamFinishInFlightWriteWithError ( stream, error ) nothrow
- Assert: stream.[[inFlightWriteRequest]] is not
undefined . - Reject stream.[[inFlightWriteRequest]] with error.
- Set stream.[[inFlightWriteRequest]] to
undefined . - Assert: stream.[[state]] is
"writable"
or"erroring"
. - Perform !
WritableStreamDealWithRejection (stream, error).
4.4.7. WritableStreamFinishInFlightClose ( stream ) nothrow
- Assert: stream.[[inFlightCloseRequest]] is not
undefined . - Resolve stream.[[inFlightCloseRequest]] with
undefined . - Set stream.[[inFlightCloseRequest]] to
undefined . - Let state be stream.[[state]].
- Assert: stream.[[state]] is
"writable"
or"erroring"
. - If state is
"erroring"
,- Set stream.[[storedError]] to
undefined . - If stream.[[pendingAbortRequest]] is not
undefined ,- Resolve stream.[[pendingAbortRequest]].[[promise]] with
undefined . - Set stream.[[pendingAbortRequest]] to
undefined .
- Resolve stream.[[pendingAbortRequest]].[[promise]] with
- Set stream.[[storedError]] to
- Set stream.[[state]] to
"closed"
. - Let writer be stream.[[writer]].
- If writer is not
undefined , resolve writer.[[closedPromise]] withundefined . - Assert: stream.[[pendingAbortRequest]] is
undefined . - Assert: stream.[[storedError]] is
undefined .
4.4.8. WritableStreamFinishInFlightCloseWithError ( stream, error ) nothrow
- Assert: stream.[[inFlightCloseRequest]] is not
undefined . - Reject stream.[[inFlightCloseRequest]] with error.
- Set stream.[[inFlightCloseRequest]] to
undefined . - Assert: stream.[[state]] is
"writable"
or"erroring"
. - If stream.[[pendingAbortRequest]] is not
undefined ,- Reject stream.[[pendingAbortRequest]].[[promise]] with error.
- Set stream.[[pendingAbortRequest]] to
undefined .
- Perform !
WritableStreamDealWithRejection (stream, error).
4.4.9. WritableStreamCloseQueuedOrInFlight ( stream ) nothrow
- If stream.[[closeRequest]] is
undefined and stream.[[inFlightCloseRequest]] isundefined , returnfalse . - Return
true .
4.4.10. WritableStreamHasOperationMarkedInFlight ( stream ) nothrow
- If stream.[[inFlightWriteRequest]] is undefined and stream.[[inFlightCloseRequest]] is undefined, return false.
- Return true.
4.4.11. WritableStreamMarkCloseRequestInFlight ( stream ) nothrow
- Assert: stream.[[inFlightCloseRequest]] is
undefined . - Assert: stream.[[closeRequest]] is not
undefined . - Set stream.[[inFlightCloseRequest]] to stream.[[closeRequest]].
- Set stream.[[closeRequest]] to
undefined .
4.4.12. WritableStreamMarkFirstWriteRequestInFlight ( stream ) nothrow
- Assert: stream.[[inFlightWriteRequest]] is
undefined . - Assert: stream.[[writeRequests]] is not empty.
- Let writeRequest be the first element of stream.[[writeRequests]].
- Remove writeRequest from stream.[[writeRequests]], shifting all other elements downward (so that the second becomes the first, and so on).
- Set stream.[[inFlightWriteRequest]] to writeRequest.
4.4.13. WritableStreamRejectCloseAndClosedPromiseIfNeeded ( stream ) nothrow
- Assert: stream.[[state]] is
"errored"
. - If stream.[[closeRequest]] is not
undefined ,- Assert: stream.[[inFlightCloseRequest]] is
undefined . - Reject stream.[[closeRequest]] with stream.[[storedError]].
- Set stream.[[closeRequest]] to
undefined .
- Assert: stream.[[inFlightCloseRequest]] is
- Let writer be stream.[[writer]].
- If writer is not
undefined ,- Reject writer.[[closedPromise]] with stream.[[storedError]].
- Set writer.[[closedPromise]].[[PromiseIsHandled]] to
true .
4.4.14. WritableStreamUpdateBackpressure ( stream, backpressure ) nothrow
- Assert: stream.[[state]] is
"writable"
. - Assert: !
WritableStreamCloseQueuedOrInFlight (stream) isfalse . - Let writer be stream.[[writer]].
- If writer is not
undefined and backpressure is not stream.[[backpressure]],- If backpressure is
true , set writer.[[readyPromise]] to a new promise. - Otherwise,
- Assert: backpressure is
false . - Resolve writer.[[readyPromise]] with
undefined .
- Assert: backpressure is
- If backpressure is
- Set stream.[[backpressure]] to backpressure.
4.5. Class WritableStreamDefaultWriter
The WritableStreamDefaultWriter
class represents a writable stream writer designed to be vended by a WritableStream
instance.
4.5.1. Class definition
This section is non-normative.
If one were to write the WritableStreamDefaultWriter
class in something close to the syntax of [ECMASCRIPT], it
would look like
class WritableStreamDefaultWriter {
constructor( stream)
get closed()
get desiredSize()
get ready()
abort( reason)
close()
releaseLock()
write( chunk)
}
4.5.2. Internal slots
Instances of WritableStreamDefaultWriter
are created with the internal slots described in the following table:
Internal Slot | Description (non-normative)
---|---
[[closedPromise]] | A promise returned by the writer's closed getter
[[ownerWritableStream]] | A WritableStream instance that owns this writer
[[readyPromise]] | A promise returned by the writer's ready getter
4.5.3. new WritableStreamDefaultWriter(stream)
The WritableStreamDefaultWriter constructor is generally not meant to be used directly; instead, a stream's getWriter() method ought to be used.

- If ! IsWritableStream(stream) is false, throw a TypeError exception.
- If ! IsWritableStreamLocked(stream) is true, throw a TypeError exception.
- Set this.[[ownerWritableStream]] to stream.
- Set stream.[[writer]] to this.
- Let state be stream.[[state]].
- If state is "writable",
  - If ! WritableStreamCloseQueuedOrInFlight(stream) is false and stream.[[backpressure]] is true, set this.[[readyPromise]] to a new promise.
  - Otherwise, set this.[[readyPromise]] to a promise resolved with undefined.
  - Set this.[[closedPromise]] to a new promise.
- Otherwise, if state is "erroring",
  - Set this.[[readyPromise]] to a promise rejected with stream.[[storedError]].
  - Set this.[[readyPromise]].[[PromiseIsHandled]] to true.
  - Set this.[[closedPromise]] to a new promise.
- Otherwise, if state is "closed",
  - Set this.[[readyPromise]] to a promise resolved with undefined.
  - Set this.[[closedPromise]] to a promise resolved with undefined.
- Otherwise,
  - Assert: state is "errored".
  - Let storedError be stream.[[storedError]].
  - Set this.[[readyPromise]] to a promise rejected with storedError.
  - Set this.[[readyPromise]].[[PromiseIsHandled]] to true.
  - Set this.[[closedPromise]] to a promise rejected with storedError.
  - Set this.[[closedPromise]].[[PromiseIsHandled]] to true.
4.5.4. Properties of the WritableStreamDefaultWriter
prototype
4.5.4.1. get closed
The closed getter returns a promise that will be fulfilled when the stream becomes closed, or rejected if the stream ever errors or the writer's lock is released before the stream finishes closing.

- If ! IsWritableStreamDefaultWriter(this) is false, return a promise rejected with a TypeError exception.
- Return this.[[closedPromise]].
4.5.4.2. get desiredSize
The desiredSize getter returns the desired size to fill the stream's internal queue. It can be negative, if the queue is over-full. A producer can use this information to determine the right amount of data to write.

It will be null if the stream cannot be successfully written to (because it is errored or erroring), and zero if the stream is closed.

- If ! IsWritableStreamDefaultWriter(this) is false, throw a TypeError exception.
- If this.[[ownerWritableStream]] is undefined, throw a TypeError exception.
- Return ! WritableStreamDefaultWriterGetDesiredSize(this).
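A non-normative sketch of consulting desiredSize; it assumes writableStream was created with a count-based queuing strategy whose high water mark is 4 and whose underlying sink has not yet accepted any chunks:

const writer = writableStream.getWriter();
console.log(writer.desiredSize); // 4: the queue is empty, so 4 more chunks are desired

writer.write("a").catch(() => {});
writer.write("b").catch(() => {});
console.log(writer.desiredSize); // 2, assuming neither chunk has been accepted by the sink yet

writer.close().catch(() => {});
// Once the stream finishes closing, desiredSize becomes 0;
// if the stream errors instead, it becomes null.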
4.5.4.3. get ready
The ready getter returns a promise that will be fulfilled when the desired size to fill the stream's internal queue transitions from non-positive to positive, signaling that it is no longer applying backpressure. Once the desired size to fill the stream's internal queue dips back to zero or below, the getter will return a new promise that stays pending until the next transition.

If the stream becomes errored or aborted, or the writer's lock is released, the returned promise will become rejected.

- If ! IsWritableStreamDefaultWriter(this) is false, return a promise rejected with a TypeError exception.
- Return this.[[readyPromise]].
4.5.4.4. abort(reason)
If the writer is active, the abort method behaves the same as that for the associated stream. (Otherwise, it returns a rejected promise.)

- If ! IsWritableStreamDefaultWriter(this) is false, return a promise rejected with a TypeError exception.
- If this.[[ownerWritableStream]] is undefined, return a promise rejected with a TypeError exception.
- Return ! WritableStreamDefaultWriterAbort(this, reason).
4.5.4.5. close()
The close method will close the associated writable stream. The underlying sink will finish processing any previously-written chunks, before invoking its close behavior. During this time any further attempts to write will fail (without erroring the stream).

The method returns a promise that will be fulfilled with undefined if all remaining chunks are successfully written and the stream successfully closes, or rejected with an error if a problem is encountered during this process.

- If ! IsWritableStreamDefaultWriter(this) is false, return a promise rejected with a TypeError exception.
- Let stream be this.[[ownerWritableStream]].
- If stream is undefined, return a promise rejected with a TypeError exception.
- If ! WritableStreamCloseQueuedOrInFlight(stream) is true, return a promise rejected with a TypeError exception.
- Return ! WritableStreamDefaultWriterClose(this).
4.5.4.6. releaseLock()
The releaseLock method releases the writer's lock on the corresponding stream. After the lock is released, the writer is no longer active. If the associated stream is errored when the lock is released, the writer will appear errored in the same way from now on; otherwise, the writer will appear closed.

Note that the lock can still be released even if some ongoing writes have not yet finished (i.e. even if the promises returned from previous calls to write() have not yet settled). It's not necessary to hold the lock on the writer for the duration of the write; the lock instead simply prevents other producers from writing in an interleaved manner.

- If ! IsWritableStreamDefaultWriter(this) is false, throw a TypeError exception.
- Let stream be this.[[ownerWritableStream]].
- If stream is undefined, return.
- Assert: stream.[[writer]] is not undefined.
- Perform ! WritableStreamDefaultWriterRelease(this).
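A non-normative sketch of releasing the lock while a write is still pending so that another producer can take over; writableStream is an assumed placeholder:

const firstWriter = writableStream.getWriter();

// The chunk may still be in the stream's queue when the lock is released;
// it will be handed to the underlying sink, in order, regardless.
firstWriter.write("chunk from the first producer").catch(() => {});
firstWriter.releaseLock();

const secondWriter = writableStream.getWriter();
secondWriter.write("chunk from the second producer").catch(() => {});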
4.5.4.7. write(chunk)
The write method writes the given chunk to the writable stream, by waiting until any previous writes have finished successfully, and then sending the chunk to the underlying sink's write() method. It will return a promise that fulfills with undefined upon a successful write, or rejects if the write fails or the stream becomes errored before the writing process is initiated.

Note that what "success" means is up to the underlying sink; it might indicate simply that the chunk has been accepted, and not necessarily that it is safely saved to its ultimate destination.

- If ! IsWritableStreamDefaultWriter(this) is false, return a promise rejected with a TypeError exception.
- If this.[[ownerWritableStream]] is undefined, return a promise rejected with a TypeError exception.
- Return ! WritableStreamDefaultWriterWrite(this, chunk).
4.6. Writable stream writer abstract operations
4.6.1. IsWritableStreamDefaultWriter ( x ) nothrow
- If
Type (x) is not Object, returnfalse . - If x does not have an [[ownerWritableStream]] internal slot, return
false . - Return
true .
4.6.2. WritableStreamDefaultWriterAbort ( writer, reason ) nothrow
- Let stream be writer.[[ownerWritableStream]].
- Assert: stream is not
undefined . - Return !
WritableStreamAbort (stream, reason).
4.6.3. WritableStreamDefaultWriterClose ( writer ) nothrow
- Let stream be writer.[[ownerWritableStream]].
- Assert: stream is not
undefined . - Let state be stream.[[state]].
- If state is
"closed"
or"errored"
, return a promise rejected with aTypeError exception. - Assert: state is
"writable"
or"erroring"
. - Assert: !
WritableStreamCloseQueuedOrInFlight (stream) isfalse . - Let promise be a new promise.
- Set stream.[[closeRequest]] to promise.
- If stream.[[backpressure]] is
true and state is"writable"
, resolve writer.[[readyPromise]] withundefined . - Perform !
WritableStreamDefaultControllerClose (stream.[[writableStreamController]]). - Return promise.
4.6.4. WritableStreamDefaultWriterCloseWithErrorPropagation ( writer ) nothrow
This abstract operation helps implement the error propagation semantics of pipeTo()
.
- Let stream be writer.[[ownerWritableStream]].
- Assert: stream is not
undefined . - Let state be stream.[[state]].
- If !
WritableStreamCloseQueuedOrInFlight (stream) istrue or state is"closed"
, return a promise resolved withundefined . - If state is
"errored"
, return a promise rejected with stream.[[storedError]]. - Assert: state is
"writable"
or"erroring"
. - Return !
WritableStreamDefaultWriterClose (writer).
4.6.5. WritableStreamDefaultWriterEnsureClosedPromiseRejected( writer, error ) nothrow
- If writer.[[closedPromise]].[[PromiseState]] is
"pending"
, reject writer.[[closedPromise]] with error. - Otherwise, set writer.[[closedPromise]] to a promise rejected with error.
- Set writer.[[closedPromise]].[[PromiseIsHandled]] to
true .
4.6.6. WritableStreamDefaultWriterEnsureReadyPromiseRejected( writer, error ) nothrow
- If writer.[[readyPromise]].[[PromiseState]] is
"pending"
, reject writer.[[readyPromise]] with error. - Otherwise, set writer.[[readyPromise]] to a promise rejected with error.
- Set writer.[[readyPromise]].[[PromiseIsHandled]] to
true .
4.6.7. WritableStreamDefaultWriterGetDesiredSize ( writer ) nothrow
- Let stream be writer.[[ownerWritableStream]].
- Let state be stream.[[state]].
- If state is
"errored"
or"erroring"
, returnnull . - If state is
"closed"
, return0 . - Return !
WritableStreamDefaultControllerGetDesiredSize (stream.[[writableStreamController]]).
4.6.8. WritableStreamDefaultWriterRelease ( writer ) nothrow
- Let stream be writer.[[ownerWritableStream]].
- Assert: stream is not
undefined . - Assert: stream.[[writer]] is writer.
- Let releasedError be a new
TypeError . - Perform !
WritableStreamDefaultWriterEnsureReadyPromiseRejected (writer, releasedError). - Perform !
WritableStreamDefaultWriterEnsureClosedPromiseRejected (writer, releasedError). - Set stream.[[writer]] to
undefined . - Set writer.[[ownerWritableStream]] to
undefined .
4.6.9. WritableStreamDefaultWriterWrite ( writer, chunk ) nothrow
- Let stream be writer.[[ownerWritableStream]].
- Assert: stream is not
undefined . - Let controller be stream.[[writableStreamController]].
- Let chunkSize be !
WritableStreamDefaultControllerGetChunkSize (controller, chunk). - If stream is not equal to writer.[[ownerWritableStream]], return a promise rejected with a
TypeError exception. - Let state be stream.[[state]].
- If state is
"errored"
, return a promise rejected with stream.[[storedError]]. - If !
WritableStreamCloseQueuedOrInFlight (stream) istrue or state is"closed"
, return a promise rejected with aTypeError exception indicating that the stream is closing or closed. - If state is
"erroring"
, return a promise rejected with stream.[[storedError]]. - Assert: state is
"writable"
. - Let promise be !
WritableStreamAddWriteRequest (stream). - Perform !
WritableStreamDefaultControllerWrite (controller, chunk, chunkSize). - Return promise.
4.7. Class WritableStreamDefaultController
The WritableStreamDefaultController
class has methods that allow control of a WritableStream
's state. When
constructing a WritableStream
, the underlying sink is given a corresponding WritableStreamDefaultController
instance to manipulate.
4.7.1. Class definition
This section is non-normative.
If one were to write the WritableStreamDefaultController
class in something close to the syntax of [ECMASCRIPT],
it would look like
class WritableStreamDefaultController {
constructor() // always throws
error( e)
}
4.7.2. Internal slots
Instances of WritableStreamDefaultController
are created with the internal slots described in the following table:
Internal Slot | Description (non-normative)
---|---
[[abortAlgorithm]] | A promise-returning algorithm, taking one argument (the abort reason), which communicates a requested abort to the underlying sink
[[closeAlgorithm]] | A promise-returning algorithm which communicates a requested close to the underlying sink
[[controlledWritableStream]] | The WritableStream instance controlled
[[queue]] | A List representing the stream's internal queue of chunks
[[queueTotalSize]] | The total size of all the chunks stored in [[queue]] (see §6.2 Queue-with-sizes operations)
[[started]] | A boolean flag indicating whether the underlying sink has finished starting
[[strategyHWM]] | A number supplied by the creator of the stream as part of the stream's queuing strategy, indicating the point at which the stream will apply backpressure to its underlying sink
[[strategySizeAlgorithm]] | An algorithm to calculate the size of enqueued chunks, as part of the stream's queuing strategy
[[writeAlgorithm]] | A promise-returning algorithm, taking one argument (the chunk to write), which writes data to the underlying sink
4.7.3. new WritableStreamDefaultController()
The WritableStreamDefaultController constructor cannot be used directly; WritableStreamDefaultController instances are created automatically during WritableStream construction.

- Throw a TypeError exception.
4.7.4. Properties of the WritableStreamDefaultController
prototype
4.7.4.1. error(e)
The error method will error the writable stream, making all future interactions with it fail with the given error e.

This method is rarely used, since usually it suffices to return a rejected promise from one of the underlying sink's methods. However, it can be useful for suddenly shutting down a stream in response to an event outside the normal lifecycle of interactions with the underlying sink.

- If ! IsWritableStreamDefaultController(this) is false, throw a TypeError exception.
- Let state be this.[[controlledWritableStream]].[[state]].
- If state is not "writable", return.
- Perform ! WritableStreamDefaultControllerError(this, e).
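As a non-normative illustration of that last point, the following sketch stores the controller from start() and errors the stream when an assumed, hypothetical onDestinationRevoked() callback fires:

let writableController;

const writableStream = new WritableStream({
  start(controller) {
    // Keep a reference so events outside the sink's normal lifecycle can error the stream.
    writableController = controller;
  },
  write(chunk) {
    console.log("writing", chunk);
  }
});

// Hypothetical external condition, e.g. the user revoking access to the destination.
onDestinationRevoked(() => {
  writableController.error(new Error("the destination is no longer available"));
});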
4.7.5. Writable stream default controller internal methods
The following are additional internal methods implemented by each WritableStreamDefaultController
instance. The
writable stream implementation will call into these.
The reason these are in method form, instead of as abstract operations, is to make it clear that the writable stream implementation is decoupled from the controller implementation, and could in the future be expanded with other controllers, as long as those controllers implemented such internal methods. A similar scenario is seen for readable streams, where there actually are multiple controller types and as such the counterpart internal methods are used polymorphically.
4.7.5.1. [[AbortSteps]]( reason )
- Let result be the result of performing
this .[[abortAlgorithm]], passing reason. - Perform !
WritableStreamDefaultControllerClearAlgorithms (this ). - Return result.
4.7.5.2. [[ErrorSteps]]()
- Perform !
ResetQueue (this ).
4.8. Writable stream default controller abstract operations
4.8.1. IsWritableStreamDefaultController ( x ) nothrow
- If
Type (x) is not Object, returnfalse . - If x does not have an [[controlledWritableStream]] internal slot, return
false . - Return
true .
4.8.2. SetUpWritableStreamDefaultController ( stream, controller, startAlgorithm, writeAlgorithm, closeAlgorithm, abortAlgorithm, highWaterMark, sizeAlgorithm ) throws
- Assert: !
IsWritableStream (stream) istrue . - Assert: stream.[[writableStreamController]] is
undefined . - Set controller.[[controlledWritableStream]] to stream.
- Set stream.[[writableStreamController]] to controller.
- Perform !
ResetQueue (controller). - Set controller.[[started]] to
false . - Set controller.[[strategySizeAlgorithm]] to sizeAlgorithm.
- Set controller.[[strategyHWM]] to highWaterMark.
- Set controller.[[writeAlgorithm]] to writeAlgorithm.
- Set controller.[[closeAlgorithm]] to closeAlgorithm.
- Set controller.[[abortAlgorithm]] to abortAlgorithm.
- Let backpressure be !
WritableStreamDefaultControllerGetBackpressure (controller). - Perform !
WritableStreamUpdateBackpressure (stream, backpressure). - Let startResult be the result of performing startAlgorithm. (This may throw an exception.)
- Let startPromise be a promise resolved with startResult.
- Upon fulfillment of startPromise,
- Assert: stream.[[state]] is
"writable"
or"erroring"
. - Set controller.[[started]] to
true . - Perform !
WritableStreamDefaultControllerAdvanceQueueIfNeeded (controller).
- Assert: stream.[[state]] is
- Upon rejection of startPromise with reason r,
- Assert: stream.[[state]] is
"writable"
or"erroring"
. - Set controller.[[started]] to
true . - Perform !
WritableStreamDealWithRejection (stream, r).
- Assert: stream.[[state]] is
4.8.3. SetUpWritableStreamDefaultControllerFromUnderlyingSink ( stream, underlyingSink, highWaterMark, sizeAlgorithm ) throws
- Assert: underlyingSink is not
undefined . - Let controller be
ObjectCreate (the original value ofWritableStreamDefaultController
'sprototype
property). - Let startAlgorithm be the following steps:
- Return ?
InvokeOrNoop (underlyingSink,"start"
, « controller »).
- Return ?
- Let writeAlgorithm be ?
CreateAlgorithmFromUnderlyingMethod (underlyingSink,"write"
,1 , « controller »). - Let closeAlgorithm be ?
CreateAlgorithmFromUnderlyingMethod (underlyingSink,"close"
,0 , « »). - Let abortAlgorithm be ?
CreateAlgorithmFromUnderlyingMethod (underlyingSink,"abort"
,1 , « »). - Perform ?
SetUpWritableStreamDefaultController (stream, controller, startAlgorithm, writeAlgorithm, closeAlgorithm, abortAlgorithm, highWaterMark, sizeAlgorithm).
4.8.4. WritableStreamDefaultControllerClearAlgorithms ( controller ) nothrow
This abstract operation is called once the stream is closed or errored and the algorithms will not be executed any more.
By removing the algorithm references it permits the underlying sink object to be garbage collected even if the WritableStream
itself is still referenced.
The results of this algorithm are not currently observable, but could become so if JavaScript eventually adds weak references. But even without that factor, implementations will likely want to include similar steps.
This operation will be performed multiple times in some edge cases. After the first time it will do nothing.
- Set controller.[[writeAlgorithm]] to
undefined . - Set controller.[[closeAlgorithm]] to
undefined . - Set controller.[[abortAlgorithm]] to
undefined . - Set controller.[[strategySizeAlgorithm]] to
undefined .
4.8.5. WritableStreamDefaultControllerClose ( controller ) nothrow
- Perform !
EnqueueValueWithSize (controller,"close"
,0 ). - Perform !
WritableStreamDefaultControllerAdvanceQueueIfNeeded (controller).
4.8.6. WritableStreamDefaultControllerGetChunkSize ( controller, chunk ) nothrow
- Let returnValue be the result of performing controller.[[strategySizeAlgorithm]], passing in chunk, and interpreting the result as an ECMAScript completion value.
- If returnValue is an
abrupt completion ,- Perform !
WritableStreamDefaultControllerErrorIfNeeded (controller, returnValue.[[Value]]). - Return 1.
- Perform !
- Return returnValue.[[Value]].
4.8.7. WritableStreamDefaultControllerGetDesiredSize ( controller ) nothrow
- Return controller.[[strategyHWM]] − controller.[[queueTotalSize]].
4.8.8. WritableStreamDefaultControllerWrite ( controller, chunk, chunkSize ) nothrow
- Let writeRecord be
Record {[[chunk]]: chunk}. - Let enqueueResult be
EnqueueValueWithSize (controller, writeRecord, chunkSize). - If enqueueResult is an
abrupt completion ,- Perform !
WritableStreamDefaultControllerErrorIfNeeded (controller, enqueueResult.[[Value]]). - Return.
- Perform !
- Let stream be controller.[[controlledWritableStream]].
- If !
WritableStreamCloseQueuedOrInFlight (stream) isfalse and stream.[[state]] is"writable"
,- Let backpressure be !
WritableStreamDefaultControllerGetBackpressure (controller). - Perform !
WritableStreamUpdateBackpressure (stream, backpressure).
- Let backpressure be !
- Perform !
WritableStreamDefaultControllerAdvanceQueueIfNeeded (controller).
4.8.9. WritableStreamDefaultControllerAdvanceQueueIfNeeded ( controller ) nothrow
- Let stream be controller.[[controlledWritableStream]].
- If controller.[[started]] is
false , return. - If stream.[[inFlightWriteRequest]] is not
undefined , return. - Let state be stream.[[state]].
- If state is
"closed"
or"errored"
, return. - If state is
"erroring"
,- Perform !
WritableStreamFinishErroring (stream). - Return.
- Perform !
- If controller.[[queue]] is empty, return.
- Let writeRecord be !
PeekQueueValue (controller). - If writeRecord is
"close"
, perform !WritableStreamDefaultControllerProcessClose (controller). - Otherwise, perform !
WritableStreamDefaultControllerProcessWrite (controller, writeRecord.[[chunk]]).
4.8.10. WritableStreamDefaultControllerErrorIfNeeded ( controller, error ) nothrow
- If controller.[[controlledWritableStream]].[[state]] is
"writable"
, perform !WritableStreamDefaultControllerError (controller, error).
4.8.11. WritableStreamDefaultControllerProcessClose ( controller ) nothrow
- Let stream be controller.[[controlledWritableStream]].
- Perform !
WritableStreamMarkCloseRequestInFlight (stream). - Perform !
DequeueValue (controller). - Assert: controller.[[queue]] is empty.
- Let sinkClosePromise be the result of performing controller.[[closeAlgorithm]].
- Perform !
WritableStreamDefaultControllerClearAlgorithms (controller). - Upon fulfillment of sinkClosePromise,
- Perform !
WritableStreamFinishInFlightClose (stream).
- Perform !
- Upon rejection of sinkClosePromise with reason reason,
- Perform !
WritableStreamFinishInFlightCloseWithError (stream, reason).
- Perform !
4.8.12. WritableStreamDefaultControllerProcessWrite ( controller, chunk ) nothrow
- Let stream be controller.[[controlledWritableStream]].
- Perform !
WritableStreamMarkFirstWriteRequestInFlight (stream). - Let sinkWritePromise be the result of performing controller.[[writeAlgorithm]], passing in chunk.
- Upon fulfillment of sinkWritePromise,
- Perform !
WritableStreamFinishInFlightWrite (stream). - Let state be stream.[[state]].
- Assert: state is
"writable"
or"erroring"
. - Perform !
DequeueValue (controller). - If !
WritableStreamCloseQueuedOrInFlight (stream) isfalse and state is"writable"
,- Let backpressure be !
WritableStreamDefaultControllerGetBackpressure (controller). - Perform !
WritableStreamUpdateBackpressure (stream, backpressure).
- Let backpressure be !
- Perform !
WritableStreamDefaultControllerAdvanceQueueIfNeeded (controller).
- Perform !
- Upon rejection of sinkWritePromise with reason,
- If stream.[[state]] is
"writable"
, perform !WritableStreamDefaultControllerClearAlgorithms (controller). - Perform !
WritableStreamFinishInFlightWriteWithError (stream, reason).
- If stream.[[state]] is
4.8.13. WritableStreamDefaultControllerGetBackpressure ( controller ) nothrow
- Let desiredSize be !
WritableStreamDefaultControllerGetDesiredSize (controller). - Return desiredSize ≤
0 .
4.8.14. WritableStreamDefaultControllerError ( controller, error ) nothrow
- Let stream be controller.[[controlledWritableStream]].
- Assert: stream.[[state]] is
"writable"
. - Perform !
WritableStreamDefaultControllerClearAlgorithms (controller). - Perform !
WritableStreamStartErroring (stream, error).
5. Transform streams
5.1. Using transform streams
One common way to use a transform stream is to place it inside a pipe chain, between a readable stream and a writable stream:

readableStream
  .pipeThrough(transformStream)
  .pipeTo(writableStream)
  .then(() => console.log("All data successfully transformed!"))
  .catch(e => console.error("Something went wrong!", e));
You can also use the readable and writable properties of a transform stream directly to access the usual interfaces of a readable stream and writable stream. In this example we supply data to the writable side of the stream using its writer interface. The readable side is then piped to anotherWritableStream.
const writer = transformStream.writable.getWriter();
writer.write("input chunk");
transformStream.readable.pipeTo(anotherWritableStream);
One use of identity transform streams is to easily convert between readable and writable streams. For example, the fetch() API accepts a readable stream request body, but it can be more convenient to write data for uploading via a writable stream interface. Using an identity transform stream addresses this:

const { writable, readable } = new TransformStream();
fetch("...", { body: readable }).then(response => /* ... */);

const writer = writable.getWriter();
writer.write(new Uint8Array([0x73, 0x74, 0x72, 0x65, 0x61, 0x6D, 0x73, 0x21]));
writer.close();
Another use of identity transform streams is to add additional buffering to a pipe. In this example we add
extra buffering between readableStream
and writableStream
.
const writableStrategy = new ByteLengthQueuingStrategy({ highWaterMark: 1024 * 1024 });
readableStream
  .pipeThrough(new TransformStream(undefined, writableStrategy))
  .pipeTo(writableStream);
5.2. Class TransformStream
5.2.1. Class definition
This section is non-normative.
If one were to write the TransformStream
class in something close to the syntax of [ECMASCRIPT], it would look
like
class TransformStream {
constructor( transformer = {}, writableStrategy = {}, readableStrategy = {})
get readable()
get writable()
}
5.2.2. Internal slots
Instances of TransformStream
are created with the internal slots described in the following table:
Internal Slot | Description (non-normative)
---|---
[[backpressure]] | Whether there was backpressure on [[readable]] the last time it was observed
[[backpressureChangePromise]] | A promise which is fulfilled and replaced every time the value of [[backpressure]] changes
[[readable]] | The ReadableStream instance controlled by this object
[[transformStreamController]] | A TransformStreamDefaultController created with the ability to control [[readable]] and [[writable]]; also used for the IsTransformStream brand check
[[writable]] | The WritableStream instance controlled by this object
5.2.3. new TransformStream(transformer = {}, writableStrategy = {}, readableStrategy = {})
The transformer argument represents the transformer, as described in §5.2.4 Transformer API.

The writableStrategy and readableStrategy arguments are the queuing strategy objects for the writable and readable sides respectively. These are used in the construction of the WritableStream and ReadableStream objects and can be used to add buffering to a TransformStream, in order to smooth out variations in the speed of the transformation, or to increase the amount of buffering in a pipe. If they are not provided, the default behavior will be the same as a CountQueuingStrategy, with respective high water marks of 1 and 0.

- Let writableSizeFunction be ? GetV(writableStrategy, "size").
- Let writableHighWaterMark be ? GetV(writableStrategy, "highWaterMark").
- Let readableSizeFunction be ? GetV(readableStrategy, "size").
- Let readableHighWaterMark be ? GetV(readableStrategy, "highWaterMark").
- Let writableType be ? GetV(transformer, "writableType").
- If writableType is not undefined, throw a RangeError exception.
- Let writableSizeAlgorithm be ? MakeSizeAlgorithmFromSizeFunction(writableSizeFunction).
- If writableHighWaterMark is undefined, set writableHighWaterMark to 1.
- Set writableHighWaterMark to ? ValidateAndNormalizeHighWaterMark(writableHighWaterMark).
- Let readableType be ? GetV(transformer, "readableType").
- If readableType is not undefined, throw a RangeError exception.
- Let readableSizeAlgorithm be ? MakeSizeAlgorithmFromSizeFunction(readableSizeFunction).
- If readableHighWaterMark is undefined, set readableHighWaterMark to 0.
- Set readableHighWaterMark to ? ValidateAndNormalizeHighWaterMark(readableHighWaterMark).
- Let startPromise be a new promise.
- Perform ! InitializeTransformStream(this, startPromise, writableHighWaterMark, writableSizeAlgorithm, readableHighWaterMark, readableSizeAlgorithm).
- Perform ? SetUpTransformStreamDefaultControllerFromTransformer(this, transformer).
- Let startResult be ? InvokeOrNoop(transformer, "start", « this.[[transformStreamController]] »).
- Resolve startPromise with startResult.
5.2.4. Transformer API
This section is non-normative.
The TransformStream()
constructor accepts as its first argument a JavaScript object representing the transformer. Such objects can contain any of the following methods:
start(controller)
-
A function that is called immediately during creation of the
TransformStream
.Typically this is used to enqueue prefix chunks, using
controller.enqueue()
. Those chunks will be read from the readable side but don’t depend on any writes to the writable side.If this initial process is asynchronous, for example because it takes some effort to acquire the prefix chunks, the function can return a promise to signal success or failure; a rejected promise will error the stream. Any thrown exceptions will be re-thrown by the
TransformStream()
constructor. transform(chunk, controller)
-
A function called when a new chunk originally written to the writable side is ready to be transformed. The stream implementation guarantees that this function will be called only after previous transforms have succeeded, and never before
start()
has completed or afterflush()
has been called.This function performs the actual transformation work of the transform stream. It can enqueue the results using
controller.enqueue()
. This permits a single chunk written to the writable side to result in zero or multiple chunks on the readable side, depending on how many timescontroller.enqueue()
is called. §8.9 A transform stream that replaces template tags demonstrates this by sometimes enqueuing zero chunks.If the process of transforming is asynchronous, this function can return a promise to signal success or failure of the transformation. A rejected promise will error both the readable and writable sides of the transform stream.
If no
transform()
is supplied, the identity transform is used, which enqueues chunks unchanged from the writable side to the readable side. flush(controller)
-
A function called after all chunks written to the writable side have been transformed by successfully passing through
transform()
, and the writable side is about to be closed.Typically this is used to enqueue suffix chunks to the readable side, before that too becomes closed. An example can be seen in §8.9 A transform stream that replaces template tags.
If the flushing process is asynchronous, the function can return a promise to signal success or failure; the result will be communicated to the caller of
stream.writable.write()
. Additionally, a rejected promise will error both the readable and writable sides of the stream. Throwing an exception is treated the same as returning a rejected promise.(Note that there is no need to call
controller.terminate()
insideflush()
; the stream is already in the process of successfully closing down, and terminating it would be counterproductive.)
The controller
object passed to start()
, transform()
, and flush()
is an instance of TransformStreamDefaultController
, and has the ability to enqueue chunks to the readable side, or to terminate or error the stream.
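To make this concrete, here is a non-normative sketch of a transform stream that upper-cases string chunks and enqueues a suffix chunk when the writable side closes; all names are illustrative:

const upperCaseTransform = new TransformStream({
  transform(chunk, controller) {
    // One incoming chunk produces exactly one outgoing chunk here,
    // but enqueue() could be called zero or several times instead.
    controller.enqueue(String(chunk).toUpperCase());
  },
  flush(controller) {
    // Runs after all written chunks have been transformed,
    // just before the readable side closes.
    controller.enqueue("[end of transformed data]");
  }
});

const writer = upperCaseTransform.writable.getWriter();
writer.write("streams are neat");
writer.close();
// upperCaseTransform.readable now yields "STREAMS ARE NEAT"
// followed by "[end of transformed data]".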
5.2.5. Properties of the TransformStream
prototype
5.2.5.1. get readable
The readable getter gives access to the readable side of the transform stream.

- If ! IsTransformStream(this) is false, throw a TypeError exception.
- Return this.[[readable]].
5.2.5.2. get writable
The writable getter gives access to the writable side of the transform stream.

- If ! IsTransformStream(this) is false, throw a TypeError exception.
- Return this.[[writable]].
5.3. General transform stream abstract operations
5.3.1. CreateTransformStream ( startAlgorithm, transformAlgorithm, flushAlgorithm [, writableHighWaterMark [, writableSizeAlgorithm [, readableHighWaterMark [, readableSizeAlgorithm ] ] ] ] ) throws
This abstract operation is meant to be called from other specifications that wish to create TransformStream
instances. The transformAlgorithm and flushAlgorithm algorithms must return promises; if supplied, writableHighWaterMark and readableHighWaterMark must be non-negative, non-NaN numbers; and if
supplied, writableSizeAlgorithm and readableSizeAlgorithm must be algorithms accepting chunk objects and returning numbers.
- If writableHighWaterMark was not passed, set it to
1 . - If writableSizeAlgorithm was not passed, set it to an algorithm that returns
1 . - If readableHighWaterMark was not passed, set it to
0 . - If readableSizeAlgorithm was not passed, set it to an algorithm that returns
1 . - Assert: !
IsNonNegativeNumber (writableHighWaterMark) istrue . - Assert: !
IsNonNegativeNumber (readableHighWaterMark) istrue . - Let stream be
ObjectCreate (the original value ofTransformStream
'sprototype
property). - Let startPromise be a new promise.
- Perform !
InitializeTransformStream (stream, startPromise, writableHighWaterMark, writableSizeAlgorithm, readableHighWaterMark, readableSizeAlgorithm). - Let controller be
ObjectCreate (the original value ofTransformStreamDefaultController
'sprototype
property). - Perform !
SetUpTransformStreamDefaultController (stream, controller, transformAlgorithm, flushAlgorithm). - Let startResult be the result of performing startAlgorithm. (This may throw an exception.)
- Resolve startPromise with startResult.
- Return stream.
5.3.2. InitializeTransformStream ( stream, startPromise, writableHighWaterMark, writableSizeAlgorithm, readableHighWaterMark, readableSizeAlgorithm ) nothrow
- Let startAlgorithm be an algorithm that returns startPromise.
- Let writeAlgorithm be the following steps, taking a chunk argument:
- Return !
TransformStreamDefaultSinkWriteAlgorithm (stream, chunk).
- Return !
- Let abortAlgorithm be the following steps, taking a reason argument:
- Return !
TransformStreamDefaultSinkAbortAlgorithm (stream, reason).
- Return !
- Let closeAlgorithm be the following steps:
- Return !
TransformStreamDefaultSinkCloseAlgorithm (stream).
- Return !
- Set stream.[[writable]] to !
CreateWritableStream (startAlgorithm, writeAlgorithm, closeAlgorithm, abortAlgorithm, writableHighWaterMark, writableSizeAlgorithm). - Let pullAlgorithm be the following steps:
- Return !
TransformStreamDefaultSourcePullAlgorithm (stream).
- Return !
- Let cancelAlgorithm be the following steps, taking a reason argument:
- Perform !
TransformStreamErrorWritableAndUnblockWrite (stream, reason). - Return a promise resolved with
undefined .
- Perform !
- Set stream.[[readable]] to !
CreateReadableStream (startAlgorithm, pullAlgorithm, cancelAlgorithm, readableHighWaterMark, readableSizeAlgorithm). - Set stream.[[backpressure]] and stream.[[backpressureChangePromise]] to
undefined .The [[backpressure]] slot is set to
undefined so that it can be initialized byTransformStreamSetBackpressure . Alternatively, implementations can use a strictly boolean value for [[backpressure]] and change the way it is initialized. This will not be visible to user code so long as the initialization is correctly completed before transformer’sstart()
method is called. - Perform !
TransformStreamSetBackpressure (stream,true ). - Set stream.[[transformStreamController]] to
undefined .
5.3.3. IsTransformStream ( x ) nothrow
- If Type(x) is not Object, return false.
- If x does not have a [[transformStreamController]] internal slot, return false.
- Return true.
5.3.4. TransformStreamError ( stream, e ) nothrow
- Perform ! ReadableStreamDefaultControllerError(stream.[[readable]].[[readableStreamController]], e).
- Perform ! TransformStreamErrorWritableAndUnblockWrite(stream, e).
This operation works correctly when one or both sides are already errored. As a result, calling algorithms do not need to check stream states when responding to an error condition.
5.3.5. TransformStreamErrorWritableAndUnblockWrite ( stream, e ) nothrow
- Perform ! TransformStreamDefaultControllerClearAlgorithms(stream.[[transformStreamController]]).
- Perform ! WritableStreamDefaultControllerErrorIfNeeded(stream.[[writable]].[[writableStreamController]], e).
- If stream.[[backpressure]] is true, perform ! TransformStreamSetBackpressure(stream, false).
The TransformStreamDefaultSinkWriteAlgorithm abstract operation could be waiting for the promise stored in the [[backpressureChangePromise]] slot to resolve. The call to TransformStreamSetBackpressure ensures that the promise always resolves.
5.3.6. TransformStreamSetBackpressure ( stream, backpressure ) nothrow
- Assert: stream.[[backpressure]] is not backpressure.
- If stream.[[backpressureChangePromise]] is not undefined, resolve stream.[[backpressureChangePromise]] with undefined.
- Set stream.[[backpressureChangePromise]] to a new promise.
- Set stream.[[backpressure]] to backpressure.
5.4. Class TransformStreamDefaultController
The TransformStreamDefaultController class has methods that allow manipulation of the associated ReadableStream and WritableStream. When constructing a TransformStream, the transformer object is given a corresponding TransformStreamDefaultController instance to manipulate.
5.4.1. Class definition
This section is non-normative.
If one were to write the TransformStreamDefaultController
class in something close to the syntax of [ECMASCRIPT],
it would look like
class TransformStreamDefaultController {
  constructor() // always throws

  get desiredSize()

  enqueue(chunk)
  error(reason)
  terminate()
}
5.4.2. Internal slots
Instances of TransformStreamDefaultController
are created with the internal slots described in the following table:
Internal Slot | Description (non-normative)
---|---
[[controlledTransformStream]] | The TransformStream instance controlled; also used for the IsTransformStreamDefaultController brand check
[[flushAlgorithm]] | A promise-returning algorithm which communicates a requested close to the transformer
[[transformAlgorithm]] | A promise-returning algorithm, taking one argument (the chunk to transform), which requests the transformer perform its transformation
5.4.3. new TransformStreamDefaultController()
The TransformStreamDefaultController constructor cannot be used directly; TransformStreamDefaultController instances are created automatically during TransformStream construction.
- Throw a TypeError exception.
5.4.4. Properties of the TransformStreamDefaultController prototype
5.4.4.1. get desiredSize
The desiredSize getter returns the desired size to fill the readable side's internal queue. It can be negative, if the queue is over-full.
- If ! IsTransformStreamDefaultController(this) is false, throw a TypeError exception.
- Let readableController be this.[[controlledTransformStream]].[[readable]].[[readableStreamController]].
- Return ! ReadableStreamDefaultControllerGetDesiredSize(readableController).
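As a non-normative illustration, a transformer can observe desiredSize from inside its transform() method to see how much room remains in the readable side's queue; the logging transformer below is purely for demonstration:

// Illustrative transformer that inspects the readable side's desiredSize.
// A zero or negative desiredSize means the readable side's queue is at or over
// its high-water mark, so the transformer may want to produce less per chunk.
const ts = new TransformStream({
  transform(chunk, controller) {
    console.log("desired size before enqueuing:", controller.desiredSize);
    controller.enqueue(chunk);
    console.log("desired size after enqueuing:", controller.desiredSize);
  }
});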
5.4.4.2. enqueue(chunk)
- If ! IsTransformStreamDefaultController(this) is false, throw a TypeError exception.
- Perform ? TransformStreamDefaultControllerEnqueue(this, chunk).
5.4.4.3. error(reason)
The error method will error both the readable side and the writable side of the controlled transform stream, making all future interactions fail with the given reason. Any chunks queued for transformation will be discarded.
- If ! IsTransformStreamDefaultController(this) is false, throw a TypeError exception.
- Perform ! TransformStreamDefaultControllerError(this, reason).
5.4.4.4. terminate()
The terminate method will close the readable side and error the writable side of the controlled transform stream. This is useful when the transformer only needs to consume a portion of the chunks written to the writable side.
- If ! IsTransformStreamDefaultController(this) is false, throw a TypeError exception.
- Perform ! TransformStreamDefaultControllerTerminate(this).
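For instance, a transformer that only needs the first few chunks can call terminate() once it has seen enough; later writes to the writable side will then fail. The take-three-chunks logic in this non-normative sketch is made up for illustration:

// Passes through only the first three chunks, then terminates the stream.
// Afterwards, ts.readable is closed and further writes to ts.writable reject.
function takeThree() {
  let seen = 0;
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk);
      if (++seen === 3) {
        controller.terminate();
      }
    }
  });
}

const ts = takeThree();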
5.5. Transform stream default controller abstract operations
5.5.1. IsTransformStreamDefaultController ( x ) nothrow
- If Type(x) is not Object, return false.
- If x does not have a [[controlledTransformStream]] internal slot, return false.
- Return true.
5.5.2. SetUpTransformStreamDefaultController ( stream, controller, transformAlgorithm, flushAlgorithm ) nothrow
- Assert: ! IsTransformStream(stream) is true.
- Assert: stream.[[transformStreamController]] is undefined.
- Set controller.[[controlledTransformStream]] to stream.
- Set stream.[[transformStreamController]] to controller.
- Set controller.[[transformAlgorithm]] to transformAlgorithm.
- Set controller.[[flushAlgorithm]] to flushAlgorithm.
5.5.3. SetUpTransformStreamDefaultControllerFromTransformer ( stream, transformer ) throws
- Assert: transformer is not undefined.
- Let controller be ObjectCreate(the original value of TransformStreamDefaultController's prototype property).
- Let transformAlgorithm be the following steps, taking a chunk argument:
  - Let result be TransformStreamDefaultControllerEnqueue(controller, chunk).
  - If result is an abrupt completion, return a promise rejected with result.[[Value]].
  - Otherwise, return a promise resolved with undefined.
- Let transformMethod be ? GetV(transformer, "transform").
- If transformMethod is not undefined,
  - If ! IsCallable(transformMethod) is false, throw a TypeError exception.
  - Set transformAlgorithm to the following steps, taking a chunk argument:
    - Return ! PromiseCall(transformMethod, transformer, « chunk, controller »).
- Let flushAlgorithm be ? CreateAlgorithmFromUnderlyingMethod(transformer, "flush", 0, « controller »).
- Perform ! SetUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm).
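From a developer's perspective, this is the operation that makes the transformer's transform and flush methods take effect, and that falls back to an identity transform when no transform method is supplied. A non-normative sketch of both cases:

// With no transform() method, the default transformAlgorithm simply re-enqueues
// each chunk on the readable side, giving an identity transform stream.
const identity = new TransformStream({});

// When transform() and flush() are supplied, they back the controller's
// [[transformAlgorithm]] and [[flushAlgorithm]] respectively.
const uppercaser = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(String(chunk).toUpperCase());
  },
  flush(controller) {
    controller.enqueue("(end of input)");
  }
});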
5.5.4. TransformStreamDefaultControllerClearAlgorithms ( controller ) nothrow
This abstract operation is called once the stream is closed or errored and the algorithms will not be executed any more.
By removing the algorithm references it permits the transformer object to be garbage collected even if the TransformStream
itself is still referenced.
The results of this algorithm are not currently observable, but could become so if JavaScript eventually adds weak references. But even without that factor, implementations will likely want to include similar steps.
- Set controller.[[transformAlgorithm]] to undefined.
- Set controller.[[flushAlgorithm]] to undefined.
5.5.5. TransformStreamDefaultControllerEnqueue ( controller, chunk ) throws
This abstract operation can be called by other specifications that wish to enqueue chunks in the readable side, in the same way a developer would enqueue chunks using the stream’s associated controller object. Specifications should not do this to streams they did not create.
- Let stream be controller.[[controlledTransformStream]].
- Let readableController be stream.[[readable]].[[readableStreamController]].
- If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(readableController) is false, throw a TypeError exception.
- Let enqueueResult be ReadableStreamDefaultControllerEnqueue(readableController, chunk).
- If enqueueResult is an abrupt completion,
  - Perform ! TransformStreamErrorWritableAndUnblockWrite(stream, enqueueResult.[[Value]]).
  - Throw stream.[[readable]].[[storedError]].
- Let backpressure be ! ReadableStreamDefaultControllerHasBackpressure(readableController).
- If backpressure is not stream.[[backpressure]],
  - Assert: backpressure is true.
  - Perform ! TransformStreamSetBackpressure(stream, true).
5.5.6. TransformStreamDefaultControllerError ( controller, e ) nothrow
This abstract operation can be called by other specifications that wish to move a transform stream to an errored state, in the same way a developer would error a stream using its associated controller object. Specifications should not do this to streams they did not create.
- Perform ! TransformStreamError(controller.[[controlledTransformStream]], e).
5.5.7. TransformStreamDefaultControllerPerformTransform ( controller, chunk ) nothrow
- Let transformPromise be the result of performing controller.[[transformAlgorithm]], passing chunk.
- Return the result of transforming transformPromise with a rejection handler that, when called with argument r, performs the following steps:
  - Perform ! TransformStreamError(controller.[[controlledTransformStream]], r).
  - Throw r.
5.5.8. TransformStreamDefaultControllerTerminate ( controller ) nothrow
This abstract operation can be called by other specifications that wish to terminate a transform stream, in the same way a developer-created stream would be closed by its associated controller object. Specifications should not do this to streams they did not create.
- Let stream be controller.[[controlledTransformStream]].
- Let readableController be stream.[[readable]].[[readableStreamController]].
- If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(readableController) is true, perform ! ReadableStreamDefaultControllerClose(readableController).
- Let error be a TypeError exception indicating that the stream has been terminated.
- Perform ! TransformStreamErrorWritableAndUnblockWrite(stream, error).
5.6. Transform stream default sink abstract operations
5.6.1. TransformStreamDefaultSinkWriteAlgorithm ( stream, chunk ) nothrow
- Assert: stream.[[writable]].[[state]] is "writable".
- Let controller be stream.[[transformStreamController]].
- If stream.[[backpressure]] is true,
  - Let backpressureChangePromise be stream.[[backpressureChangePromise]].
  - Assert: backpressureChangePromise is not undefined.
  - Return the result of transforming backpressureChangePromise with a fulfillment handler which performs the following steps:
    - Let writable be stream.[[writable]].
    - Let state be writable.[[state]].
    - If state is "erroring", throw writable.[[storedError]].
    - Assert: state is "writable".
    - Return ! TransformStreamDefaultControllerPerformTransform(controller, chunk).
- Return ! TransformStreamDefaultControllerPerformTransform(controller, chunk).
5.6.2. TransformStreamDefaultSinkAbortAlgorithm ( stream, reason ) nothrow
- Perform ! TransformStreamError(stream, reason).
- Return a promise resolved with undefined.
5.6.3. TransformStreamDefaultSinkCloseAlgorithm( stream ) nothrow
- Let readable be stream.[[readable]].
- Let controller be stream.[[transformStreamController]].
- Let flushPromise be the result of performing controller.[[flushAlgorithm]].
- Perform ! TransformStreamDefaultControllerClearAlgorithms(controller).
- Return the result of transforming flushPromise with:
  - A fulfillment handler that performs the following steps:
    - If readable.[[state]] is "errored", throw readable.[[storedError]].
    - Let readableController be readable.[[readableStreamController]].
    - If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(readableController) is true, perform ! ReadableStreamDefaultControllerClose(readableController).
  - A rejection handler that, when called with argument r, performs the following steps:
    - Perform ! TransformStreamError(stream, r).
    - Throw readable.[[storedError]].
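The developer-observable consequence of this algorithm is that closing the writable side first runs the transformer's flush() method, which may still enqueue chunks, and only then closes the readable side. A non-normative sketch of that ordering:

// flush() runs when the writable side is closed, and may enqueue a final chunk
// before the readable side is closed.
const ts = new TransformStream({
  transform(chunk, controller) { controller.enqueue(chunk); },
  flush(controller) { controller.enqueue("trailer added during flush"); }
});

const writer = ts.writable.getWriter();
const reader = ts.readable.getReader();

writer.write("body");
writer.close();

reader.read().then(({ value }) => console.log(value)); // "body"
reader.read().then(({ value }) => console.log(value)); // "trailer added during flush"
reader.read().then(({ done }) => console.log(done));   // true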
5.7. Transform stream default source abstract operations
5.7.1. TransformStreamDefaultSourcePullAlgorithm( stream ) nothrow
- Assert: stream.[[backpressure]] is true.
- Assert: stream.[[backpressureChangePromise]] is not undefined.
- Perform ! TransformStreamSetBackpressure(stream, false).
- Return stream.[[backpressureChangePromise]].
6. Other stream APIs and operations
6.1. Queuing strategies
6.1.1. The queuing strategy API
This section is non-normative.
The ReadableStream()
, WritableStream()
, and TransformStream()
constructors all accept at least one argument
representing an appropriate queuing strategy for the stream being created. Such objects contain the following
properties:
size(chunk) (non-byte streams only)
- A function that computes and returns the size of the given chunk value. The result is used to determine backpressure, manifesting via the appropriate desiredSize property: either defaultController.desiredSize, byteController.desiredSize, or writer.desiredSize, depending on where the queuing strategy is being used. For readable streams, it also governs when the underlying source's pull() method is called.
- This function has to be idempotent and not cause side effects; very strange results can occur otherwise.
- For readable byte streams, this function is not used, as chunks are always measured in bytes.

highWaterMark
- A non-negative number indicating the high water mark of the stream using this queuing strategy.
Any object with these properties can be used when a queuing strategy object is expected. However, we provide two
built-in queuing strategy classes that provide a common vocabulary for certain cases: ByteLengthQueuingStrategy
and CountQueuingStrategy
.
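Any object of this shape works; as a non-normative example, here is a hand-written strategy that measures string chunks by their length (the names and the 1024-character threshold are illustrative only):

// A custom queuing strategy: measures string chunks by their length in code units,
// and signals backpressure once roughly 1024 characters are buffered.
const characterCountingStrategy = {
  highWaterMark: 1024,
  size(chunk) {
    return chunk.length;
  }
};

// The same object shape is accepted wherever a queuing strategy is expected,
// for example by the WritableStream constructor.
const ws = new WritableStream({
  write(chunk) { /* deliver chunk to some underlying sink */ }
}, characterCountingStrategy);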
6.1.2. Class ByteLengthQueuingStrategy
A common queuing strategy when dealing with bytes is to wait until the accumulated byteLength
properties of the incoming chunks reaches a specified high-water mark. As such, this is provided as a built-in queuing strategy that can be used when constructing streams.
const stream = new ReadableStream(
{ ... },
new ByteLengthQueuingStrategy({ highWaterMark: 16 * 1024 })
);
In this case, 16 KiB worth of chunks can be enqueued by the readable stream’s underlying source before the readable stream implementation starts sending backpressure signals to the underlying source.
const stream = new WritableStream(
{ ... },
new ByteLengthQueuingStrategy({ highWaterMark: 32 * 1024 })
);
In this case, 32 KiB worth of chunks can be accumulated in the writable stream’s internal queue, waiting for previous writes to the underlying sink to finish, before the writable stream starts sending backpressure signals to any producers.
It is not necessary to use ByteLengthQueuingStrategy
with readable byte streams, as they always measure
chunks in bytes. Attempting to construct a byte stream with a ByteLengthQueuingStrategy
will fail.
6.1.2.1. Class definition
This section is non-normative.
If one were to write the ByteLengthQueuingStrategy
class in something close to the syntax of [ECMASCRIPT], it
would look like
class ByteLengthQueuingStrategy {
  constructor({ highWaterMark })
  size(chunk)
}
Each ByteLengthQueuingStrategy
instance will additionally have an own data property highWaterMark
set by its constructor.
6.1.2.2. new ByteLengthQueuingStrategy({ highWaterMark })
- Perform ! CreateDataProperty(this, "highWaterMark", highWaterMark).
6.1.2.3. Properties of the ByteLengthQueuingStrategy prototype
6.1.2.3.1. size(chunk)
The size method returns the given chunk's byteLength property. (If the chunk doesn't have one, it will return undefined.)
This method is intentionally generic; it does not require that its this value be a ByteLengthQueuingStrategy object.
- Return ? GetV(chunk, "byteLength").
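Because the method simply reads byteLength from its argument, it can be called directly; a brief non-normative illustration:

const strategy = new ByteLengthQueuingStrategy({ highWaterMark: 16 * 1024 });

// size() just returns the chunk's byteLength property.
console.log(strategy.size(new Uint8Array(128))); // 128
console.log(strategy.size(new ArrayBuffer(8)));  // 8
console.log(strategy.size({ byteLength: 42 }));  // 42 (any object with a byteLength works)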
6.1.3. Class CountQueuingStrategy
A common queuing strategy when dealing with streams of generic objects is to simply count the number of chunks that have been accumulated so far, waiting until this number reaches a specified high-water mark. As such, this strategy is also provided out of the box.
const stream = new ReadableStream(
{ ... },
new CountQueuingStrategy({ highWaterMark: 10 })
);
In this case, 10 chunks (of any kind) can be enqueued by the readable stream’s underlying source before the readable stream implementation starts sending backpressure signals to the underlying source.
const stream = new WritableStream(
{ ... },
new CountQueuingStrategy({ highWaterMark: 5 })
);
In this case, five chunks (of any kind) can be accumulated in the writable stream’s internal queue, waiting for previous writes to the underlying sink to finish, before the writable stream starts sending backpressure signals to any producers.
6.1.3.1. Class definition
This section is non-normative.
If one were to write the CountQueuingStrategy
class in something close to the syntax of [ECMASCRIPT], it would
look like
class CountQueuingStrategy {
  constructor({ highWaterMark })
  size(chunk)
}
Each CountQueuingStrategy
instance will additionally have an own data property highWaterMark
set by its constructor.
6.1.3.2. new CountQueuingStrategy({ highWaterMark })
- Perform ! CreateDataProperty(this, "highWaterMark", highWaterMark).
6.1.3.3. Properties of the CountQueuingStrategy prototype
6.1.3.3.1. size()
The size method always returns one, so that the total queue size is a count of the number of chunks in the queue.
This method is intentionally generic; it does not require that its this value be a CountQueuingStrategy object.
- Return 1.
6.2. Queue-with-sizes operations
The streams in this specification use a "queue-with-sizes" data structure to store queued up values, along with their determined sizes. Various specification objects contain a queue-with-sizes, represented by the object having two paired internal slots, always named [[queue]] and [[queueTotalSize]]. [[queue]] is a List of Records with [[value]] and [[size]] fields, while [[queueTotalSize]] is a Number, i.e. a double-precision floating point number.
The following abstract operations are used when operating on objects that contain queues-with-sizes, in order to ensure that the two internal slots stay synchronized.
Due to the limited precision of floating-point arithmetic, the framework specified here, of keeping a running total in the [[queueTotalSize]] slot, is not equivalent to adding up the size of all chunks in [[queue]]. (However, this only makes a difference when there is a huge (~10^15) variance in size between chunks, or when trillions of chunks are enqueued.)
6.2.1. DequeueValue ( container ) nothrow
- Assert: container has [[queue]] and [[queueTotalSize]] internal slots.
- Assert: container.[[queue]] is not empty.
- Let pair be the first element of container.[[queue]].
- Remove pair from container.[[queue]], shifting all other elements downward (so that the second becomes the first, and so on).
- Set container.[[queueTotalSize]] to container.[[queueTotalSize]] − pair.[[size]].
- If container.[[queueTotalSize]] < 0, set container.[[queueTotalSize]] to 0. (This can occur due to rounding errors.)
- Return pair.[[value]].
6.2.2. EnqueueValueWithSize ( container, value, size ) throws
- Assert: container has [[queue]] and [[queueTotalSize]] internal slots.
- Let size be ? ToNumber(size).
- If ! IsFiniteNonNegativeNumber(size) is false, throw a RangeError exception.
- Append Record {[[value]]: value, [[size]]: size} as the last element of container.[[queue]].
- Set container.[[queueTotalSize]] to container.[[queueTotalSize]] + size.
6.2.3. PeekQueueValue ( container ) nothrow
- Assert: container has [[queue]] and [[queueTotalSize]] internal slots.
- Assert: container.[[queue]] is not empty.
- Let pair be the first element of container.[[queue]].
- Return pair.[[value]].
6.2.4. ResetQueue ( container ) nothrow
- Assert: container has [[queue]] and [[queueTotalSize]] internal slots.
- Set container.[[queue]] to a new empty List.
- Set container.[[queueTotalSize]] to 0.
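The following non-normative JavaScript sketch models these four operations on a plain object with queue and queueTotalSize fields; it mirrors the steps above but is not part of the specification:

// Non-normative model of the queue-with-sizes operations, using a plain object
// { queue: [], queueTotalSize: 0 } in place of the paired internal slots.

function enqueueValueWithSize(container, value, size) {
  size = Number(size);
  if (!Number.isFinite(size) || size < 0) {
    throw new RangeError("Size must be a finite, non-negative number");
  }
  container.queue.push({ value, size });
  container.queueTotalSize += size;
}

function dequeueValue(container) {
  const pair = container.queue.shift();
  container.queueTotalSize -= pair.size;
  if (container.queueTotalSize < 0) {
    container.queueTotalSize = 0; // guard against floating-point rounding errors
  }
  return pair.value;
}

function peekQueueValue(container) {
  return container.queue[0].value;
}

function resetQueue(container) {
  container.queue = [];
  container.queueTotalSize = 0;
}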
6.3. Miscellaneous operations
A few abstract operations are used in this specification for utility purposes. We define them here.
6.3.1. CreateAlgorithmFromUnderlyingMethod ( underlyingObject, methodName, algoArgCount, extraArgs ) throws
- Assert: underlyingObject is not undefined.
- Assert: ! IsPropertyKey(methodName) is true.
- Assert: algoArgCount is 0 or 1.
- Assert: extraArgs is a List.
- Let method be ? GetV(underlyingObject, methodName).
- If method is not undefined,
  - If ! IsCallable(method) is false, throw a TypeError exception.
  - If algoArgCount is 0, return an algorithm that performs the following steps:
    - Return ! PromiseCall(method, underlyingObject, extraArgs).
  - Otherwise, return an algorithm that performs the following steps, taking an arg argument:
    - Let fullArgs be a List consisting of arg followed by the elements of extraArgs in order.
    - Return ! PromiseCall(method, underlyingObject, fullArgs).
- Return an algorithm which returns a promise resolved with undefined.
6.3.2. InvokeOrNoop ( O, P, args ) throws
- Assert: O is not undefined.
- Assert: ! IsPropertyKey(P) is true.
- Assert: args is a List.
- Let method be ? GetV(O, P).
- If method is undefined, return undefined.
- Return ? Call(method, O, args).
6.3.3. IsFiniteNonNegativeNumber ( v ) nothrow
- If ! IsNonNegativeNumber(v) is false, return false.
- If v is +∞, return false.
- Return true.
6.3.4. IsNonNegativeNumber ( v ) nothrow
- If Type(v) is not Number, return false.
- If v is NaN, return false.
- If v < 0, return false.
- Return true.
6.3.5. PromiseCall ( F, V, args ) nothrow
- Assert: ! IsCallable(F) is true.
- Assert: V is not undefined.
- Assert: args is a List.
- Let returnValue be Call(F, V, args).
- If returnValue is an abrupt completion, return a promise rejected with returnValue.[[Value]].
- Otherwise, return a promise resolved with returnValue.[[Value]].
6.3.6. TransferArrayBuffer ( O ) nothrow
- Assert: Type(O) is Object.
- Assert: O has an [[ArrayBufferData]] internal slot.
- Assert: ! IsDetachedBuffer(O) is false.
- Let arrayBufferData be O.[[ArrayBufferData]].
- Let arrayBufferByteLength be O.[[ArrayBufferByteLength]].
- Perform ! DetachArrayBuffer(O).
- Return a new ArrayBuffer object (created in the current Realm Record) whose [[ArrayBufferData]] internal slot value is arrayBufferData and whose [[ArrayBufferByteLength]] internal slot value is arrayBufferByteLength.
6.3.7. ValidateAndNormalizeHighWaterMark ( highWaterMark ) throws
- Set highWaterMark to ? ToNumber(highWaterMark).
- If highWaterMark is NaN or highWaterMark < 0, throw a RangeError exception.
  +∞ is explicitly allowed as a valid high water mark. It causes backpressure to never be applied.
- Return highWaterMark.
6.3.8. MakeSizeAlgorithmFromSizeFunction ( size ) throws
- If size is undefined, return an algorithm that returns 1.
- If ! IsCallable(size) is false, throw a TypeError exception.
- Return an algorithm that performs the following steps, taking a chunk argument:
  - Return ? Call(size, undefined, « chunk »).
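Expressed as ordinary JavaScript, the last two helpers amount to the following non-normative sketches:

// Non-normative sketch of ValidateAndNormalizeHighWaterMark: coerce to a number and
// reject NaN or negative values; +Infinity is allowed (backpressure never applies).
function validateAndNormalizeHighWaterMark(highWaterMark) {
  highWaterMark = Number(highWaterMark);
  if (Number.isNaN(highWaterMark) || highWaterMark < 0) {
    throw new RangeError("highWaterMark must be a non-negative, non-NaN number");
  }
  return highWaterMark;
}

// Non-normative sketch of MakeSizeAlgorithmFromSizeFunction: default to a size of 1
// when no size function is given, otherwise require a callable and wrap it.
function makeSizeAlgorithmFromSizeFunction(size) {
  if (size === undefined) {
    return () => 1;
  }
  if (typeof size !== "function") {
    throw new TypeError("size must be a function");
  }
  return chunk => size(chunk);
}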
7. Global properties
The following constructors must be exposed on the global object: ReadableStream, WritableStream, TransformStream, ByteLengthQueuingStrategy, and CountQueuingStrategy.
The attributes of these properties must be { [[Writable]]: true, [[Enumerable]]: false, [[Configurable]]: true }.
The ReadableStreamDefaultReader, ReadableStreamBYOBReader, ReadableStreamDefaultController, ReadableByteStreamController, WritableStreamDefaultWriter, WritableStreamDefaultController, and TransformStreamDefaultController classes are specifically not exposed, as they are not independently useful.

8. Examples of creating streams
This section, and all its subsections, are non-normative.
The previous examples throughout the standard have focused on how to use streams. Here we show how to create a stream,
using the ReadableStream
or WritableStream
constructors.
8.1. A readable stream with an underlying push source (no backpressure support)
The following function creates readable streams that wrap WebSocket
instances [HTML], which are push sources that do not support backpressure signals. It illustrates how, when adapting a push source, usually most of the work
happens in the start()
function.
function makeReadableWebSocketStream(url, protocols) {
  const ws = new WebSocket(url, protocols);
  ws.binaryType = "arraybuffer";

  return new ReadableStream({
    start(controller) {
      ws.onmessage = event => controller.enqueue(event.data);
      ws.onclose = () => controller.close();
      ws.onerror = () => controller.error(new Error("The WebSocket errored!"));
    },

    cancel() {
      ws.close();
    }
  });
}
We can then use this function to create readable streams for a web socket, and pipe that stream to an arbitrary writable stream:
const webSocketStream = makeReadableWebSocketStream("wss://example.com:443/", "protocol");

webSocketStream.pipeTo(writableStream)
  .then(() => console.log("All data successfully written!"))
  .catch(e => console.error("Something went wrong!", e));
However, often when people talk about "adding streams support to web sockets", they are hoping instead for a new
capability to send an individual web socket message in a streaming fashion, so that e.g. a file could be transferred
in a single message without holding all of its contents in memory on the client side. To accomplish this goal, we’d
instead want to allow individual web socket messages to themselves be ReadableStream
instances. That isn’t what we
show in the above example.
For more background, see this discussion.
8.2. A readable stream with an underlying push source and backpressure support
The following function returns readable streams that wrap "backpressure sockets," which are hypothetical objects
that have the same API as web sockets, but also provide the ability to pause and resume the flow of data with their readStop
and readStart
methods. In doing so, this example shows how to apply backpressure to underlying sources that support it.
function makeReadableBackpressureSocketStream(host, port) {
  const socket = createBackpressureSocket(host, port);

  return new ReadableStream({
    start(controller) {
      socket.ondata = event => {
        controller.enqueue(event.data);

        if (controller.desiredSize <= 0) {
          // The internal queue is full, so propagate
          // the backpressure signal to the underlying source.
          socket.readStop();
        }
      };

      socket.onend = () => controller.close();
      socket.onerror = () => controller.error(new Error("The socket errored!"));
    },

    pull() {
      // This is called if the internal queue has been emptied, but the
      // stream's consumer still wants more data. In that case, restart
      // the flow of data if we have previously paused it.
      socket.readStart();
    },

    cancel() {
      socket.close();
    }
  });
}
We can then use this function to create readable streams for such "backpressure sockets" in the same way we do for web sockets. This time, however, when we pipe to a destination that cannot accept data as fast as the socket is producing it, or if we leave the stream alone without reading from it for some time, a backpressure signal will be sent to the socket.
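As a non-normative illustration, piping to a deliberately slow writable stream will eventually fill the readable stream's internal queue, causing readStop() to be called; once the destination catches up, pull() restarts the flow. The one-chunk-per-second sink and the host/port values below are hypothetical:

// Pipe the socket-backed stream to a slow sink to trigger backpressure on the socket.
const socketStream = makeReadableBackpressureSocketStream("example.com", 1234);

const slowWritable = new WritableStream({
  write(chunk) {
    // Pretend each chunk takes one second to deliver.
    return new Promise(resolve => setTimeout(resolve, 1000));
  }
}, new CountQueuingStrategy({ highWaterMark: 1 }));

socketStream.pipeTo(slowWritable)
  .then(() => console.log("Socket closed and all data written"))
  .catch(e => console.error("Piping failed", e));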
8.3. A readable byte stream with an underlying push source (no backpressure support)
The following function returns readable byte streams that wrap a hypothetical UDP socket API, including a
promise-returning select2()
method that is meant to be evocative of the POSIX select(2) system call.
Since the UDP protocol does not have any built-in backpressure support, the backpressure signal given by desiredSize
is ignored, and the stream ensures that when data is available from the
socket but not yet requested by the developer, it is enqueued in the stream’s internal queue, to avoid overflow
of the kernel-space queue and a consequent loss of data.
This has some interesting consequences for how consumers interact with the stream. If the consumer does not read data as fast as the socket produces it, the chunks will remain in the stream’s internal queue indefinitely. In this case, using a BYOB reader will cause an extra copy, to move the data from the stream’s internal queue to the developer-supplied buffer. However, if the consumer consumes the data quickly enough, a BYOB reader will allow zero-copy reading directly into developer-supplied buffers.
(You can imagine a more complex version of this example which uses desiredSize
to
inform an out-of-band backpressure signaling mechanism, for example by sending a message down the socket to adjust the
rate of data being sent. That is left as an exercise for the reader.)
const DEFAULT_CHUNK_SIZE = 65536;

function makeUDPSocketStream(host, port) {
  const socket = createUDPSocket(host, port);

  return new ReadableStream({
    type: "bytes",

    start(controller) {
      readRepeatedly().catch(e => controller.error(e));

      function readRepeatedly() {
        return socket.select2().then(() => {
          // Since the socket can become readable even when there are
          // no pending BYOB requests, we need to handle both cases.
          let bytesRead;
          if (controller.byobRequest) {
            const v = controller.byobRequest.view;
            bytesRead = socket.readInto(v.buffer, v.byteOffset, v.byteLength);
            controller.byobRequest.respond(bytesRead);
          } else {
            const buffer = new ArrayBuffer(DEFAULT_CHUNK_SIZE);
            bytesRead = socket.readInto(buffer, 0, DEFAULT_CHUNK_SIZE);
            controller.enqueue(new Uint8Array(buffer, 0, bytesRead));
          }

          if (bytesRead === 0) {
            controller.close();
            return;
          }

          return readRepeatedly();
        });
      }
    },

    cancel() {
      socket.close();
    }
  });
}
ReadableStream
instances returned from this function can now vend BYOB readers, with all of the
aforementioned benefits and caveats.
8.4. A readable stream with an underlying pull source
The following function returns readable streams that wrap portions of the Node.js file system API (which themselves map fairly directly to C’s fopen
, fread
, and fclose
trio). Files are a typical example of pull
sources. Note how in contrast to the examples with push sources, most of the work here happens on-demand in the pull()
function, and not at startup time in the start()
function.
const fs = require("pr/fs"); // https://github.com/jden/pr
const CHUNK_SIZE = 1024;

function makeReadableFileStream(filename) {
  let fd;
  let position = 0;

  return new ReadableStream({
    start() {
      return fs.open(filename, "r").then(result => {
        fd = result;
      });
    },

    pull(controller) {
      const buffer = new ArrayBuffer(CHUNK_SIZE);

      return fs.read(fd, buffer, 0, CHUNK_SIZE, position).then(bytesRead => {
        if (bytesRead === 0) {
          return fs.close(fd).then(() => controller.close());
        } else {
          position += bytesRead;
          controller.enqueue(new Uint8Array(buffer, 0, bytesRead));
        }
      });
    },

    cancel() {
      return fs.close(fd);
    }
  });
}
We can then create and use readable streams for files just as we could before for sockets.
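For example, reading the whole file chunk by chunk with a default reader might look like the following non-normative sketch (the file path is a placeholder):

// Read an entire file via the stream, logging each chunk as it arrives.
const fileStream = makeReadableFileStream("/example/path/on/fs.txt");
const reader = fileStream.getReader();

function readAll() {
  return reader.read().then(({ value, done }) => {
    if (done) {
      console.log("End of file reached");
      return;
    }
    console.log("Read a chunk of", value.byteLength, "bytes");
    return readAll();
  });
}

readAll().catch(e => console.error("Reading failed", e));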
8.5. A readable byte stream with an underlying pull source
The following function returns readable byte streams that allow efficient zero-copy reading of files, again using the Node.js file system API. Instead of using a predetermined chunk size of 1024, it attempts to fill the developer-supplied buffer, allowing full control.
const fs = require("pr/fs"); // https://github.com/jden/pr
const DEFAULT_CHUNK_SIZE = 1024;

function makeReadableByteFileStream(filename) {
  let fd;
  let position = 0;

  return new ReadableStream({
    type: "bytes",

    start() {
      return fs.open(filename, "r").then(result => {
        fd = result;
      });
    },

    pull(controller) {
      // Even when the consumer is using the default reader, the auto-allocation
      // feature allocates a buffer and passes it to us via byobRequest.
      const v = controller.byobRequest.view;

      return fs.read(fd, v.buffer, v.byteOffset, v.byteLength, position).then(bytesRead => {
        if (bytesRead === 0) {
          return fs.close(fd).then(() => controller.close());
        } else {
          position += bytesRead;
          controller.byobRequest.respond(bytesRead);
        }
      });
    },

    cancel() {
      return fs.close(fd);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE
  });
}
With this in hand, we can create and use BYOB readers for the returned ReadableStream
. But we can
also create default readers, using them in the same simple and generic manner as usual. The adaptation between
the low-level byte tracking of the underlying byte source shown here, and the higher-level chunk-based
consumption of a default reader, is all taken care of automatically by the streams implementation. The
auto-allocation feature, via the autoAllocateChunkSize
option, even allows
us to write less code, compared to the manual branching in §8.3 A readable byte stream with an underlying push source (no backpressure support).
8.6. A writable stream with no backpressure or success signals
The following function returns a writable stream that wraps a WebSocket
[HTML]. Web sockets do not provide
any way to tell when a given chunk of data has been successfully sent (without awkward polling of bufferedAmount
, which we leave as an exercise to the reader). As such, this writable stream has no ability
to communicate accurate backpressure signals or write success/failure to its producers. That is, the
promises returned by its writer’s write()
method and ready
getter will always fulfill immediately.
function makeWritableWebSocketStream(url, protocols) {
  const ws = new WebSocket(url, protocols);

  return new WritableStream({
    start(controller) {
      ws.onerror = () => {
        controller.error(new Error("The WebSocket errored!"));
        ws.onclose = null;
      };
      ws.onclose = () => controller.error(new Error("The server closed the connection unexpectedly!"));
      return new Promise(resolve => ws.onopen = resolve);
    },

    write(chunk) {
      ws.send(chunk);
      // Return immediately, since the web socket gives us no easy way to tell
      // when the write completes.
    },

    close() {
      return closeWS(1000);
    },

    abort(reason) {
      return closeWS(4000, reason && reason.message);
    },
  });

  function closeWS(code, reasonString) {
    return new Promise((resolve, reject) => {
      ws.onclose = e => {
        if (e.wasClean) {
          resolve();
        } else {
          reject(new Error("The connection was not closed cleanly"));
        }
      };
      ws.close(code, reasonString);
    });
  }
}
We can then use this function to create writable streams for a web socket, and pipe an arbitrary readable stream to it:
const webSocketStream = makeWritableWebSocketStream("wss://example.com:443/", "protocol");

readableStream.pipeTo(webSocketStream)
  .then(() => console.log("All data successfully written!"))
  .catch(e => console.error("Something went wrong!", e));
See the earlier note about this style of wrapping web sockets into streams.
8.7. A writable stream with backpressure and success signals
The following function returns writable streams that wrap portions of the Node.js file system API (which themselves map fairly directly to C’s fopen
, fwrite
, and fclose
trio). Since the API we are wrapping provides a way to
tell when a given write succeeds, this stream will be able to communicate backpressure signals as well as whether
an individual write succeeded or failed.
const fs = require("pr/fs"); // https://github.com/jden/pr

function makeWritableFileStream(filename) {
  let fd;

  return new WritableStream({
    start() {
      return fs.open(filename, "w").then(result => {
        fd = result;
      });
    },

    write(chunk) {
      return fs.write(fd, chunk, 0, chunk.length);
    },

    close() {
      return fs.close(fd);
    },

    abort() {
      return fs.close(fd);
    }
  });
}
We can then use this function to create a writable stream for a file, and write individual chunks of data to it:
const fileStream = makeWritableFileStream("/example/path/on/fs.txt");
const writer = fileStream.getWriter();

writer.write("To stream, or not to stream\n");
writer.write("That is the question\n");

writer.close()
  .then(() => console.log("chunks written and stream closed successfully!"))
  .catch(e => console.error(e));
Note that if a particular call to fs.write
takes a longer time, the returned promise will fulfill later.
In the meantime, additional writes can be queued up, which are stored in the stream’s internal queue. The accumulation
of chunks in this queue can change the stream to return a pending promise from the ready
getter, which is a signal to producers that they would benefit from backing off and stopping writing, if
possible.
The way in which the writable stream queues up writes is especially important in this case, since as stated in the documentation for fs.write
, "it is unsafe to use fs.write
multiple times on the same file without waiting
for the [promise]." But we don’t have to worry about that when writing the makeWritableFileStream
function, since the stream implementation guarantees that the underlying sink’s write()
method will not be called until any promises returned by previous calls have fulfilled!
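A producer that wants to respect the backpressure signal can wait for the ready promise before queuing each write. The following non-normative sketch builds on makeWritableFileStream from above; the chunks argument and file path are placeholders:

// Respect backpressure by waiting for writer.ready before queuing each write.
async function writeRespectingBackpressure(chunks) {
  const fileStream = makeWritableFileStream("/example/path/on/fs.txt");
  const writer = fileStream.getWriter();
  const pendingWrites = [];

  for (const chunk of chunks) {
    await writer.ready;                      // resolves once the queue drains below the high-water mark
    pendingWrites.push(writer.write(chunk)); // queue the write without waiting for it to finish
  }

  await Promise.all(pendingWrites);
  await writer.close();
  console.log("All chunks written and stream closed");
}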
8.8. A { readable, writable } stream pair wrapping the same underlying resource
The following function returns an object of the form { readable, writable }
, with the readable
property containing a readable stream and the writable
property containing a
writable stream, where both streams wrap the same underlying web socket resource. In essence, this combines §8.1 A readable stream with an underlying push source (no backpressure support) and §8.6 A writable stream with no backpressure or success signals.
While doing so, it illustrates how you can use JavaScript classes to create reusable underlying sink and underlying source abstractions.
function streamifyWebSocket(url, protocols) {
  const ws = new WebSocket(url, protocols);
  ws.binaryType = "arraybuffer";

  return {
    readable: new ReadableStream(new WebSocketSource(ws)),
    writable: new WritableStream(new WebSocketSink(ws))
  };
}

class WebSocketSource {
  constructor(ws) {
    this._ws = ws;
  }

  start(controller) {
    this._ws.onmessage = event => controller.enqueue(event.data);
    this._ws.onclose = () => controller.close();

    this._ws.addEventListener("error", () => {
      controller.error(new Error("The WebSocket errored!"));
    });
  }

  cancel() {
    this._ws.close();
  }
}

class WebSocketSink {
  constructor(ws) {
    this._ws = ws;
  }

  start(controller) {
    this._ws.onclose = () => controller.error(new Error("The server closed the connection unexpectedly!"));

    this._ws.addEventListener("error", () => {
      controller.error(new Error("The WebSocket errored!"));
      this._ws.onclose = null;
    });

    return new Promise(resolve => this._ws.onopen = resolve);
  }

  write(chunk) {
    this._ws.send(chunk);
  }

  close() {
    return this._closeWS(1000);
  }

  abort(reason) {
    return this._closeWS(4000, reason && reason.message);
  }

  _closeWS(code, reasonString) {
    return new Promise((resolve, reject) => {
      this._ws.onclose = e => {
        if (e.wasClean) {
          resolve();
        } else {
          reject(new Error("The connection was not closed cleanly"));
        }
      };
      this._ws.close(code, reasonString);
    });
  }
}
We can then use the objects created by this function to communicate with a remote web socket, using the standard stream APIs:
const streamyWS = streamifyWebSocket("wss://example.com:443/", "protocol");
const writer = streamyWS.writable.getWriter();
const reader = streamyWS.readable.getReader();

writer.write("Hello");
writer.write("web socket!");

reader.read().then(({ value, done }) => {
  console.log("The web socket says: ", value);
});
Note how in this setup canceling the readable
side will implicitly close the writable
side,
and similarly, closing or aborting the writable
side will implicitly close the readable
side.
See the earlier note about this style of wrapping web sockets into streams.
8.9. A transform stream that replaces template tags
It’s often useful to substitute tags with variables on a stream of data, where the parts that need to be replaced are
small compared to the overall data size. This example presents a simple way to do that. It maps strings to strings,
transforming a template like "Time: {{time}} Message: {{message}}"
to "Time: 15:36 Message:
hello"
assuming that { time: "15:36", message: "hello" }
was passed in the substitutions
parameter to LipFuzzTransformer
.
This example also demonstrates one way to deal with a situation where a chunk contains partial data that cannot be
transformed until more data is received. In this case, a partial template tag will be accumulated in the partialChunk
instance variable until either the end of the tag is found or the end of the stream is
reached.
class LipFuzzTransformer {
  constructor(substitutions) {
    this.substitutions = substitutions;
    this.partialChunk = "";
    this.lastIndex = undefined;
  }

  transform(chunk, controller) {
    chunk = this.partialChunk + chunk;
    this.partialChunk = "";
    // lastIndex is the index of the first character after the last substitution.
    this.lastIndex = 0;
    chunk = chunk.replace(/\{\{([a-zA-Z0-9_-]+)\}\}/g, this.replaceTag.bind(this));
    // Regular expression for an incomplete template at the end of a string.
    const partialAtEndRegexp = /\{(\{([a-zA-Z0-9_-]+(\})?)?)?$/g;
    // Avoid looking at any characters that have already been substituted.
    partialAtEndRegexp.lastIndex = this.lastIndex;
    this.lastIndex = undefined;
    const match = partialAtEndRegexp.exec(chunk);
    if (match) {
      this.partialChunk = chunk.substring(match.index);
      chunk = chunk.substring(0, match.index);
    }
    controller.enqueue(chunk);
  }

  flush(controller) {
    if (this.partialChunk.length > 0) {
      controller.enqueue(this.partialChunk);
    }
  }

  replaceTag(match, p1, offset) {
    let replacement = this.substitutions[p1];
    if (replacement === undefined) {
      replacement = "";
    }
    this.lastIndex = offset + replacement.length;
    return replacement;
  }
}
In this case we define the transformer to be passed to the TransformStream
constructor as a class. This is
useful when there is instance data to track.
The class would be used in code like:
const data = { userName, displayName, icon, date };
const ts = new TransformStream(new LipFuzzTransformer(data));

fetchEvent.respondWith(
  fetch(fetchEvent.request.url).then(response => {
    const transformedBody = response.body
      // Decode the binary-encoded response to string
      .pipeThrough(new TextDecoderStream())
      // Apply the LipFuzzTransformer
      .pipeThrough(ts)
      // Encode the transformed string
      .pipeThrough(new TextEncoderStream());
    return new Response(transformedBody);
  })
);
LipFuzzTransformer performs unescaped text substitutions. In real applications, a template system that performs context-aware escaping is good practice for security and robustness.

8.10. A transform stream created from a sync mapper function
The following function allows creating new TransformStream
instances from synchronous "mapper" functions, of the
type you would normally pass to Array.prototype.map
. It demonstrates that the API is concise
even for trivial transforms.
function mapperTransformStream(mapperFunction) {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(mapperFunction(chunk));
    }
  });
}
This function can then be used to create a TransformStream
that uppercases all its inputs:
const ts = mapperTransformStream(chunk => chunk.toUpperCase());
const writer = ts.writable.getWriter();
const reader = ts.readable.getReader();

writer.write("No need to shout");

// Logs "NO NEED TO SHOUT":
reader.read().then(({ value }) => console.log(value));
Although a synchronous transform never causes backpressure itself, it will only transform chunks as long as there is no backpressure, so resources will not be wasted.
Exceptions error the stream in a natural way:
const ts = mapperTransformStream(chunk => JSON.parse(chunk));
const writer = ts.writable.getWriter();
const reader = ts.readable.getReader();

writer.write("[1, ");

// Logs a SyntaxError, twice:
reader.read().catch(e => console.error(e));
writer.write("{}").catch(e => console.error(e));
Conventions
This specification depends on the Infra Standard. [INFRA]
This specification uses algorithm conventions very similar to those of [ECMASCRIPT], whose rules should be used to
interpret it (apart from the exceptions enumerated below). In particular, the objects specified here should be treated
as built-in objects. For example, their name and length properties are derived as described by that specification, as are the default property descriptor values and the treatment of missing, undefined, or surplus arguments.
We also depart from the [ECMASCRIPT] conventions in the following ways, mostly for brevity. It is hoped (and vaguely planned) that the conventions of ECMAScript itself will evolve in these ways.
- We prefix section headings with new to indicate they are defining constructors; when doing so, we assume that NewTarget will be checked before the algorithm starts.
- We use the default argument notation = {} in a couple of cases, meaning that before the algorithm starts, undefined (including the implicit undefined when no argument is provided) is instead treated as a new object created as if by ObjectCreate(%ObjectPrototype%). (This object may then be destructured, if combined with the below destructuring convention.)
- We use destructuring notation in function and method declarations, and assume that DestructuringAssignmentEvaluation was performed appropriately before the algorithm starts.
- We use "this" instead of "this value".
- We use the shorthand phrases from the [PROMISES-GUIDE] to operate on promises at a higher level than the ECMAScript spec does.
It’s also worth noting that, as in [ECMASCRIPT], all numbers are represented as double-precision floating point values, and all arithmetic operations performed on them must be done in the standard way for such values.
Acknowledgments
The editors would like to thank Anne van Kesteren, AnthumChris, Arthur Langereis, Ben Kelly, Bert Belder, Brian di Palma, Calvin Metcalf, Dominic Tarr, Ed Hager, Forbes Lindesay, Forrest Norvell, Gary Blackwood, Gorgi Kosev, Gus Caplan, 贺师俊 (hax), Isaac Schlueter, isonmad, Jake Archibald, Jake Verbaten, Janessa Det, Jens Nockert, Lennart Grahl, Mangala Sadhu Sangeet Singh Khalsa, Marcos Caceres, Marvin Hagemeister, Mattias Buelens, Michael Mior, Mihai Potra, Romain Bellessort, Simon Menke, Stephen Sugden, Surma, Tab Atkins, Tanguy Krotoff, Thorsten Lorenz, Till Schneidereit, Tim Caswell, Trevor Norris, tzik, Will Chan, Youenn Fablet, 平野裕 (Yutaka Hirano), and Xabier Rodríguez for their contributions to this specification. Community involvement in this specification has been above and beyond; we couldn’t have done it without you.
This standard is written by Adam Rice (Google, ricea@chromium.org), Domenic Denicola (Google, d@domenic.me), and 吉野剛史 (Takeshi Yoshino, Google, tyoshino@chromium.org).
Copyright © 2019 WHATWG (Apple, Google, Mozilla, Microsoft). This work is licensed under a Creative Commons Attribution 4.0 International License.