1. Introduction
This section is non-normative.
Large swathes of the web platform are built on streaming data: that is, data that is created, processed, and consumed in an incremental fashion, without ever reading all of it into memory. The Streams Standard provides a common set of APIs for creating and interfacing with such streaming data, embodied in readable streams , writable streams , and transform streams .
These APIs have been designed to efficiently map to low-level I/O primitives, including specializations for byte streams where appropriate. They allow easy composition of multiple streams into pipe chains , or can be used directly via readers and writers . Finally, they are designed to automatically provide backpressure and queuing.
This standard provides the base stream primitives which other parts of the web platform can use to expose their streaming data. For example, [FETCH] exposes Response bodies as ReadableStream instances. More generally, the platform is full of streaming abstractions waiting to be expressed as streams: multimedia streams, file streams, inter-global communication, and more benefit from being able to process data incrementally instead of buffering it all into memory and processing it in one go. By providing the foundation for these streams to be exposed to developers, the Streams Standard enables use cases like:
- Video effects: piping a readable video stream through a transform stream that applies effects in real time.
- Decompression: piping a file stream through a transform stream that selectively decompresses files from a .tgz archive, turning them into img elements as the user scrolls through an image gallery.
- Image decoding: piping an HTTP response stream through a transform stream that decodes bytes into bitmap data, and then through another transform that translates bitmaps into PNGs. If installed inside the fetch hook of a service worker, this would allow developers to transparently polyfill new image formats. [SERVICE-WORKERS]
Web developers can also use the APIs described here to create their own streams, with the same APIs as those provided by the platform. Other developers can then transparently compose platform-provided streams with those supplied by libraries. In this way, the APIs described here provide unifying abstraction for all streams, encouraging an ecosystem to grow around these shared and composable interfaces.
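As a sketch of this composability, a developer-created readable stream can be piped directly into a developer-created writable stream. (The chunk contents and collecting sink here are illustrative, not part of the standard.)

```javascript
// A readable stream wrapping a hypothetical in-memory source.
const readable = new ReadableStream({
  start(controller) {
    controller.enqueue("hello");
    controller.enqueue("world");
    controller.close();
  }
});

// A writable stream whose underlying sink just collects chunks.
const received = [];
const writable = new WritableStream({
  write(chunk) {
    received.push(chunk);
  }
});

// pipeTo() returns a promise that fulfills once all data is written.
const done = readable.pipeTo(writable).then(() => received);
```

The same `pipeTo()` call works identically whether the streams come from the platform (e.g. a fetch response body) or from a library, which is what makes the shared interfaces composable.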
2. Model
A chunk is a single piece of data that is written to or read from a stream. It can be of any type; streams can even contain chunks of different types. A chunk will often not be the most atomic unit of data for a given stream; for example a byte stream might contain chunks consisting of 16 KiB Uint8Arrays, instead of single bytes.
2.1. Readable streams
A readable stream represents a source of data, from which you can read. In other words, data comes out of a readable stream. Concretely, a readable stream is an instance of the ReadableStream class.
Although a readable stream can be created with arbitrary behavior, most readable streams wrap a lower-level I/O source, called the underlying source . There are two types of underlying source: push sources and pull sources.
Push sources push data at you, whether or not you are listening for it. They may also provide a mechanism for pausing and resuming the flow of data. An example push source is a TCP socket, where data is constantly being pushed from the OS level, at a rate that can be controlled by changing the TCP window size.
Pull sources require you to request data from them. The data may be available synchronously, e.g. if it is held by the operating system’s in-memory buffers, or asynchronously, e.g. if it has to be read from disk. An example pull source is a file handle, where you seek to specific locations and read specific amounts.
Readable streams are designed to wrap both types of sources behind a single, unified interface. For web developer–created streams, the implementation details of a source are provided by an object with certain methods and properties that is passed to the ReadableStream() constructor.
Chunks are enqueued into the stream by the stream’s underlying source. They can then be read one at a time via the stream’s public interface, in particular by using a readable stream reader acquired using the stream’s getReader() method.
Code that reads from a readable stream using its public interface is known as a consumer .
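For instance, a consumer might drain a stream one chunk at a time. (The two-chunk stream here is a made-up example.)

```javascript
const stream = new ReadableStream({
  start(controller) {
    controller.enqueue("first");
    controller.enqueue("second");
    controller.close();
  }
});

const reader = stream.getReader(); // locks the stream to this consumer
const chunks = [];

// read() resolves with { value, done }; pump until done is true.
function pump() {
  return reader.read().then(({ value, done }) => {
    if (done) return chunks;
    chunks.push(value);
    return pump();
  });
}

const allChunks = pump(); // resolves to ["first", "second"]
```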
Consumers also have the ability to cancel a readable stream, using its cancel() method. This indicates that the consumer has lost interest in the stream, and will immediately close the stream, throw away any queued chunks, and execute any cancellation mechanism of the underlying source.
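A sketch of cancellation, assuming an underlying source that simply records the reason it was given:

```javascript
let cancelReason;
const stream = new ReadableStream({
  start(controller) {
    controller.enqueue("pending chunk"); // will be thrown away on cancel
  },
  cancel(reason) {
    // The underlying source's cancellation mechanism runs here.
    cancelReason = reason;
  }
});

const finished = stream.cancel("consumer lost interest")
  .then(() => cancelReason); // resolves to "consumer lost interest"
```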
Consumers can also tee a readable stream using its tee() method. This will lock the stream, making it no longer directly usable; however, it will create two new streams, called branches, which can be consumed independently.
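For example (with a hypothetical single-chunk stream), teeing produces two independently readable branches while locking the original:

```javascript
const stream = new ReadableStream({
  start(controller) {
    controller.enqueue("shared chunk");
    controller.close();
  }
});

const [branch1, branch2] = stream.tee();
const lockedAfterTee = stream.locked; // true: the original is no longer directly usable

// Each branch can be consumed on its own schedule.
const reads = Promise.all([
  branch1.getReader().read(),
  branch2.getReader().read()
]).then(results => results.map(r => r.value)); // both branches see "shared chunk"
```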
For streams representing bytes, an extended version of the readable stream is provided to handle bytes efficiently, in particular by minimizing copies. The underlying source for such a readable stream is called an underlying byte source. A readable stream whose underlying source is an underlying byte source is sometimes called a readable byte stream. Consumers of a readable byte stream can acquire a BYOB reader using the stream’s getReader() method.
2.2. Writable streams
A writable stream represents a destination for data, into which you can write. In other words, data goes in to a writable stream. Concretely, a writable stream is an instance of the WritableStream class.
Analogously to readable streams, most writable streams wrap a lower-level I/O sink, called the underlying sink . Writable streams work to abstract away some of the complexity of the underlying sink, by queuing subsequent writes and only delivering them to the underlying sink one by one.
Chunks are written to the stream via its public interface, and are passed one at a time to the stream’s underlying sink. For web developer-created streams, the implementation details of the sink are provided by an object with certain methods that is passed to the WritableStream() constructor.
Code that writes into a writable stream using its public interface is known as a producer .
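A minimal sketch of a producer writing through a writer, with a made-up underlying sink that collects the chunks it is handed:

```javascript
const delivered = [];
const writable = new WritableStream({
  write(chunk) {
    // Chunks arrive here one at a time, after any queuing.
    delivered.push(chunk);
  }
});

const writer = writable.getWriter();
const done = writer.write("a")
  .then(() => writer.write("b"))
  .then(() => writer.close())
  .then(() => delivered); // resolves to ["a", "b"]
```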
Producers also have the ability to abort a writable stream, using its abort() method. This indicates that the producer believes something has gone wrong, and that future writes should be discontinued. It puts the stream in an errored state, even without a signal from the underlying sink, and it discards all writes in the stream’s internal queue.
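A sketch of aborting (the reason string is illustrative); note that writes attempted after the abort fail, because the stream is now errored:

```javascript
const writable = new WritableStream({
  write(chunk) { /* deliver chunk to the underlying sink */ },
  abort(reason) { /* tear down the underlying sink */ }
});

const writer = writable.getWriter();
const laterWriteFailed = writer.abort("something went wrong")
  .then(() => writer.write("too late"))
  .then(() => false, () => true); // resolves to true: writes after abort are rejected
```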
2.3. Transform streams
A transform stream consists of a pair of streams: a writable stream , known as its writable side , and a readable stream , known as its readable side . In a manner specific to the transform stream in question, writes to the writable side result in new data being made available for reading from the readable side.
Concretely, any object with a writable property and a readable property can serve as a transform stream. However, the standard TransformStream class makes it much easier to create such a pair that is properly entangled. It wraps a transformer, which defines algorithms for the specific transformation to be performed. For web developer–created streams, the implementation details of a transformer are provided by an object with certain methods and properties that is passed to the TransformStream() constructor.
Other specifications might use the GenericTransformStream mixin to create classes with the same writable/readable property pair but other custom APIs layered on top.
An identity transform stream is a type of transform stream which forwards all chunks written to its writable side to its readable side, without any changes. This can be useful in a variety of scenarios. By default, the TransformStream constructor will create an identity transform stream when no transform() method is present on the transformer object.
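For example, constructing a TransformStream with no arguments yields an identity transform stream:

```javascript
const { readable, writable } = new TransformStream(); // no transform(): identity

const writer = writable.getWriter();
writer.write("passed through unchanged");
writer.close();

const echoed = readable.getReader().read()
  .then(({ value }) => value); // resolves to "passed through unchanged"
```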
Some examples of potential transform streams include:
- A GZIP compressor, to which uncompressed bytes are written and from which compressed bytes are read;
- A video decoder, to which encoded bytes are written and from which uncompressed video frames are read;
- A text decoder, to which bytes are written and from which strings are read;
- A CSV-to-JSON converter, to which strings representing lines of a CSV file are written and from which corresponding JavaScript objects are read.
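As a simplified stand-in for transforms like these, here is a hypothetical upper-casing transformer (not one of the standard's examples):

```javascript
const upperCaser = new TransformStream({
  transform(chunk, controller) {
    // Each chunk written to the writable side passes through here
    // before being made available on the readable side.
    controller.enqueue(chunk.toUpperCase());
  }
});

const writer = upperCaser.writable.getWriter();
writer.write("stream me");
writer.close();

const result = upperCaser.readable.getReader().read()
  .then(({ value }) => value); // resolves to "STREAM ME"
```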
2.4. Pipe chains and backpressure
Streams are primarily used by piping them to each other. A readable stream can be piped directly to a writable stream, using its pipeTo() method, or it can be piped through one or more transform streams first, using its pipeThrough() method.
A set of streams piped together in this way is referred to as a pipe chain . In a pipe chain, the original source is the underlying source of the first readable stream in the chain; the ultimate sink is the underlying sink of the final writable stream in the chain.
Once a pipe chain is constructed, it will propagate signals regarding how fast chunks should flow through it. If any step in the chain cannot yet accept chunks, it propagates a signal backwards through the pipe chain, until eventually the original source is told to stop producing chunks so fast. This process of normalizing flow from the original source according to how fast the chain can process chunks is called backpressure .
Concretely, the original source is given the controller.desiredSize (or byteController.desiredSize) value, and can then adjust its rate of data flow accordingly. This value is derived from the writer.desiredSize corresponding to the ultimate sink, which gets updated as the ultimate sink finishes writing chunks. The pipeTo() method used to construct the chain automatically ensures this information propagates back through the pipe chain.
When teeing a readable stream, the backpressure signals from its two branches will aggregate, such that if neither branch is read from, a backpressure signal will be sent to the underlying source of the original stream.
Piping locks the readable and writable streams, preventing them from being manipulated for the duration of the pipe operation. This allows the implementation to perform important optimizations, such as directly shuttling data from the underlying source to the underlying sink while bypassing many of the intermediate queues.
2.5. Internal queues and queuing strategies
Both readable and writable streams maintain internal queues , which they use for similar purposes. In the case of a readable stream, the internal queue contains chunks that have been enqueued by the underlying source , but not yet read by the consumer. In the case of a writable stream, the internal queue contains chunks which have been written to the stream by the producer, but not yet processed and acknowledged by the underlying sink .
A queuing strategy is an object that determines how a stream should signal backpressure based on the state of its internal queue . The queuing strategy assigns a size to each chunk , and compares the total size of all chunks in the queue to a specified number, known as the high water mark . The resulting difference, high water mark minus total size, is used to determine the desired size to fill the stream’s queue .
For readable streams, an underlying source can use this desired size as a backpressure signal, slowing down chunk generation so as to try to keep the desired size above or at zero. For writable streams, a producer can behave similarly, avoiding writes that would cause the desired size to go negative.
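A sketch of how a producer can observe this signal, using a contrived sink that never acknowledges its writes so the queue fills up:

```javascript
const slowSink = new WritableStream(
  {
    write(chunk) {
      return new Promise(() => {}); // never settles, so chunks pile up in the queue
    }
  },
  new CountQueuingStrategy({ highWaterMark: 2 })
);

const writer = slowSink.getWriter();
const before = writer.desiredSize; // 2: room for two chunks
writer.write("a");
writer.write("b");
const after = writer.desiredSize;  // 0: high water mark reached, so apply backpressure
```

A well-behaved producer would await `writer.ready` (or check `desiredSize`) before writing more, rather than letting the queue grow without bound.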
Concretely, a queuing strategy for web developer–created streams is given by any JavaScript object with a highWaterMark property. For byte streams the highWaterMark always has units of bytes. For other streams the default unit is chunks, but a size() function can be included in the strategy object which returns the size for a given chunk. This permits the highWaterMark to be specified in arbitrary floating-point units.

In JavaScript, such a strategy could be written manually, or using the built-in CountQueuingStrategy class.
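For illustration (the high water mark values here are arbitrary), a manual strategy and its built-in equivalents might look like:

```javascript
// A manual strategy: each chunk counts as 1; buffer up to 10 chunks.
const manualStrategy = { highWaterMark: 10, size(chunk) { return 1; } };

// The built-in equivalent of the above:
const countStrategy = new CountQueuingStrategy({ highWaterMark: 10 });

// For byte streams, ByteLengthQueuingStrategy measures chunks in bytes:
const byteStrategy = new ByteLengthQueuingStrategy({ highWaterMark: 16 * 1024 });

// Either form is accepted wherever a queuing strategy is expected, e.g.:
const stream = new ReadableStream({}, manualStrategy);
```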
2.6. Locking
A readable stream reader, or simply reader, is an object that allows direct reading of chunks from a readable stream. Without a reader, a consumer can only perform high-level operations on the readable stream: canceling the stream, or piping the readable stream to a writable stream. A reader is acquired via the stream’s getReader() method.

A readable byte stream has the ability to vend two types of readers: default readers and BYOB readers. BYOB ("bring your own buffer") readers allow reading into a developer-supplied buffer, thus minimizing copies. A non-byte readable stream can only vend default readers. Default readers are instances of the ReadableStreamDefaultReader class, while BYOB readers are instances of ReadableStreamBYOBReader.
Similarly, a writable stream writer, or simply writer, is an object that allows direct writing of chunks to a writable stream. Without a writer, a producer can only perform the high-level operations of aborting the stream or piping a readable stream to the writable stream. Writers are represented by the WritableStreamDefaultWriter class.
Under the covers, these high-level operations actually use a reader or writer themselves.
A given readable or writable stream only has at most one reader or writer at a time. We say in this case the stream is locked, and that the reader or writer is active. This state can be determined using the readableStream.locked or writableStream.locked properties.

A reader or writer also has the capability to release its lock, which makes it no longer active, and allows further readers or writers to be acquired. This is done via the defaultReader.releaseLock(), byobReader.releaseLock(), or writer.releaseLock() method, as appropriate.
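A sketch of lock acquisition and release (the stream itself is a trivial placeholder):

```javascript
const stream = new ReadableStream({
  start(controller) { controller.close(); }
});

const firstReader = stream.getReader();
const lockedWhileActive = stream.locked;   // true: firstReader is active

firstReader.releaseLock();                 // firstReader is no longer active
const lockedAfterRelease = stream.locked;  // false

const secondReader = stream.getReader();   // acquiring a new reader now succeeds
```

Calling getReader() again while the first reader was still active would instead throw a TypeError.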
3. Conventions
This specification depends on the Infra Standard. [INFRA]
This specification uses the abstract operation concept from the JavaScript specification for its internal algorithms. This includes treating their return values as completion records , and the use of ! and ? prefixes for unwrapping those completion records. [ECMASCRIPT]
This specification also uses the internal slot concept and notation from the JavaScript specification. (Although, the internal slots are on Web IDL platform objects instead of on JavaScript objects.)
The reasons for the usage of these foreign JavaScript specification conventions are largely historical. We urge you to avoid following our example when writing your own web specifications.
In this specification, all numbers are represented as double-precision 64-bit IEEE 754 floating point values (like the JavaScript Number type or Web IDL unrestricted double type), and all arithmetic operations performed on them must be done in the standard way for such values. This is particularly important for the data structure described in § 8.1 Queue-with-sizes. [IEEE-754]
4. Readable streams
4.1. Using readable streams
readableStream.pipeTo(writableStream)
  .then(() => console.log("All data successfully written!"))
  .catch(e => console.error("Something went wrong!", e));
readableStream.pipeTo(new WritableStream({
  write(chunk) {
    console.log("Chunk received", chunk);
  },
  close() {
    console.log("All data successfully read!");
  },
  abort(e) {
    console.error("Something went wrong!", e);
  }
}));
By returning promises from your write() implementation, you can signal backpressure to the readable stream.
You can also read a stream directly, by acquiring a reader and using its read() method to get successive chunks. For example, this code logs the next chunk in the stream, if available:
const reader = readableStream.getReader();

reader.read().then(
  ({ value, done }) => {
    if (done) {
      console.log("The stream was already closed!");
    } else {
      console.log(value);
    }
  },
  e => console.error("The stream became errored and cannot be read from!", e)
);
This more manual method of reading a stream is mainly useful for library authors building new high-level operations on streams, beyond the provided ones of piping and teeing .
const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes: ", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } = await reader.read(
      new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}
An important thing to note here is that the final buffer value is different from the startingAB, but it (and all intermediate buffers) shares the same backing memory allocation. At each step, the buffer is transferred to a new ArrayBuffer object. The view is destructured from the return value of reading a new Uint8Array, with that ArrayBuffer object as its buffer property, the offset that bytes were written to as its byteOffset property, and the number of bytes that were written as its byteLength property.
4.2. The ReadableStream class
The ReadableStream class is a concrete instance of the general readable stream concept. It is adaptable to any chunk type, and maintains an internal queue to keep track of data supplied by the underlying source but not yet read by any consumer.
4.2.1. Interface definition
The Web IDL definition for the ReadableStream class is given as follows:
[Exposed=*, Transferable]
interface ReadableStream {
  constructor(optional object underlyingSource, optional QueuingStrategy strategy = {});

  static ReadableStream from(async iterable<any> asyncIterable);

  readonly attribute boolean locked;

  Promise<undefined> cancel(optional any reason);
  ReadableStreamReader getReader(optional ReadableStreamGetReaderOptions options = {});
  ReadableStream pipeThrough(ReadableWritablePair transform, optional StreamPipeOptions options = {});
  Promise<undefined> pipeTo(WritableStream destination, optional StreamPipeOptions options = {});
  sequence<ReadableStream> tee();

  async iterable<any>(optional ReadableStreamIteratorOptions options = {});
};

typedef (ReadableStreamDefaultReader or ReadableStreamBYOBReader) ReadableStreamReader;

enum ReadableStreamReaderMode { "byob" };

dictionary ReadableStreamGetReaderOptions {
  ReadableStreamReaderMode mode;
};

dictionary ReadableStreamIteratorOptions {
  boolean preventCancel = false;
};

dictionary ReadableWritablePair {
  required ReadableStream readable;
  required WritableStream writable;
};

dictionary StreamPipeOptions {
  boolean preventClose = false;
  boolean preventAbort = false;
  boolean preventCancel = false;
  AbortSignal signal;
};
4.2.2. Internal slots
Instances of ReadableStream are created with the internal slots described in the following table:
Internal Slot | Description ( non-normative ) |
---|---|
[[controller]] | A ReadableStreamDefaultController or ReadableByteStreamController created with the ability to control the state and queue of this stream |
[[Detached]] | A boolean flag set to true when the stream is transferred |
[[disturbed]] | A boolean flag set to true when the stream has been read from or canceled |
[[reader]] | A ReadableStreamDefaultReader or ReadableStreamBYOBReader instance, if the stream is locked to a reader, or undefined if it is not |
[[state]] | A string containing the stream’s current state, used internally; one of "readable", "closed", or "errored" |
[[storedError]] | A value indicating how the stream failed, to be given as a failure reason or exception when trying to operate on an errored stream |
4.2.3. The underlying source API
The ReadableStream() constructor accepts as its first argument a JavaScript object representing the underlying source. Such objects can contain any of the following properties:
dictionary UnderlyingSource {
  UnderlyingSourceStartCallback start;
  UnderlyingSourcePullCallback pull;
  UnderlyingSourceCancelCallback cancel;
  ReadableStreamType type;
  [EnforceRange] unsigned long long autoAllocateChunkSize;
};

typedef (ReadableStreamDefaultController or ReadableByteStreamController) ReadableStreamController;

callback UnderlyingSourceStartCallback = any (ReadableStreamController controller);
callback UnderlyingSourcePullCallback = Promise<undefined> (ReadableStreamController controller);
callback UnderlyingSourceCancelCallback = Promise<undefined> (optional any reason);

enum ReadableStreamType { "bytes" };
- start(controller), of type UnderlyingSourceStartCallback

  A function that is called immediately during creation of the ReadableStream. Typically this is used to adapt a push source by setting up relevant event listeners, as in the example of § 10.1 A readable stream with an underlying push source (no backpressure support), or to acquire access to a pull source, as in § 10.4 A readable stream with an underlying pull source.

  If this setup process is asynchronous, it can return a promise to signal success or failure; a rejected promise will error the stream. Any thrown exceptions will be re-thrown by the ReadableStream() constructor.

- pull(controller), of type UnderlyingSourcePullCallback

  A function that is called whenever the stream’s internal queue of chunks becomes not full, i.e. whenever the queue’s desired size becomes positive. Generally, it will be called repeatedly until the queue reaches its high water mark (i.e. until the desired size becomes non-positive).

  For push sources, this can be used to resume a paused flow, as in § 10.2 A readable stream with an underlying push source and backpressure support. For pull sources, it is used to acquire new chunks to enqueue into the stream, as in § 10.4 A readable stream with an underlying pull source.

  This function will not be called until start() successfully completes. Additionally, it will only be called repeatedly if it enqueues at least one chunk or fulfills a BYOB request; a no-op pull() implementation will not be continually called.

  If the function returns a promise, then it will not be called again until that promise fulfills. (If the promise rejects, the stream will become errored.) This is mainly used in the case of pull sources, where the promise returned represents the process of acquiring a new chunk. Throwing an exception is treated the same as returning a rejected promise.

- cancel(reason), of type UnderlyingSourceCancelCallback

  A function that is called whenever the consumer cancels the stream, via stream.cancel() or reader.cancel(). It takes as its argument the same value as was passed to those methods by the consumer.

  Readable streams can additionally be canceled under certain conditions during piping; see the definition of the pipeTo() method for more details.

  For all streams, this is generally used to release access to the underlying resource; see for example § 10.1 A readable stream with an underlying push source (no backpressure support).

  If the shutdown process is asynchronous, it can return a promise to signal success or failure; the result will be communicated via the return value of the cancel() method that was called. Throwing an exception is treated the same as returning a rejected promise.

  Even if the cancelation process fails, the stream will still close; it will not be put into an errored state. This is because a failure in the cancelation process doesn’t matter to the consumer’s view of the stream, once they’ve expressed disinterest in it by canceling. The failure is only communicated to the immediate caller of the corresponding method.

  This is different from the behavior of the close and abort options of a WritableStream's underlying sink, which upon failure put the corresponding WritableStream into an errored state. Those correspond to specific actions the producer is requesting and, if those actions fail, they indicate something more persistently wrong.

- type (byte streams only), of type ReadableStreamType

  Can be set to "bytes" to signal that the constructed ReadableStream is a readable byte stream. This ensures that the resulting ReadableStream will successfully be able to vend BYOB readers via its getReader() method. It also affects the controller argument passed to the start() and pull() methods; see below.

  For an example of how to set up a readable byte stream, including using the different controller interface, see § 10.3 A readable byte stream with an underlying push source (no backpressure support).

  Setting any value other than "bytes" or undefined will cause the ReadableStream() constructor to throw an exception.

- autoAllocateChunkSize (byte streams only), of type unsigned long long

  Can be set to a positive integer to cause the implementation to automatically allocate buffers for the underlying source code to write into. In this case, when a consumer is using a default reader, the stream implementation will automatically allocate an ArrayBuffer of the given size, so that controller.byobRequest is always present, as if the consumer was using a BYOB reader.

  This is generally used to cut down on the amount of code needed to handle consumers that use default readers, as can be seen by comparing § 10.3 A readable byte stream with an underlying push source (no backpressure support) without auto-allocation to § 10.5 A readable byte stream with an underlying pull source with auto-allocation.
The type of the controller argument passed to the start() and pull() methods depends on the value of the type option. If type is set to undefined (including via omission), then controller will be a ReadableStreamDefaultController. If it’s set to "bytes", then controller will be a ReadableByteStreamController.
4.2.4. Constructor, methods, and properties
- stream = new ReadableStream(underlyingSource [, strategy])

  Creates a new ReadableStream wrapping the provided underlying source. See § 4.2.3 The underlying source API for more details on the underlyingSource argument.

  The strategy argument represents the stream’s queuing strategy, as described in § 7.1 The queuing strategy API. If it is not provided, the default behavior will be the same as a CountQueuingStrategy with a high water mark of 1.

- stream = ReadableStream.from(asyncIterable)

  Creates a new ReadableStream wrapping the provided iterable or async iterable.

  This can be used to adapt various kinds of objects into a readable stream, such as an array, an async generator, or a Node.js readable stream.

- isLocked = stream.locked

  Returns whether or not the readable stream is locked to a reader.

- await stream.cancel([ reason ])

  Cancels the stream, signaling a loss of interest in the stream by a consumer. The supplied reason argument will be given to the underlying source’s cancel() method, which might or might not use it.

  The returned promise will fulfill if the stream shuts down successfully, or reject if the underlying source signaled that there was an error doing so. Additionally, it will reject with a TypeError (without attempting to cancel the stream) if the stream is currently locked.

- reader = stream.getReader()

  Creates a ReadableStreamDefaultReader and locks the stream to the new reader. While the stream is locked, no other reader can be acquired until this one is released.

  This functionality is especially useful for creating abstractions that desire the ability to consume a stream in its entirety. By getting a reader for the stream, you can ensure nobody else can interleave reads with yours or cancel the stream, which would interfere with your abstraction.

- reader = stream.getReader({ mode: "byob" })

  Creates a ReadableStreamBYOBReader and locks the stream to the new reader.

  This call behaves the same way as the no-argument variant, except that it only works on readable byte streams, i.e. streams which were constructed specifically with the ability to handle "bring your own buffer" reading. The returned BYOB reader provides the ability to directly read individual chunks from the stream via its read() method, into developer-supplied buffers, allowing more precise control over allocation.

- readable = stream.pipeThrough({ writable, readable }[, { preventClose, preventAbort, preventCancel, signal }])

  Provides a convenient, chainable way of piping this readable stream through a transform stream (or any other { writable, readable } pair). It simply pipes the stream into the writable side of the supplied pair, and returns the readable side for further use.

  Piping a stream will lock it for the duration of the pipe, preventing any other consumer from acquiring a reader.

- await stream.pipeTo(destination [, { preventClose, preventAbort, preventCancel, signal }])

  Pipes this readable stream to a given writable stream destination. The way in which the piping process behaves under various error conditions can be customized with a number of passed options. It returns a promise that fulfills when the piping process completes successfully, or rejects if any errors were encountered.

  Piping a stream will lock it for the duration of the pipe, preventing any other consumer from acquiring a reader.

  Errors and closures of the source and destination streams propagate as follows:

  - An error in this source readable stream will abort destination, unless preventAbort is truthy. The returned promise will be rejected with the source’s error, or with any error that occurs during aborting the destination.

  - An error in destination will cancel this source readable stream, unless preventCancel is truthy. The returned promise will be rejected with the destination’s error, or with any error that occurs during canceling the source.

  - When this source readable stream closes, destination will be closed, unless preventClose is truthy. The returned promise will be fulfilled once this process completes, unless an error is encountered while closing the destination, in which case it will be rejected with that error.

  - If destination starts out closed or closing, this source readable stream will be canceled, unless preventCancel is true. The returned promise will be rejected with an error indicating piping to a closed stream failed, or with any error that occurs during canceling the source.

  The signal option can be set to an AbortSignal to allow aborting an ongoing pipe operation via the corresponding AbortController. In this case, this source readable stream will be canceled, and destination aborted, unless the respective options preventCancel or preventAbort are set.

- [branch1, branch2] = stream.tee()

  Tees this readable stream, returning a two-element array containing the two resulting branches as new ReadableStream instances.

  Teeing a stream will lock it, preventing any other consumer from acquiring a reader. To cancel the stream, cancel both of the resulting branches; a composite cancellation reason will then be propagated to the stream’s underlying source.

  If this stream is a readable byte stream, then each branch will receive its own copy of each chunk. If not, then the chunks seen in each branch will be the same object. If the chunks are not immutable, this could allow interference between the two branches.
The new ReadableStream(underlyingSource, strategy) constructor steps are:
-
If underlyingSource is missing, set it to null.
-
Let underlyingSourceDict be underlyingSource , converted to an IDL value of type
UnderlyingSource
.We cannot declare the underlyingSource argument as having the
UnderlyingSource
type directly, because doing so would lose the reference to the original object. We need to retain the object so we can invoke the various methods on it. -
Perform ! InitializeReadableStream ( this ).
-
If underlyingSourceDict ["
type
"] is "bytes
":-
If strategy ["
size
"] exists , throw aRangeError
exception. -
Let highWaterMark be ? ExtractHighWaterMark ( strategy , 0).
-
Perform ? SetUpReadableByteStreamControllerFromUnderlyingSource ( this , underlyingSource , underlyingSourceDict , highWaterMark ).
-
-
Otherwise,
-
Let sizeAlgorithm be ! ExtractSizeAlgorithm ( strategy ).
-
Let highWaterMark be ? ExtractHighWaterMark ( strategy , 1).
-
Perform ? SetUpReadableStreamDefaultControllerFromUnderlyingSource ( this , underlyingSource , underlyingSourceDict , highWaterMark , sizeAlgorithm ).
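As a non-normative illustration of the non-bytes branch above, the sketch below (hypothetical helper name, assuming a runtime with the global ReadableStream, such as a recent browser or Node.js) constructs a default stream whose strategy supplies the high water mark and size algorithm that the constructor steps extract:

```javascript
// Hypothetical sketch: a default readable stream whose pull algorithm
// enqueues increasing integers until a limit. The second argument is the
// strategy from which ExtractHighWaterMark and ExtractSizeAlgorithm draw.
function makeCountingStream(limit) {
  let n = 0;
  return new ReadableStream({
    pull(controller) {
      // Called whenever the queue is below the high water mark.
      n += 1;
      controller.enqueue(n);
      if (n === limit) {
        controller.close();
      }
    }
  }, { highWaterMark: 2, size: () => 1 });
}
```

Because no "type" member is present, the constructor takes the default-controller path; per the steps above, passing { type: "bytes" } together with a size function would instead throw a RangeError.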
The from( asyncIterable ) method steps are:
-
Return ? ReadableStreamFromIterable ( asyncIterable ).
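The ReadableStreamFromIterable operation is not reproduced in this excerpt; the following non-normative sketch (fromIterable is a hypothetical name) approximates its behavior by wrapping an async or sync iterable, pulling one chunk per request and forwarding cancelation to the iterator’s return() method:

```javascript
// Hypothetical sketch approximating ReadableStreamFromIterable: wrap an
// iterable (async or sync) in a ReadableStream.
function fromIterable(iterable) {
  const iterator =
    iterable[Symbol.asyncIterator]?.() ?? iterable[Symbol.iterator]();
  return new ReadableStream({
    async pull(controller) {
      // One iteration result per pull: enqueue values, close on done.
      const { value, done } = await iterator.next();
      if (done) controller.close();
      else controller.enqueue(value);
    },
    async cancel(reason) {
      // Propagate cancelation to the iterator, if it supports it.
      await iterator.return?.(reason);
    }
  });
}
```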
The locked getter steps are:
-
Return ! IsReadableStreamLocked ( this ).
The cancel( reason ) method steps are:
-
If ! IsReadableStreamLocked ( this ) is true, return a promise rejected with a
TypeError
exception. -
Return ! ReadableStreamCancel ( this , reason ).
The getReader( options ) method steps are:
-
If options ["
mode
"] does not exist , return ? AcquireReadableStreamDefaultReader ( this ). -
Return ? AcquireReadableStreamBYOBReader ( this ).
For example, the following function reads all of a stream’s chunks into memory, using a reader obtained via getReader():
function readAllChunks(readableStream) {
  const reader = readableStream.getReader();
  const chunks = [];
  return pump();

  function pump() {
    return reader.read().then(({ value, done }) => {
      if (done) {
        return chunks;
      }
      chunks.push(value);
      return pump();
    });
  }
}
Note how the first thing it does is obtain a reader, and from then on it uses the reader exclusively. This ensures that no other consumer can interfere with the stream, either by reading chunks or by canceling the stream.
The pipeThrough( transform , options ) method steps are:
-
If ! IsReadableStreamLocked ( this ) is true, throw a
TypeError
exception. -
If ! IsWritableStreamLocked ( transform ["writable"]) is true, throw a TypeError exception. -
Let signal be options ["
signal
"] if it exists , or undefined otherwise. -
Let promise be ! ReadableStreamPipeTo ( this , transform ["writable"], options ["preventClose"], options ["preventAbort"], options ["preventCancel"], signal ). -
Set promise .[[PromiseIsHandled]] to true.
-
Return transform ["readable"].
A typical pipe chain constructed with pipeThrough(transform, options) would look like
httpResponseBody
  .pipeThrough(decompressorTransform)
  .pipeThrough(ignoreNonImageFilesTransform)
  .pipeTo(mediaGallery);
The pipeTo( destination , options ) method steps are:
-
If ! IsReadableStreamLocked ( this ) is true, return a promise rejected with a
TypeError
exception. -
If ! IsWritableStreamLocked ( destination ) is true, return a promise rejected with a
TypeError
exception. -
Let signal be options ["
signal
"] if it exists , or undefined otherwise. -
Return ! ReadableStreamPipeTo ( this , destination , options ["preventClose"], options ["preventAbort"], options ["preventCancel"], signal ).
An ongoing pipe operation can be stopped using an AbortSignal , as follows:
const controller = new AbortController();
readable.pipeTo(writable, { signal: controller.signal });

// ... some time later ...
controller.abort();
(The above omits error handling for the promise returned by pipeTo() . Additionally, the impact of the preventAbort and preventCancel options on what happens when piping is stopped is worth considering.)
This technique can also be used to switch the ReadableStream being piped, while writing into the same WritableStream :
const controller = new AbortController();
const pipePromise = readable1.pipeTo(writable, { preventAbort: true, signal: controller.signal });

// ... some time later ...
controller.abort();

// Wait for the pipe to complete before starting a new one:
try {
  await pipePromise;
} catch (e) {
  // Swallow "AbortError" DOMExceptions as expected, but rethrow any unexpected failures.
  if (e.name !== "AbortError") {
    throw e;
  }
}

// Start the new pipe!
readable2.pipeTo(writable);
The tee() method steps are:
-
Return ? ReadableStreamTee ( this , false).
For example, given a writable stream cacheEntry representing an on-disk file, and another writable stream httpRequestBody representing an upload to a remote server, you could pipe the same readable stream to both destinations at once:
const [forLocal, forRemote] = readableStream.tee();

Promise.all([
  forLocal.pipeTo(cacheEntry),
  forRemote.pipeTo(httpRequestBody)
])
  .then(() => console.log("Saved the stream to the cache and also uploaded it!"))
  .catch(e => console.error("Either caching or uploading failed: ", e));
4.2.5. Asynchronous iteration
-
for await (const chunk of stream ) { ... }
-
for await (const chunk of stream .values({
preventCancel
: true })) { ... } -
Asynchronously iterates over the chunks in the stream’s internal queue.
Asynchronously iterating over the stream will lock it, preventing any other consumer from acquiring a reader. The lock will be released if the async iterator’s return() method is called, e.g. by break ing out of the loop.
By default, calling the async iterator’s return() method will also cancel the stream. To prevent this, use the stream’s values() method, passing true for the preventCancel option.
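For instance, a non-normative sketch (takeFirst is a hypothetical helper) of iterating with preventCancel, so that one consumer can take only the first chunk and leave the rest for a later reader:

```javascript
// Hypothetical helper: consume only the first chunk via async iteration.
// Because preventCancel is true, returning out of the loop releases the
// iterator's lock without canceling the stream.
async function takeFirst(stream) {
  for await (const chunk of stream.values({ preventCancel: true })) {
    return chunk;
  }
  return undefined; // the stream was already closed
}
```

After takeFirst() settles, the stream is unlocked again and a new reader can continue from the next undelivered chunk.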
The asynchronous iterator initialization steps for a ReadableStream , given stream , iterator , and args , are:
-
Let reader be ? AcquireReadableStreamDefaultReader ( stream ).
-
Set iterator ’s reader to reader .
-
Let preventCancel be args [0]["
preventCancel
"]. -
Set iterator ’s prevent cancel to preventCancel .
The get the next iteration result steps for a ReadableStream , given stream and iterator , are:
-
Let reader be iterator ’s reader .
-
Assert: reader . [[stream]] is not undefined.
-
Let promise be a new promise .
-
Let readRequest be a new read request with the following items :
- chunk steps , given chunk
-
-
Resolve promise with chunk .
-
- close steps
-
-
Perform ! ReadableStreamDefaultReaderRelease ( reader ).
-
Resolve promise with end of iteration .
-
- error steps , given e
-
-
Perform ! ReadableStreamDefaultReaderRelease ( reader ).
-
Reject promise with e .
-
-
Perform ! ReadableStreamDefaultReaderRead ( reader , readRequest ).
-
Return promise .
The asynchronous iterator return steps for a ReadableStream , given stream , iterator , and arg , are:
-
Let reader be iterator ’s reader .
-
Assert: reader . [[stream]] is not undefined.
-
Assert: reader . [[readRequests]] is empty , as the async iterator machinery guarantees that any previous calls to
next()
have settled before this is called. -
If iterator ’s prevent cancel is false:
-
Let result be ! ReadableStreamReaderGenericCancel ( reader , arg ).
-
Perform ! ReadableStreamDefaultReaderRelease ( reader ).
-
Return result .
-
-
Perform ! ReadableStreamDefaultReaderRelease ( reader ).
-
Return a promise resolved with undefined.
4.2.6. Transfer via postMessage()
-
destination.postMessage(rs, { transfer: [rs] });
-
Sends a ReadableStream to another frame, window, or worker.
The transferred stream can be used exactly like the original. The original will become locked and no longer directly usable.
ReadableStream objects are transferable objects . Their transfer steps , given value and dataHolder , are:
-
If ! IsReadableStreamLocked ( value ) is true, throw a "DataCloneError" DOMException . -
Let port1 be a new
MessagePort
in the current Realm . -
Let port2 be a new
MessagePort
in the current Realm . -
Entangle port1 and port2 .
-
Let writable be a new
WritableStream
in the current Realm . -
Perform ! SetUpCrossRealmTransformWritable ( writable , port1 ).
-
Let promise be ! ReadableStreamPipeTo ( value , writable , false, false, false).
-
Set promise .[[PromiseIsHandled]] to true.
-
Set dataHolder .[[port]] to ! StructuredSerializeWithTransfer ( port2 , « port2 »).
Their transfer-receiving steps , given dataHolder and value , are:
-
Let deserializedRecord be ! StructuredDeserializeWithTransfer ( dataHolder .[[port]], the current Realm ).
-
Let port be deserializedRecord .[[Deserialized]].
-
Perform ! SetUpCrossRealmTransformReadable ( value , port ).
4.3. The ReadableStreamGenericReader mixin
The ReadableStreamGenericReader mixin defines common internal slots, getters, and methods that are shared between ReadableStreamDefaultReader and ReadableStreamBYOBReader objects.
4.3.1. Mixin definition
The Web IDL definition for the ReadableStreamGenericReader mixin is given as follows:
interface mixin ReadableStreamGenericReader {
  readonly attribute Promise<undefined> closed;

  Promise<undefined> cancel(optional any reason);
};
4.3.2. Internal slots
Instances of classes including the ReadableStreamGenericReader mixin are created with the internal slots described in the following table:
Internal Slot | Description ( non-normative ) |
---|---|
[[closedPromise]] | A promise returned by the reader’s closed getter |
[[stream]] | A ReadableStream instance that owns this reader |
4.3.3. Methods and properties
The closed getter steps are:
-
Return this . [[closedPromise]] .
The cancel( reason ) method steps are:
-
If this . [[stream]] is undefined, return a promise rejected with a
TypeError
exception. -
Return ! ReadableStreamReaderGenericCancel ( this , reason ).
4.4. The ReadableStreamDefaultReader class
The ReadableStreamDefaultReader class represents a default reader designed to be vended by a ReadableStream instance.
4.4.1. Interface definition
The Web IDL definition for the ReadableStreamDefaultReader class is given as follows:
[Exposed=*]
interface ReadableStreamDefaultReader {
  constructor(ReadableStream stream);

  Promise<ReadableStreamReadResult> read();
  undefined releaseLock();
};
ReadableStreamDefaultReader includes ReadableStreamGenericReader;

dictionary ReadableStreamReadResult {
  any value;
  boolean done;
};
4.4.2. Internal slots
Instances of ReadableStreamDefaultReader are created with the internal slots defined by ReadableStreamGenericReader , and those described in the following table:
Internal Slot | Description ( non-normative ) |
---|---|
[[readRequests]] | A list of read requests , used when a consumer requests chunks sooner than they are available |
A read request is a struct containing three algorithms to perform in reaction to filling the readable stream 's internal queue or changing its state. It has the following items :
- chunk steps
-
An algorithm taking a chunk , called when a chunk is available for reading
- close steps
-
An algorithm taking no arguments, called when no chunks are available because the stream is closed
- error steps
-
An algorithm taking a JavaScript value, called when no chunks are available because the stream is errored
4.4.3. Constructor, methods, and properties
-
reader = new ReadableStreamDefaultReader ( stream )
-
This is equivalent to calling stream . getReader ().
-
await reader . closed
-
Returns a promise that will be fulfilled when the stream becomes closed, or rejected if the stream ever errors or the reader’s lock is released before the stream finishes closing.
-
await reader . cancel ([ reason ])
-
If the reader is active , behaves the same as stream . cancel ( reason ). -
{ value , done } = await reader . read ()
-
Returns a promise that allows access to the next chunk from the stream’s internal queue, if available.
-
If the chunk does become available, the promise will be fulfilled with an object of the form { value: theChunk, done: false } -
If the stream becomes closed, the promise will be fulfilled with an object of the form { value: undefined , done: true } - If the stream becomes errored, the promise will be rejected with the relevant error.
If reading a chunk causes the queue to become empty, more data will be pulled from the underlying source .
-
reader . releaseLock ()
-
Releases the reader’s lock on the corresponding stream. After the lock is released, the reader is no longer active . If the associated stream is errored when the lock is released, the reader will appear errored in the same way from now on; otherwise, the reader will appear closed.
If the reader’s lock is released while it still has pending read requests, then the promises returned by the reader’s read() method are immediately rejected with a TypeError . Any unread chunks remain in the stream’s internal queue and can be read later by acquiring a new reader.
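A non-normative sketch of this behavior (handOff is a hypothetical helper): a read left pending when the lock is released is rejected with a TypeError, while a chunk that arrives afterwards is delivered to the next reader:

```javascript
// Hypothetical helper: start a read, release the lock while that read is
// still pending (assumes no chunk is queued yet), then acquire a new
// reader and receive the next chunk.
async function handOff(stream) {
  const first = stream.getReader();
  const pending = first.read();  // stays pending: nothing queued yet
  first.releaseLock();           // rejects `pending` with a TypeError
  const rejectedWithTypeError = await pending.then(
    () => false,
    e => e instanceof TypeError
  );
  const second = stream.getReader();
  const { value } = await second.read();
  return { rejectedWithTypeError, value };
}
```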
The new ReadableStreamDefaultReader( stream ) constructor steps are:
-
Perform ? SetUpReadableStreamDefaultReader ( this , stream ).
The read() method steps are:
-
If this . [[stream]] is undefined, return a promise rejected with a
TypeError
exception. -
Let promise be a new promise .
-
Let readRequest be a new read request with the following items :
- chunk steps , given chunk
-
Resolve promise with «[ "value" → chunk , "done" → false ]».
- close steps
-
Resolve promise with «[ "value" → undefined, "done" → true ]».
- error steps , given e
-
Reject promise with e .
-
Perform ! ReadableStreamDefaultReaderRead ( this , readRequest ).
-
Return promise .
The releaseLock() method steps are:
-
If this . [[stream]] is undefined, return.
-
Perform ! ReadableStreamDefaultReaderRelease ( this ).
4.5. The ReadableStreamBYOBReader class
The ReadableStreamBYOBReader class represents a BYOB reader designed to be vended by a ReadableStream instance.
4.5.1. Interface definition
The Web IDL definition for the ReadableStreamBYOBReader class is given as follows:
[Exposed=*]
interface ReadableStreamBYOBReader {
  constructor(ReadableStream stream);

  Promise<ReadableStreamReadResult> read(ArrayBufferView view);
  undefined releaseLock();
};
ReadableStreamBYOBReader includes ReadableStreamGenericReader;
4.5.2. Internal slots
Instances of ReadableStreamBYOBReader are created with the internal slots defined by ReadableStreamGenericReader , and those described in the following table:
Internal Slot | Description ( non-normative ) |
---|---|
[[readIntoRequests]] | A list of read-into requests , used when a consumer requests chunks sooner than they are available |
A read-into request is a struct containing three algorithms to perform in reaction to filling the readable byte stream 's internal queue or changing its state. It has the following items :
- chunk steps
-
An algorithm taking a chunk , called when a chunk is available for reading
- close steps
-
An algorithm taking a chunk or undefined, called when no chunks are available because the stream is closed
- error steps
-
An algorithm taking a JavaScript value, called when no chunks are available because the stream is errored
The close steps take a chunk so that it can return the backing memory to the caller if possible. For example, byobReader.read(chunk) will fulfill with { value: newView, done: true } for closed streams. If the stream is canceled , the backing memory is discarded and byobReader.read(chunk) fulfills with the more traditional { value: undefined, done: true } instead.
4.5.3. Constructor, methods, and properties
-
reader = new ReadableStreamBYOBReader ( stream )
-
This is equivalent to calling stream . getReader ({ mode : "byob" }). -
await reader . closed
-
Returns a promise that will be fulfilled when the stream becomes closed, or rejected if the stream ever errors or the reader’s lock is released before the stream finishes closing.
-
await reader . cancel ([ reason ])
-
If the reader is active , behaves the same as stream . cancel ( reason ). -
{ value , done } = await reader . read ( view )
-
Attempts to read bytes into view , and returns a promise resolved with the result:
-
If the chunk does become available, the promise will be fulfilled with an object of the form { value: theChunk, done: false } . theChunk will be a new view (of the same type) onto the same backing memory region, with the chunk’s data written into it. -
If the stream becomes closed, the promise will be fulfilled with an object of the form { value: theChunk, done: true } . theChunk will be a new view (of the same type) onto the same backing memory region, with no modifications, to ensure the memory is returned to the caller. -
If the reader is canceled , the promise will be fulfilled with an object of the form { value: undefined , done: true } - If the stream becomes errored, the promise will be rejected with the relevant error.
If reading a chunk causes the queue to become empty, more data will be pulled from the underlying source .
-
reader . releaseLock ()
-
Releases the reader’s lock on the corresponding stream. After the lock is released, the reader is no longer active . If the associated stream is errored when the lock is released, the reader will appear errored in the same way from now on; otherwise, the reader will appear closed.
If the reader’s lock is released while it still has pending read requests, then the promises returned by the reader’s read() method are immediately rejected with a TypeError . Any unread chunks remain in the stream’s internal queue and can be read later by acquiring a new reader.
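To make the BYOB flow concrete, here is a non-normative sketch (byteSource is a hypothetical helper) of a byte source that fills the consumer-supplied buffer through controller.byobRequest, falling back to enqueuing a copy when no BYOB request is active:

```javascript
// Hypothetical sketch: a readable byte stream serving from an in-memory
// Uint8Array. With a BYOB reader, each pull writes directly into the
// consumer's buffer and responds with the byte count.
function byteSource(bytes) {
  let offset = 0;
  return new ReadableStream({
    type: "bytes",
    pull(controller) {
      const request = controller.byobRequest;
      if (request) {
        const view = request.view;
        const n = Math.min(view.byteLength, bytes.length - offset);
        new Uint8Array(view.buffer, view.byteOffset, n)
          .set(bytes.subarray(offset, offset + n));
        offset += n;
        request.respond(n);
      } else {
        // Default reader: enqueue a copy (enqueue transfers the buffer).
        controller.enqueue(bytes.slice(offset));
        offset = bytes.length;
      }
      if (offset === bytes.length) controller.close();
    }
  });
}
```

Each fulfilled read(view) hands back a transferred view onto the memory the consumer supplied, as described above.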
The new ReadableStreamBYOBReader( stream ) constructor steps are:
-
Perform ? SetUpReadableStreamBYOBReader ( this , stream ).
The read( view ) method steps are:
-
If view .[[ByteLength]] is 0, return a promise rejected with a
TypeError
exception. -
If view .[[ViewedArrayBuffer]].[[ArrayBufferByteLength]] is 0, return a promise rejected with a
TypeError
exception. -
If ! IsDetachedBuffer ( view .[[ViewedArrayBuffer]]) is true, return a promise rejected with a
TypeError
exception. -
If this . [[stream]] is undefined, return a promise rejected with a
TypeError
exception. -
Let promise be a new promise .
-
Let readIntoRequest be a new read-into request with the following items :
- chunk steps , given chunk
-
Resolve promise with «[ "value" → chunk , "done" → false ]».
- close steps , given chunk
-
Resolve promise with «[ "value" → chunk , "done" → true ]».
- error steps , given e
-
Reject promise with e .
-
Perform ! ReadableStreamBYOBReaderRead ( this , view , readIntoRequest ).
-
Return promise .
The releaseLock() method steps are:
-
If this . [[stream]] is undefined, return.
-
Perform ! ReadableStreamBYOBReaderRelease ( this ).
4.6. The ReadableStreamDefaultController class
The ReadableStreamDefaultController class has methods that allow control of a ReadableStream 's state and internal queue . When constructing a ReadableStream that is not a readable byte stream , the underlying source is given a corresponding ReadableStreamDefaultController instance to manipulate.
4.6.1. Interface definition
The Web IDL definition for the ReadableStreamDefaultController class is given as follows:
[Exposed=*]
interface ReadableStreamDefaultController {
  readonly attribute unrestricted double? desiredSize;

  undefined close();
  undefined enqueue(optional any chunk);
  undefined error(optional any e);
};
4.6.2. Internal slots
Instances of ReadableStreamDefaultController are created with the internal slots described in the following table:
Internal Slot | Description ( non-normative ) |
---|---|
[[cancelAlgorithm]] | A promise-returning algorithm, taking one argument (the cancel reason), which communicates a requested cancelation to the underlying source |
[[closeRequested]] | A boolean flag indicating whether the stream has been closed by its underlying source , but still has chunks in its internal queue that have not yet been read |
[[pullAgain]] | A boolean flag set to true if the stream’s mechanisms requested a call to the underlying source 's pull algorithm to pull more data, but the pull could not yet be done since a previous call is still executing |
[[pullAlgorithm]] | A promise-returning algorithm that pulls data from the underlying source |
[[pulling]] | A boolean flag set to true while the underlying source 's pull algorithm is executing and the returned promise has not yet fulfilled, used to prevent reentrant calls |
[[queue]] | A list representing the stream’s internal queue of chunks |
[[queueTotalSize]] | The total size of all the chunks stored in [[queue]] (see § 8.1 Queue-with-sizes ) |
[[started]] | A boolean flag indicating whether the underlying source has finished starting |
[[strategyHWM]] | A number supplied to the constructor as part of the stream’s queuing strategy , indicating the point at which the stream will apply backpressure to its underlying source |
[[strategySizeAlgorithm]] | An algorithm to calculate the size of enqueued chunks , as part of the stream’s queuing strategy |
[[stream]] | The ReadableStream instance controlled |
4.6.3. Methods and properties
-
desiredSize = controller . desiredSize
-
Returns the desired size to fill the controlled stream’s internal queue . It can be negative, if the queue is over-full. An underlying source ought to use this information to determine when and how to apply backpressure .
-
controller . close ()
-
Closes the controlled readable stream. Consumers will still be able to read any previously-enqueued chunks from the stream, but once those are read, the stream will become closed.
-
controller . enqueue ( chunk )
-
Enqueues the given chunk chunk in the controlled readable stream.
-
controller . error ( e )
-
Errors the controlled readable stream, making all future interactions with it fail with the given error e .
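A non-normative sketch of these methods in action (makeSource is a hypothetical helper), using a CountQueuingStrategy so that desiredSize counts chunks against a high water mark of 3:

```javascript
// Hypothetical sketch: the underlying source observes desiredSize falling
// as it enqueues chunks, then closes the stream.
function makeSource(log) {
  return new ReadableStream({
    start(controller) {
      log.push(controller.desiredSize); // queue empty: equals the HWM
      controller.enqueue("a");
      controller.enqueue("b");
      log.push(controller.desiredSize); // two chunks of size 1 queued
      controller.close();
    }
  }, new CountQueuingStrategy({ highWaterMark: 3 }));
}
```

An underlying source would typically stop producing once desiredSize reaches zero or below, which is how backpressure is communicated.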
The desiredSize getter steps are:
-
Return ! ReadableStreamDefaultControllerGetDesiredSize ( this ).
The close() method steps are:
-
If ! ReadableStreamDefaultControllerCanCloseOrEnqueue ( this ) is false, throw a
TypeError
exception. -
Perform ! ReadableStreamDefaultControllerClose ( this ).
The enqueue( chunk ) method steps are:
-
If ! ReadableStreamDefaultControllerCanCloseOrEnqueue ( this ) is false, throw a
TypeError
exception. -
Perform ? ReadableStreamDefaultControllerEnqueue ( this , chunk ).
The error( e ) method steps are:
-
Perform ! ReadableStreamDefaultControllerError ( this , e ).
4.6.4. Internal methods
The following are internal methods implemented by each ReadableStreamDefaultController instance. The readable stream implementation will polymorphically call to either these, or to their counterparts for BYOB controllers, as discussed in § 4.9.2 Interfacing with controllers .
[[CancelSteps]]( reason ) implements the [[CancelSteps]] contract. It performs the following steps:
-
Perform ! ResetQueue ( this ).
-
Let result be the result of performing this . [[cancelAlgorithm]] , passing reason .
-
Perform ! ReadableStreamDefaultControllerClearAlgorithms ( this ).
-
Return result .
[[PullSteps]]( readRequest ) implements the [[PullSteps]] contract. It performs the following steps:
-
Let stream be this . [[stream]] .
-
If this . [[queue]] is not empty ,
-
Let chunk be ! DequeueValue ( this ).
-
If this . [[closeRequested]] is true and this . [[queue]] is empty ,
-
Perform ! ReadableStreamDefaultControllerClearAlgorithms ( this ).
-
Perform ! ReadableStreamClose ( stream ).
-
-
Otherwise, perform ! ReadableStreamDefaultControllerCallPullIfNeeded ( this ).
-
Perform readRequest ’s chunk steps , given chunk .
-
-
Otherwise,
-
Perform ! ReadableStreamAddReadRequest ( stream , readRequest ).
-
Perform ! ReadableStreamDefaultControllerCallPullIfNeeded ( this ).
-
[[ReleaseSteps]]() implements the [[ReleaseSteps]] contract. It performs the following steps:
-
Return.
4.7. The ReadableByteStreamController class
The ReadableByteStreamController class has methods that allow control of a ReadableStream 's state and internal queue . When constructing a ReadableStream that is a readable byte stream , the underlying source is given a corresponding ReadableByteStreamController instance to manipulate.
4.7.1. Interface definition
The Web IDL definition for the ReadableByteStreamController class is given as follows:
[Exposed=*]
interface ReadableByteStreamController {
  readonly attribute ReadableStreamBYOBRequest? byobRequest;
  readonly attribute unrestricted double? desiredSize;

  undefined close();
  undefined enqueue(ArrayBufferView chunk);
  undefined error(optional any e);
};
4.7.2. Internal slots
Instances of ReadableByteStreamController are created with the internal slots described in the following table:
Internal Slot | Description ( non-normative ) |
---|---|
[[autoAllocateChunkSize]] | A positive integer, when the automatic buffer allocation feature is enabled. In that case, this value specifies the size of buffer to allocate. It is undefined otherwise. |
[[byobRequest]] | A ReadableStreamBYOBRequest instance representing the current BYOB pull request, or null if there are no pending requests |
[[cancelAlgorithm]] | A promise-returning algorithm, taking one argument (the cancel reason), which communicates a requested cancelation to the underlying byte source |
[[closeRequested]] | A boolean flag indicating whether the stream has been closed by its underlying byte source , but still has chunks in its internal queue that have not yet been read |
[[pullAgain]] | A boolean flag set to true if the stream’s mechanisms requested a call to the underlying byte source 's pull algorithm to pull more data, but the pull could not yet be done since a previous call is still executing |
[[pullAlgorithm]] | A promise-returning algorithm that pulls data from the underlying byte source |
[[pulling]] | A boolean flag set to true while the underlying byte source 's pull algorithm is executing and the returned promise has not yet fulfilled, used to prevent reentrant calls |
[[pendingPullIntos]] | A list of pull-into descriptors |
[[queue]] | A list of readable byte stream queue entries representing the stream’s internal queue of chunks |
[[queueTotalSize]] | The total size, in bytes, of all the chunks stored in [[queue]] (see § 8.1 Queue-with-sizes ) |
[[started]] | A boolean flag indicating whether the underlying byte source has finished starting |
[[strategyHWM]] | A number supplied to the constructor as part of the stream’s queuing strategy , indicating the point at which the stream will apply backpressure to its underlying byte source |
[[stream]] | The ReadableStream instance controlled |
Although ReadableByteStreamController instances have [[queue]] and [[queueTotalSize]] slots, we do not use most of the abstract operations in § 8.1 Queue-with-sizes on them, as the way in which we manipulate this queue is rather different than the others in the spec. Instead, we update the two slots together manually.
This might be cleaned up in a future spec refactoring.
A readable byte stream queue entry is a struct encapsulating the important aspects of a chunk for the specific case of readable byte streams . It has the following items :
- buffer
-
An
ArrayBuffer
, which will be a transferred version of the one originally supplied by the underlying byte source - byte offset
-
A nonnegative integer number giving the byte offset derived from the view originally supplied by the underlying byte source
- byte length
-
A nonnegative integer number giving the byte length derived from the view originally supplied by the underlying byte source
A pull-into descriptor is a struct used to represent pending BYOB pull requests. It has the following items :
- buffer
-
An
ArrayBuffer
- buffer byte length
-
A positive integer representing the initial byte length of buffer
- byte offset
-
A nonnegative integer byte offset into the buffer where the underlying byte source will start writing
- byte length
-
A positive integer number of bytes which can be written into the buffer
- bytes filled
-
A nonnegative integer number of bytes that have been written into the buffer so far
- element size
-
A positive integer representing the number of bytes that can be written into the buffer at a time, using views of the type described by the view constructor
- view constructor
-
A typed array constructor or
%DataView%
, which will be used for constructing a view with which to write into the buffer - reader type
-
Either "default" or "byob", indicating what type of readable stream reader initiated this request, or "none" if the initiating reader was released
4.7.3. Methods and properties
-
byobRequest = controller . byobRequest
-
Returns the current BYOB pull request, or null if there isn’t one.
-
desiredSize = controller . desiredSize
-
Returns the desired size to fill the controlled stream’s internal queue . It can be negative, if the queue is over-full. An underlying byte source ought to use this information to determine when and how to apply backpressure .
-
controller . close ()
-
Closes the controlled readable stream. Consumers will still be able to read any previously-enqueued chunks from the stream, but once those are read, the stream will become closed.
-
controller . enqueue ( chunk )
-
Enqueues the given chunk chunk in the controlled readable stream. The chunk has to be an ArrayBufferView instance, or else a TypeError will be thrown. -
controller . error ( e )
-
Errors the controlled readable stream, making all future interactions with it fail with the given error e .
The byobRequest getter steps are:
-
Return ! ReadableByteStreamControllerGetBYOBRequest ( this ).
The desiredSize getter steps are:
-
Return ! ReadableByteStreamControllerGetDesiredSize ( this ).
The close() method steps are:
-
If this . [[closeRequested]] is true, throw a
TypeError
exception. -
If this . [[stream]] . [[state]] is not "readable", throw a TypeError exception. -
Perform ? ReadableByteStreamControllerClose ( this ).
The enqueue( chunk ) method steps are:
-
If chunk .[[ByteLength]] is 0, throw a
TypeError
exception. -
If chunk .[[ViewedArrayBuffer]].[[ArrayBufferByteLength]] is 0, throw a
TypeError
exception. -
If this . [[closeRequested]] is true, throw a
TypeError
exception. -
If this . [[stream]] . [[state]] is not "readable", throw a TypeError exception. -
Return ? ReadableByteStreamControllerEnqueue ( this , chunk ).
The error( e ) method steps are:
-
Perform ! ReadableByteStreamControllerError ( this , e ).
4.7.4. Internal methods
The following are internal methods implemented by each ReadableByteStreamController instance. The readable stream implementation will polymorphically call to either these, or to their counterparts for default controllers, as discussed in § 4.9.2 Interfacing with controllers .
[[CancelSteps]]( reason ) implements the [[CancelSteps]] contract. It performs the following steps:
-
Perform ! ReadableByteStreamControllerClearPendingPullIntos ( this ).
-
Perform ! ResetQueue ( this ).
-
Let result be the result of performing this . [[cancelAlgorithm]] , passing in reason .
-
Perform ! ReadableByteStreamControllerClearAlgorithms ( this ).
-
Return result .
[[PullSteps]]( readRequest ) implements the [[PullSteps]] contract. It performs the following steps:
-
Let stream be this . [[stream]] .
-
Assert: ! ReadableStreamHasDefaultReader ( stream ) is true.
-
If this . [[queueTotalSize]] > 0,
-
Assert: ! ReadableStreamGetNumReadRequests ( stream ) is 0.
-
Perform ! ReadableByteStreamControllerFillReadRequestFromQueue ( this , readRequest ).
-
Return.
-
-
Let autoAllocateChunkSize be this . [[autoAllocateChunkSize]] .
-
If autoAllocateChunkSize is not undefined,
-
Let buffer be Construct (
%ArrayBuffer%
, « autoAllocateChunkSize »). -
If buffer is an abrupt completion,
-
Perform readRequest ’s error steps , given buffer .[[Value]].
-
Return.
-
-
Let pullIntoDescriptor be a new pull-into descriptor with buffer buffer .[[Value]], buffer byte length autoAllocateChunkSize , byte offset 0, byte length autoAllocateChunkSize , bytes filled 0, element size 1, view constructor
%Uint8Array%
, and reader type "default
". -
Append pullIntoDescriptor to this . [[pendingPullIntos]] .
-
-
Perform ! ReadableStreamAddReadRequest ( stream , readRequest ).
-
Perform ! ReadableByteStreamControllerCallPullIfNeeded ( this ).
[[ReleaseSteps]]() implements the [[ReleaseSteps]] contract. It performs the following steps:
-
If this . [[pendingPullIntos]] is not empty ,
-
Let firstPendingPullInto be this . [[pendingPullIntos]] [0].
-
Set firstPendingPullInto ’s reader type to "none". -
Set this . [[pendingPullIntos]] to the list « firstPendingPullInto ».
-
4.8. The ReadableStreamBYOBRequest class
The ReadableStreamBYOBRequest class represents a pull-into request in a ReadableByteStreamController .
4.8.1. Interface definition
The Web IDL definition for the ReadableStreamBYOBRequest class is given as follows:
[Exposed=*]
interface ReadableStreamBYOBRequest {
  readonly attribute ArrayBufferView? view;

  undefined respond([EnforceRange] unsigned long long bytesWritten);
  undefined respondWithNewView(ArrayBufferView view);
};
4.8.2. Internal slots
Instances of ReadableStreamBYOBRequest are created with the internal slots described in the following table:
Internal Slot | Description ( non-normative ) |
---|---|
[[controller]] | The parent ReadableByteStreamController instance |
[[view]] | A typed array representing the destination region to which the controller can write generated data, or null after the BYOB request has been invalidated. |
4.8.3. Methods and properties
-
view = byobRequest . view
-
Returns the view for writing into, or null if the BYOB request has already been responded to.
-
byobRequest . respond ( bytesWritten )
-
Indicates to the associated readable byte stream that bytesWritten bytes were written into view , causing the result to be surfaced to the consumer .
After this method is called, view will be transferred and no longer modifiable. -
byobRequest .
respondWithNewView
( view ) -
Indicates to the associated readable byte stream that instead of writing into
view
, the underlying byte source is providing a newArrayBufferView
, which will be given to the consumer of the readable byte stream .The new view has to be a view onto the same backing memory region as
view
, i.e. its buffer has to equal (or be a transferred version of)view
's buffer. ItsbyteOffset
has to equalview
'sbyteOffset
, and itsbyteLength
(representing the number of bytes written) has to be less than or equal to that ofview
.After this method is called, view will be transferred and no longer modifiable.
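As a concrete, non-normative sketch, an underlying byte source might service these pull-into requests by filling the supplied view in place and then calling respond(). The counting fill pattern and the name makeCountingByteStream are ours, purely for illustration:

```javascript
// Sketch of an underlying byte source fulfilling BYOB requests: fill the
// consumer-supplied view in place, then respond() with the byte count.
function makeCountingByteStream() {
  return new ReadableStream({
    type: "bytes",
    pull(controller) {
      const request = controller.byobRequest;
      if (request !== null) {
        const view = request.view;
        // Write directly into the destination region...
        for (let i = 0; i < view.byteLength; i++) view[i] = i & 0xff;
        // ...then surface exactly that many bytes; view is now transferred.
        request.respond(view.byteLength);
      } else {
        // No outstanding pull-into request: enqueue a fresh chunk instead.
        controller.enqueue(new Uint8Array([0, 1, 2]));
      }
    },
  });
}

async function readFour() {
  const reader = makeCountingByteStream().getReader({ mode: "byob" });
  const { value } = await reader.read(new Uint8Array(4));
  return Array.from(value);
}
```

Reading with a 4-byte buffer yields the bytes 0 through 3 written by the source, with no intermediate copy.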
The respond( bytesWritten ) method steps are:
-
If this . [[controller]] is undefined, throw a
TypeError
exception. -
If ! IsDetachedBuffer ( this . [[view]] .[[ViewedArrayBuffer]]) is true, throw a
TypeError
exception. -
Assert: this . [[view]] .[[ViewedArrayBuffer]].[[ByteLength]] > 0.
-
Perform ? ReadableByteStreamControllerRespond ( this . [[controller]] , bytesWritten ).
The respondWithNewView( view ) method steps are:
-
If this . [[controller]] is undefined, throw a
TypeError
exception. -
If ! IsDetachedBuffer ( view .[[ViewedArrayBuffer]]) is true, throw a
TypeError
exception. -
Return ? ReadableByteStreamControllerRespondWithNewView ( this . [[controller]] , view ).
4.9. Abstract operations
4.9.1. Working with readable streams
The following abstract operations operate on ReadableStream instances at a higher level.
AcquireReadableStreamBYOBReader( stream ) performs the following steps:
-
Let reader be a new
ReadableStreamBYOBReader
. -
Perform ? SetUpReadableStreamBYOBReader ( reader , stream ).
-
Return reader .
AcquireReadableStreamDefaultReader( stream ) performs the following steps:
-
Let reader be a new
ReadableStreamDefaultReader
. -
Perform ? SetUpReadableStreamDefaultReader ( reader , stream ).
-
Return reader .
CreateReadableStream( startAlgorithm , pullAlgorithm , cancelAlgorithm [, highWaterMark [, sizeAlgorithm ]]) performs the following steps:
-
If highWaterMark was not passed, set it to 1.
-
If sizeAlgorithm was not passed, set it to an algorithm that returns 1.
-
Assert: ! IsNonNegativeNumber ( highWaterMark ) is true.
-
Let stream be a new
ReadableStream
. -
Perform ! InitializeReadableStream ( stream ).
-
Let controller be a new
ReadableStreamDefaultController
. -
Perform ? SetUpReadableStreamDefaultController ( stream , controller , startAlgorithm , pullAlgorithm , cancelAlgorithm , highWaterMark , sizeAlgorithm ).
-
Return stream .
This abstract operation will throw an exception if and only if the supplied startAlgorithm throws.
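CreateReadableStream is spec-internal, but its developer-facing analog is the ReadableStream constructor paired with a queuing strategy. A minimal sketch; the high-water mark of 3 is an arbitrary choice for illustration:

```javascript
// The constructor wires start/pull/cancel algorithms to a default controller,
// as CreateReadableStream does internally. desiredSize begins at the
// high-water mark and drops as chunks are queued.
let initialDesiredSize;
const rs = new ReadableStream(
  {
    start(controller) {
      // The start algorithm runs synchronously during construction.
      initialDesiredSize = controller.desiredSize; // equals highWaterMark
      controller.enqueue("a"); // one queued chunk of size 1
    },
  },
  { highWaterMark: 3, size: () => 1 }
);
```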
CreateReadableByteStream( startAlgorithm , pullAlgorithm , cancelAlgorithm ) performs the following steps:
-
Let stream be a new
ReadableStream
. -
Perform ! InitializeReadableStream ( stream ).
-
Let controller be a new
ReadableByteStreamController
. -
Perform ? SetUpReadableByteStreamController ( stream , controller , startAlgorithm , pullAlgorithm , cancelAlgorithm , 0, undefined).
-
Return stream .
This abstract operation will throw an exception if and only if the supplied startAlgorithm throws.
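The developer-facing analog of CreateReadableByteStream is constructing a ReadableStream with type "bytes". A hedged sketch; the autoAllocateChunkSize of 8 is an arbitrary illustrative value:

```javascript
// A byte stream whose source fills the automatically allocated BYOB request,
// so even a default (non-BYOB) reader receives the bytes the source wrote.
async function demoByteStream() {
  const rs = new ReadableStream({
    type: "bytes",
    autoAllocateChunkSize: 8,
    pull(controller) {
      const view = controller.byobRequest.view; // auto-allocated 8-byte region
      view[0] = 42;
      controller.byobRequest.respond(1); // surface just one byte
    },
  });
  const { value } = await rs.getReader().read();
  return { byteLength: value.byteLength, first: value[0] };
}
```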
InitializeReadableStream( stream ) performs the following steps:
-
Set stream . [[state]] to "
readable
". -
Set stream . [[reader]] and stream . [[storedError]] to undefined.
-
Set stream . [[disturbed]] to false.
IsReadableStreamLocked( stream ) performs the following steps:
-
If stream . [[reader]] is undefined, return false.
-
Return true.
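The developer-facing reflection of this operation is the locked getter; a small sketch:

```javascript
// Acquiring any reader locks the stream; releasing the lock unlocks it.
const stream = new ReadableStream();
const before = stream.locked;  // no reader yet
const reader = stream.getReader();
const during = stream.locked;  // the default reader holds the lock
reader.releaseLock();
const after = stream.locked;
```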
ReadableStreamFromIterable( asyncIterable ) performs the following steps:
-
Let stream be undefined.
-
Let startAlgorithm be an algorithm that returns undefined.
-
Let pullAlgorithm be the following steps:
-
Let nextPromise be the result of getting an async iterable next value from asyncIterable .
-
Return the result of reacting to nextPromise :
-
If next was fulfilled with value v :
-
If v is end of iteration , perform ! ReadableStreamDefaultControllerClose ( stream . [[controller]] ).
-
Otherwise, perform ! ReadableStreamDefaultControllerEnqueue ( stream . [[controller]] , v ).
-
If next was rejected with reason r , perform ! ReadableStreamDefaultControllerError ( stream . [[controller]] , r ).
-
Let cancelAlgorithm be the following steps, given reason :
-
Let finishPromise be the result of closing the async iterable asyncIterable with reason .
-
Return finishPromise .
-
Set stream to ! CreateReadableStream ( startAlgorithm , pullAlgorithm , cancelAlgorithm , 0).
-
Return stream .
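A userland sketch of the adaptation above: wrap an async iterable in a pull-based stream with a high-water mark of 0, mirroring the pull and cancel algorithms. The name fromAsyncIterable is ours; platforms that ship ReadableStream.from do this natively:

```javascript
// Wrap an async iterable in a ReadableStream: each pull advances the
// iterator; cancellation closes the iterable so cleanup can run.
function fromAsyncIterable(asyncIterable) {
  const iterator = asyncIterable[Symbol.asyncIterator]();
  return new ReadableStream(
    {
      async pull(controller) {
        const { value, done } = await iterator.next(); // next value
        if (done) controller.close();                  // end of iteration
        else controller.enqueue(value);
      },
      async cancel(reason) {
        // Closing the iterable lets e.g. async generators run finally blocks.
        await iterator.return?.(reason);
      },
    },
    { highWaterMark: 0 } // pull only on demand, as in the step above
  );
}

async function collect(stream) {
  const out = [];
  const reader = stream.getReader();
  for (;;) {
    const { value, done } = await reader.read();
    if (done) return out;
    out.push(value);
  }
}
```

Because the high-water mark is 0, the iterator is only advanced when a consumer actually reads.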
ReadableStreamPipeTo( source , dest , preventClose , preventAbort , preventCancel [, signal ]) performs the following steps:
-
Assert: source implements
ReadableStream
. -
Assert: dest implements
WritableStream
. -
Assert: preventClose , preventAbort , and preventCancel are all booleans.
-
If signal was not given, let signal be undefined.
-
Assert: either signal is undefined, or signal implements
AbortSignal
. -
Assert: ! IsReadableStreamLocked ( source ) is false.
-
Assert: ! IsWritableStreamLocked ( dest ) is false.
-
If source . [[controller]] implements
ReadableByteStreamController
, let reader be either ! AcquireReadableStreamBYOBReader ( source ) or ! AcquireReadableStreamDefaultReader ( source ), at the user agent’s discretion. -
Otherwise, let reader be ! AcquireReadableStreamDefaultReader ( source ).
-
Let writer be ! AcquireWritableStreamDefaultWriter ( dest ).
-
Set source . [[disturbed]] to true.
-
Let shuttingDown be false.
-
Let promise be a new promise .
-
If signal is not undefined,
-
Let abortAlgorithm be the following steps:
-
Let error be signal ’s abort reason .
-
Let actions be an empty ordered set .
-
If preventAbort is false, append the following action to actions :
-
If dest . [[state]] is "
writable
", return ! WritableStreamAbort ( dest , error ). -
Otherwise, return a promise resolved with undefined.
-
-
If preventCancel is false, append the following action to actions :
-
If source . [[state]] is "
readable
", return ! ReadableStreamCancel ( source , error ). -
Otherwise, return a promise resolved with undefined.
-
-
Shutdown with an action consisting of getting a promise to wait for all of the actions in actions , and with error .
-
-
If signal is aborted , perform abortAlgorithm and return promise .
-
Add abortAlgorithm to signal .
-
-
In parallel but not really; see #905 , using reader and writer , read all chunks from source and write them to dest . Due to the locking provided by the reader and writer, the exact manner in which this happens is not observable to author code, and so there is flexibility in how this is done. The following constraints apply regardless of the exact algorithm used:
-
Public API must not be used: while reading or writing, or performing any of the operations below, the JavaScript-modifiable reader, writer, and stream APIs (i.e. methods on the appropriate prototypes) must not be used. Instead, the streams must be manipulated directly.
-
Backpressure must be enforced:
-
While WritableStreamDefaultWriterGetDesiredSize ( writer ) is ≤ 0 or is null, the user agent must not read from reader .
-
If reader is a BYOB reader , WritableStreamDefaultWriterGetDesiredSize ( writer ) should be used as a basis to determine the size of the chunks read from reader .
It’s frequently inefficient to read chunks that are too small or too large. Other information might be factored in to determine the optimal chunk size.
-
Reads or writes should not be delayed for reasons other than these backpressure signals.
An implementation that waits for each write to successfully complete before proceeding to the next read/write operation violates this recommendation. In doing so, such an implementation makes the internal queue of dest useless, as it ensures dest always contains at most one queued chunk .
-
-
Shutdown must stop activity: if shuttingDown becomes true, the user agent must not initiate further reads from reader , and must only perform writes of already-read chunks , as described below. In particular, the user agent must check the below conditions before performing any reads or writes, since they might lead to immediate shutdown.
-
Error and close states must be propagated: the following conditions must be applied in order.
-
Errors must be propagated forward: if source . [[state]] is or becomes "
errored
", then-
If preventAbort is false, shutdown with an action of ! WritableStreamAbort ( dest , source . [[storedError]] ) and with source . [[storedError]] .
-
Otherwise, shutdown with source . [[storedError]] .
-
-
Errors must be propagated backward: if dest . [[state]] is or becomes "
errored
", then-
If preventCancel is false, shutdown with an action of ! ReadableStreamCancel ( source , dest . [[storedError]] ) and with dest . [[storedError]] .
-
Otherwise, shutdown with dest . [[storedError]] .
-
-
Closing must be propagated forward: if source . [[state]] is or becomes "
closed
", then-
If preventClose is false, shutdown with an action of ! WritableStreamDefaultWriterCloseWithErrorPropagation ( writer ).
-
Otherwise, shutdown .
-
-
Closing must be propagated backward: if ! WritableStreamCloseQueuedOrInFlight ( dest ) is true or dest . [[state]] is "
closed
", then-
Assert: no chunks have been read or written.
-
Let destClosed be a new
TypeError
. -
If preventCancel is false, shutdown with an action of ! ReadableStreamCancel ( source , destClosed ) and with destClosed .
-
Otherwise, shutdown with destClosed .
-
-
-
Shutdown with an action : if any of the above requirements ask to shutdown with an action action , optionally with an error originalError , then:
-
If shuttingDown is true, abort these substeps.
-
Set shuttingDown to true.
-
If dest . [[state]] is " writable " and ! WritableStreamCloseQueuedOrInFlight ( dest ) is false,
-
If any chunks have been read but not yet written, write them to dest .
-
Wait until every chunk that has been read has been written (i.e. the corresponding promises have settled).
-
Let p be the result of performing action .
-
Upon fulfillment of p , finalize , passing along originalError if it was given.
-
Upon rejection of p with reason newError , finalize with newError .
-
-
Shutdown : if any of the above requirements or steps ask to shutdown, optionally with an error error , then:
-
If shuttingDown is true, abort these substeps.
-
Set shuttingDown to true.
-
If dest . [[state]] is " writable " and ! WritableStreamCloseQueuedOrInFlight ( dest ) is false,
-
If any chunks have been read but not yet written, write them to dest .
-
Wait until every chunk that has been read has been written (i.e. the corresponding promises have settled).
-
Finalize , passing along error if it was given.
-
-
Finalize : both forms of shutdown will eventually ask to finalize, optionally with an error error , which means to perform the following steps:
-
Perform ! WritableStreamDefaultWriterRelease ( writer ).
-
If reader implements
ReadableStreamBYOBReader
, perform ! ReadableStreamBYOBReaderRelease ( reader ). -
Otherwise, perform ! ReadableStreamDefaultReaderRelease ( reader ).
-
If signal is not undefined, remove abortAlgorithm from signal .
-
If error was given, reject promise with error .
-
Otherwise, resolve promise with undefined.
-
-
-
Return promise .
Various abstract operations performed here include object creation (often of promises), which usually would require specifying a realm for the created object. However, because of the locking, none of these objects can be observed by author code. As such, the realm used to create them does not matter.
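The observable contract of the steps above, seen through pipeTo(): chunks flow from source to destination and closing propagates forward. A minimal sketch:

```javascript
// Pipe a short readable into a collecting writable; pipeTo() resolves
// once the close has propagated to dest.
async function demoPipe() {
  const written = [];
  let destClosed = false;
  const source = new ReadableStream({
    start(controller) {
      controller.enqueue("a");
      controller.enqueue("b");
      controller.close();
    },
  });
  const dest = new WritableStream({
    write(chunk) { written.push(chunk); },
    close() { destClosed = true; },
  });
  await source.pipeTo(dest);
  return { written, destClosed };
}
```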
ReadableStreamTee( stream , cloneForBranch2 ) will tee the given readable stream . The second argument, cloneForBranch2 , governs whether or not the data from the original stream will be cloned (using HTML’s serializable objects framework) before appearing in the second of the returned branches. This is useful for scenarios where both branches are to be consumed in such a way that they might otherwise interfere with each other, such as by transferring their chunks . However, it does introduce a noticeable asymmetry between the two branches, and limits the possible chunks to serializable ones. [HTML]
If stream is a readable byte stream , then cloneForBranch2 is ignored and chunks are cloned unconditionally.
In this standard, ReadableStreamTee is always called with cloneForBranch2 set to false; other specifications pass true via the tee wrapper algorithm.
It performs the following steps:
-
Assert: stream implements
ReadableStream
. -
Assert: cloneForBranch2 is a boolean.
-
If stream . [[controller]] implements
ReadableByteStreamController
, return ? ReadableByteStreamTee ( stream ). -
Return ? ReadableStreamDefaultTee ( stream , cloneForBranch2 ).
ReadableStreamDefaultTee( stream , cloneForBranch2 ) performs the following steps:
-
Assert: stream implements
ReadableStream
. -
Assert: cloneForBranch2 is a boolean.
-
Let reader be ? AcquireReadableStreamDefaultReader ( stream ).
-
Let reading be false.
-
Let readAgain be false.
-
Let canceled1 be false.
-
Let canceled2 be false.
-
Let reason1 be undefined.
-
Let reason2 be undefined.
-
Let branch1 be undefined.
-
Let branch2 be undefined.
-
Let cancelPromise be a new promise .
-
Let pullAlgorithm be the following steps:
-
If reading is true,
-
Set readAgain to true.
-
Return a promise resolved with undefined.
-
-
Set reading to true.
-
Let readRequest be a read request with the following items :
- chunk steps , given chunk
-
-
Queue a microtask to perform the following steps:
-
Set readAgain to false.
-
Let chunk1 and chunk2 be chunk .
-
If canceled2 is false and cloneForBranch2 is true,
-
Let cloneResult be StructuredClone ( chunk2 ).
-
If cloneResult is an abrupt completion,
-
Perform ! ReadableStreamDefaultControllerError ( branch1 . [[controller]] , cloneResult .[[Value]]).
-
Perform ! ReadableStreamDefaultControllerError ( branch2 . [[controller]] , cloneResult .[[Value]]).
-
Resolve cancelPromise with ! ReadableStreamCancel ( stream , cloneResult .[[Value]]).
-
Return.
-
-
Otherwise, set chunk2 to cloneResult .[[Value]].
-
-
If canceled1 is false, perform ! ReadableStreamDefaultControllerEnqueue ( branch1 . [[controller]] , chunk1 ).
-
If canceled2 is false, perform ! ReadableStreamDefaultControllerEnqueue ( branch2 . [[controller]] , chunk2 ).
-
Set reading to false.
-
If readAgain is true, perform pullAlgorithm .
-
The microtask delay here is necessary because it takes at least a microtask to detect errors, when we use reader . [[closedPromise]] below. We want errors in stream to error both branches immediately, so we cannot let successful synchronously-available reads happen ahead of asynchronously-available errors.
-
- close steps
-
-
Set reading to false.
-
If canceled1 is false, perform ! ReadableStreamDefaultControllerClose ( branch1 . [[controller]] ).
-
If canceled2 is false, perform ! ReadableStreamDefaultControllerClose ( branch2 . [[controller]] ).
-
If canceled1 is false or canceled2 is false, resolve cancelPromise with undefined.
-
- error steps
-
-
Set reading to false.
-
-
Perform ! ReadableStreamDefaultReaderRead ( reader , readRequest ).
-
Return a promise resolved with undefined.
-
-
Let cancel1Algorithm be the following steps, taking a reason argument:
-
Set canceled1 to true.
-
Set reason1 to reason .
-
If canceled2 is true,
-
Let compositeReason be ! CreateArrayFromList (« reason1 , reason2 »).
-
Let cancelResult be ! ReadableStreamCancel ( stream , compositeReason ).
-
Resolve cancelPromise with cancelResult .
-
-
Return cancelPromise .
-
-
Let cancel2Algorithm be the following steps, taking a reason argument:
-
Set canceled2 to true.
-
Set reason2 to reason .
-
If canceled1 is true,
-
Let compositeReason be ! CreateArrayFromList (« reason1 , reason2 »).
-
Let cancelResult be ! ReadableStreamCancel ( stream , compositeReason ).
-
Resolve cancelPromise with cancelResult .
-
-
Return cancelPromise .
-
-
Let startAlgorithm be an algorithm that returns undefined.
-
Set branch1 to ! CreateReadableStream ( startAlgorithm , pullAlgorithm , cancel1Algorithm ).
-
Set branch2 to ! CreateReadableStream ( startAlgorithm , pullAlgorithm , cancel2Algorithm ).
-
Upon rejection of reader . [[closedPromise]] with reason r ,
-
Perform ! ReadableStreamDefaultControllerError ( branch1 . [[controller]] , r ).
-
Perform ! ReadableStreamDefaultControllerError ( branch2 . [[controller]] , r ).
-
If canceled1 is false or canceled2 is false, resolve cancelPromise with undefined.
-
-
Return « branch1 , branch2 ».
ReadableByteStreamTee( stream ) performs the following steps:
-
Assert: stream implements
ReadableStream
. -
Assert: stream . [[controller]] implements
ReadableByteStreamController
. -
Let reader be ? AcquireReadableStreamDefaultReader ( stream ).
-
Let reading be false.
-
Let readAgainForBranch1 be false.
-
Let readAgainForBranch2 be false.
-
Let canceled1 be false.
-
Let canceled2 be false.
-
Let reason1 be undefined.
-
Let reason2 be undefined.
-
Let branch1 be undefined.
-
Let branch2 be undefined.
-
Let cancelPromise be a new promise .
-
Let forwardReaderError be the following steps, taking a thisReader argument:
-
Upon rejection of thisReader . [[closedPromise]] with reason r ,
-
If thisReader is not reader , return.
-
Perform ! ReadableByteStreamControllerError ( branch1 . [[controller]] , r ).
-
Perform ! ReadableByteStreamControllerError ( branch2 . [[controller]] , r ).
-
If canceled1 is false or canceled2 is false, resolve cancelPromise with undefined.
-
-
-
Let pullWithDefaultReader be the following steps:
-
If reader implements
ReadableStreamBYOBReader
,-
Assert: reader . [[readIntoRequests]] is empty .
-
Perform ! ReadableStreamBYOBReaderRelease ( reader ).
-
Set reader to ! AcquireReadableStreamDefaultReader ( stream ).
-
Perform forwardReaderError , given reader .
-
-
Let readRequest be a read request with the following items :
- chunk steps , given chunk
-
-
Queue a microtask to perform the following steps:
-
Set readAgainForBranch1 to false.
-
Set readAgainForBranch2 to false.
-
Let chunk1 and chunk2 be chunk .
-
If canceled1 is false and canceled2 is false,
-
Let cloneResult be CloneAsUint8Array ( chunk ).
-
If cloneResult is an abrupt completion,
-
Perform ! ReadableByteStreamControllerError ( branch1 . [[controller]] , cloneResult .[[Value]]).
-
Perform ! ReadableByteStreamControllerError ( branch2 . [[controller]] , cloneResult .[[Value]]).
-
Resolve cancelPromise with ! ReadableStreamCancel ( stream , cloneResult .[[Value]]).
-
Return.
-
-
Otherwise, set chunk2 to cloneResult .[[Value]].
-
-
If canceled1 is false, perform ! ReadableByteStreamControllerEnqueue ( branch1 . [[controller]] , chunk1 ).
-
If canceled2 is false, perform ! ReadableByteStreamControllerEnqueue ( branch2 . [[controller]] , chunk2 ).
-
Set reading to false.
-
If readAgainForBranch1 is true, perform pull1Algorithm .
-
Otherwise, if readAgainForBranch2 is true, perform pull2Algorithm .
-
The microtask delay here is necessary because it takes at least a microtask to detect errors, when we use reader . [[closedPromise]] below. We want errors in stream to error both branches immediately, so we cannot let successful synchronously-available reads happen ahead of asynchronously-available errors.
-
- close steps
-
-
Set reading to false.
-
If canceled1 is false, perform ! ReadableByteStreamControllerClose ( branch1 . [[controller]] ).
-
If canceled2 is false, perform ! ReadableByteStreamControllerClose ( branch2 . [[controller]] ).
-
If branch1 . [[controller]] . [[pendingPullIntos]] is not empty , perform ! ReadableByteStreamControllerRespond ( branch1 . [[controller]] , 0).
-
If branch2 . [[controller]] . [[pendingPullIntos]] is not empty , perform ! ReadableByteStreamControllerRespond ( branch2 . [[controller]] , 0).
-
If canceled1 is false or canceled2 is false, resolve cancelPromise with undefined.
-
- error steps
-
-
Set reading to false.
-
-
Perform ! ReadableStreamDefaultReaderRead ( reader , readRequest ).
-
-
Let pullWithBYOBReader be the following steps, given view and forBranch2 :
-
If reader implements
ReadableStreamDefaultReader
,-
Assert: reader . [[readRequests]] is empty .
-
Perform ! ReadableStreamDefaultReaderRelease ( reader ).
-
Set reader to ! AcquireReadableStreamBYOBReader ( stream ).
-
Perform forwardReaderError , given reader .
-
-
Let byobBranch be branch2 if forBranch2 is true, and branch1 otherwise.
-
Let otherBranch be branch2 if forBranch2 is false, and branch1 otherwise.
-
Let readIntoRequest be a read-into request with the following items :
- chunk steps , given chunk
-
-
Queue a microtask to perform the following steps:
-
Set readAgainForBranch1 to false.
-
Set readAgainForBranch2 to false.
-
Let byobCanceled be canceled2 if forBranch2 is true, and canceled1 otherwise.
-
Let otherCanceled be canceled2 if forBranch2 is false, and canceled1 otherwise.
-
If otherCanceled is false,
-
Let cloneResult be CloneAsUint8Array ( chunk ).
-
If cloneResult is an abrupt completion,
-
Perform ! ReadableByteStreamControllerError ( byobBranch . [[controller]] , cloneResult .[[Value]]).
-
Perform ! ReadableByteStreamControllerError ( otherBranch . [[controller]] , cloneResult .[[Value]]).
-
Resolve cancelPromise with ! ReadableStreamCancel ( stream , cloneResult .[[Value]]).
-
Return.
-
-
Otherwise, let clonedChunk be cloneResult .[[Value]].
-
If byobCanceled is false, perform ! ReadableByteStreamControllerRespondWithNewView ( byobBranch . [[controller]] , chunk ).
-
Perform ! ReadableByteStreamControllerEnqueue ( otherBranch . [[controller]] , clonedChunk ).
-
-
Otherwise, if byobCanceled is false, perform ! ReadableByteStreamControllerRespondWithNewView ( byobBranch . [[controller]] , chunk ).
-
Set reading to false.
-
If readAgainForBranch1 is true, perform pull1Algorithm .
-
Otherwise, if readAgainForBranch2 is true, perform pull2Algorithm .
-
The microtask delay here is necessary because it takes at least a microtask to detect errors, when we use reader . [[closedPromise]] below. We want errors in stream to error both branches immediately, so we cannot let successful synchronously-available reads happen ahead of asynchronously-available errors.
-
- close steps , given chunk
-
-
Set reading to false.
-
Let byobCanceled be canceled2 if forBranch2 is true, and canceled1 otherwise.
-
Let otherCanceled be canceled2 if forBranch2 is false, and canceled1 otherwise.
-
If byobCanceled is false, perform ! ReadableByteStreamControllerClose ( byobBranch . [[controller]] ).
-
If otherCanceled is false, perform ! ReadableByteStreamControllerClose ( otherBranch . [[controller]] ).
-
If chunk is not undefined,
-
Assert: chunk .[[ByteLength]] is 0.
-
If byobCanceled is false, perform ! ReadableByteStreamControllerRespondWithNewView ( byobBranch . [[controller]] , chunk ).
-
If otherCanceled is false and otherBranch . [[controller]] . [[pendingPullIntos]] is not empty , perform ! ReadableByteStreamControllerRespond ( otherBranch . [[controller]] , 0).
-
-
If byobCanceled is false or otherCanceled is false, resolve cancelPromise with undefined.
-
- error steps
-
-
Set reading to false.
-
-
Perform ! ReadableStreamBYOBReaderRead ( reader , view , readIntoRequest ).
-
-
Let pull1Algorithm be the following steps:
-
If reading is true,
-
Set readAgainForBranch1 to true.
-
Return a promise resolved with undefined.
-
-
Set reading to true.
-
Let byobRequest be ! ReadableByteStreamControllerGetBYOBRequest ( branch1 . [[controller]] ).
-
If byobRequest is null, perform pullWithDefaultReader .
-
Otherwise, perform pullWithBYOBReader , given byobRequest . [[view]] and false.
-
Return a promise resolved with undefined.
-
-
Let pull2Algorithm be the following steps:
-
If reading is true,
-
Set readAgainForBranch2 to true.
-
Return a promise resolved with undefined.
-
-
Set reading to true.
-
Let byobRequest be ! ReadableByteStreamControllerGetBYOBRequest ( branch2 . [[controller]] ).
-
If byobRequest is null, perform pullWithDefaultReader .
-
Otherwise, perform pullWithBYOBReader , given byobRequest . [[view]] and true.
-
Return a promise resolved with undefined.
-
-
Let cancel1Algorithm be the following steps, taking a reason argument:
-
Set canceled1 to true.
-
Set reason1 to reason .
-
If canceled2 is true,
-
Let compositeReason be ! CreateArrayFromList (« reason1 , reason2 »).
-
Let cancelResult be ! ReadableStreamCancel ( stream , compositeReason ).
-
Resolve cancelPromise with cancelResult .
-
-
Return cancelPromise .
-
-
Let cancel2Algorithm be the following steps, taking a reason argument:
-
Set canceled2 to true.
-
Set reason2 to reason .
-
If canceled1 is true,
-
Let compositeReason be ! CreateArrayFromList (« reason1 , reason2 »).
-
Let cancelResult be ! ReadableStreamCancel ( stream , compositeReason ).
-
Resolve cancelPromise with cancelResult .
-
-
Return cancelPromise .
-
-
Let startAlgorithm be an algorithm that returns undefined.
-
Set branch1 to ! CreateReadableByteStream ( startAlgorithm , pull1Algorithm , cancel1Algorithm ).
-
Set branch2 to ! CreateReadableByteStream ( startAlgorithm , pull2Algorithm , cancel2Algorithm ).
-
Perform forwardReaderError , given reader .
-
Return « branch1 , branch2 ».
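Both tee variants surface through the same public method; a brief sketch of the observable behavior:

```javascript
// tee() locks the original stream and yields two branches that each see
// every chunk.
async function demoTee() {
  const rs = new ReadableStream({
    start(controller) {
      controller.enqueue(1);
      controller.enqueue(2);
      controller.close();
    },
  });
  const [branch1, branch2] = rs.tee();
  const readAll = async (branch) => {
    const out = [];
    const reader = branch.getReader();
    for (;;) {
      const { value, done } = await reader.read();
      if (done) return out;
      out.push(value);
    }
  };
  const [a, b] = await Promise.all([readAll(branch1), readAll(branch2)]);
  return { a, b, originalLocked: rs.locked };
}
```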
4.9.2. Interfacing with controllers
In terms of specification factoring, the way that the ReadableStream class encapsulates the behavior of both simple readable streams and readable byte streams into a single class is by centralizing most of the potentially-varying logic inside the two controller classes, ReadableStreamDefaultController and ReadableByteStreamController . Those classes define most of the stateful internal slots and abstract operations for how a stream’s internal queue is managed and how it interfaces with its underlying source or underlying byte source .
Each controller class defines three internal methods, which are called by the ReadableStream algorithms:
- [[CancelSteps]]( reason )
- The controller’s steps that run in reaction to the stream being canceled , used to clean up the state stored in the controller and inform the underlying source .
- [[PullSteps]]( readRequest )
- The controller’s steps that run when a default reader is read from, used to pull from the controller any queued chunks , or pull from the underlying source to get more chunks.
- [[ReleaseSteps]]()
- The controller’s steps that run when a reader is released , used to clean up reader-specific resources stored in the controller.
(These are defined as internal methods, instead of as abstract operations, so that they can be called polymorphically by the ReadableStream algorithms, without having to branch on which type of controller is present.)
The rest of this section concerns abstract operations that go in the other direction: they are used by the controller implementations to affect their associated ReadableStream object. This translates internal state changes of the controller into developer-facing results visible through the ReadableStream ’s public API.
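A loose sketch of that polymorphic factoring in plain JavaScript; the class and method names here are illustrative stand-ins for the internal slots and methods, not part of the standard:

```javascript
// The stream-level operation calls the controller's internal method without
// branching on which controller type it has.
class DefaultControllerSketch {
  cancelSteps(reason) { return `default cancel: ${reason}`; }
}
class ByteControllerSketch {
  cancelSteps(reason) { return `byte cancel: ${reason}`; }
}
// Stands in for a stream-level algorithm invoking [[CancelSteps]](reason).
function streamCancelSketch(controller, reason) {
  return controller.cancelSteps(reason);
}
```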
ReadableStreamAddReadIntoRequest( stream , readRequest ) performs the following steps:
-
Assert: stream . [[reader]] implements
ReadableStreamBYOBReader
. -
Assert: stream . [[state]] is "
readable
" or "closed
". -
Append readRequest to stream . [[reader]] . [[readIntoRequests]] .
ReadableStreamAddReadRequest( stream , readRequest ) performs the following steps:
-
Assert: stream . [[reader]] implements
ReadableStreamDefaultReader
. -
Assert: stream . [[state]] is "
readable
". -
Append readRequest to stream . [[reader]] . [[readRequests]] .
ReadableStreamCancel( stream , reason ) performs the following steps:
-
Set stream . [[disturbed]] to true.
-
If stream . [[state]] is "
closed
", return a promise resolved with undefined. -
If stream . [[state]] is "
errored
", return a promise rejected with stream . [[storedError]] . -
Perform ! ReadableStreamClose ( stream ).
-
Let reader be stream . [[reader]] .
-
If reader is not undefined and reader implements
ReadableStreamBYOBReader
,-
Let readIntoRequests be reader . [[readIntoRequests]] .
-
Set reader . [[readIntoRequests]] to an empty list .
-
For each readIntoRequest of readIntoRequests ,
-
Perform readIntoRequest ’s close steps , given undefined.
-
-
-
Let sourceCancelPromise be ! stream . [[controller]] . [[CancelSteps]] ( reason ).
-
Return the result of reacting to sourceCancelPromise with a fulfillment step that returns undefined.
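Through the public API, the steps above look like this; a brief sketch:

```javascript
// cancel() marks the stream disturbed, invokes the underlying source's
// cancel algorithm, and fulfills with undefined once that settles.
async function demoCancel() {
  let sourceReason;
  const rs = new ReadableStream({
    cancel(reason) { sourceReason = reason; },
  });
  const result = await rs.cancel("user navigated away");
  return { sourceReason, result };
}
```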
ReadableStreamClose( stream ) performs the following steps:
-
Assert: stream . [[state]] is "
readable
". -
Set stream . [[state]] to "
closed
". -
Let reader be stream . [[reader]] .
-
If reader is undefined, return.
-
Resolve reader . [[closedPromise]] with undefined.
-
If reader implements
ReadableStreamDefaultReader
,-
Let readRequests be reader . [[readRequests]] .
-
Set reader . [[readRequests]] to an empty list .
-
For each readRequest of readRequests ,
-
Perform readRequest ’s close steps .
-
-
ReadableStreamError( stream , e ) performs the following steps:
-
Assert: stream . [[state]] is "
readable
". -
Set stream . [[state]] to "
errored
". -
Set stream . [[storedError]] to e .
-
Let reader be stream . [[reader]] .
-
If reader is undefined, return.
-
Reject reader . [[closedPromise]] with e .
-
Set reader . [[closedPromise]] .[[PromiseIsHandled]] to true.
-
If reader implements
ReadableStreamDefaultReader
,-
Perform ! ReadableStreamDefaultReaderErrorReadRequests ( reader , e ).
-
-
Otherwise,
-
Assert: reader implements
ReadableStreamBYOBReader
. -
Perform ! ReadableStreamBYOBReaderErrorReadIntoRequests ( reader , e ).
-
ReadableStreamFulfillReadIntoRequest( stream , chunk , done ) performs the following steps:
-
Assert: ! ReadableStreamHasBYOBReader ( stream ) is true.
-
Let reader be stream . [[reader]] .
-
Assert: reader . [[readIntoRequests]] is not empty .
-
Let readIntoRequest be reader . [[readIntoRequests]] [0].
-
Remove readIntoRequest from reader . [[readIntoRequests]] .
-
If done is true, perform readIntoRequest ’s close steps , given chunk .
-
Otherwise, perform readIntoRequest ’s chunk steps , given chunk .
ReadableStreamFulfillReadRequest( stream , chunk , done ) performs the following steps:
-
Assert: ! ReadableStreamHasDefaultReader ( stream ) is true.
-
Let reader be stream . [[reader]] .
-
Assert: reader . [[readRequests]] is not empty .
-
Let readRequest be reader . [[readRequests]] [0].
-
Remove readRequest from reader . [[readRequests]] .
-
If done is true, perform readRequest ’s close steps .
-
Otherwise, perform readRequest ’s chunk steps , given chunk .
ReadableStreamGetNumReadIntoRequests( stream ) performs the following steps:
-
Assert: ! ReadableStreamHasBYOBReader ( stream ) is true.
-
Return stream . [[reader]] . [[readIntoRequests]] 's size .
ReadableStreamGetNumReadRequests( stream ) performs the following steps:
-
Assert: ! ReadableStreamHasDefaultReader ( stream ) is true.
-
Return stream . [[reader]] . [[readRequests]] 's size .
ReadableStreamHasBYOBReader( stream ) performs the following steps:
-
Let reader be stream . [[reader]] .
-
If reader is undefined, return false.
-
If reader implements
ReadableStreamBYOBReader
, return true. -
Return false.
ReadableStreamHasDefaultReader( stream ) performs the following steps:
-
Let reader be stream . [[reader]] .
-
If reader is undefined, return false.
-
If reader implements
ReadableStreamDefaultReader
, return true. -
Return false.
4.9.3. Readers
The following abstract operations support the implementation and manipulation of ReadableStreamDefaultReader and ReadableStreamBYOBReader instances.
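The acquire/read/release lifecycle these operations implement, seen from the public API; a short sketch:

```javascript
// Acquire a reader, read one chunk, release the lock, then acquire again;
// the second reader observes the stream's close.
async function demoReaderLifecycle() {
  const rs = new ReadableStream({
    start(controller) {
      controller.enqueue("chunk");
      controller.close();
    },
  });
  const reader1 = rs.getReader();
  const first = await reader1.read();
  reader1.releaseLock();               // stream becomes acquirable again
  const reader2 = rs.getReader();
  const second = await reader2.read(); // stream is now closed
  return { first, second };
}
```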
ReadableStreamReaderGenericCancel( reader , reason ) performs the following steps:
-
Let stream be reader . [[stream]] .
-
Assert: stream is not undefined.
-
Return ! ReadableStreamCancel ( stream , reason ).
ReadableStreamReaderGenericInitialize( reader , stream ) performs the following steps:
-
Set reader . [[stream]] to stream .
-
Set stream . [[reader]] to reader .
-
If stream . [[state]] is "
readable
",-
Set reader . [[closedPromise]] to a new promise .
-
-
Otherwise, if stream . [[state]] is "
closed
",-
Set reader . [[closedPromise]] to a promise resolved with undefined.
-
-
Otherwise,
-
Assert: stream . [[state]] is "
errored
". -
Set reader . [[closedPromise]] to a promise rejected with stream . [[storedError]] .
-
Set reader . [[closedPromise]] .[[PromiseIsHandled]] to true.
-
ReadableStreamReaderGenericRelease( reader ) performs the following steps:
-
Let stream be reader . [[stream]] .
-
Assert: stream is not undefined.
-
Assert: stream . [[reader]] is reader .
-
If stream . [[state]] is "
readable
", reject reader . [[closedPromise]] with aTypeError
exception. -
Otherwise, set reader . [[closedPromise]] to a promise rejected with a
TypeError
exception. -
Set reader . [[closedPromise]] .[[PromiseIsHandled]] to true.
-
Perform ! stream . [[controller]] . [[ReleaseSteps]] ().
-
Set stream . [[reader]] to undefined.
-
Set reader . [[stream]] to undefined.
ReadableStreamBYOBReaderErrorReadIntoRequests( reader , e ) performs the following steps:
-
Let readIntoRequests be reader . [[readIntoRequests]] .
-
Set reader . [[readIntoRequests]] to a new empty list .
-
For each readIntoRequest of readIntoRequests ,
-
Perform readIntoRequest ’s error steps , given e .
-
ReadableStreamBYOBReaderRead( reader , view , readIntoRequest ) performs the following steps:
-
Let stream be reader . [[stream]] .
-
Assert: stream is not undefined.
-
Set stream . [[disturbed]] to true.
-
If stream . [[state]] is "
errored
", perform readIntoRequest ’s error steps given stream . [[storedError]] . -
Otherwise, perform ! ReadableByteStreamControllerPullInto ( stream . [[controller]] , view , readIntoRequest ).
ReadableStreamBYOBReaderRelease( reader ) performs the following steps:
-
Perform ! ReadableStreamReaderGenericRelease ( reader ).
-
Let e be a new
TypeError
exception. -
Perform ! ReadableStreamBYOBReaderErrorReadIntoRequests ( reader , e ).
ReadableStreamDefaultReaderErrorReadRequests( reader , e ) performs the following steps:
-
Let readRequests be reader . [[readRequests]] .
-
Set reader . [[readRequests]] to a new empty list .
- For each readRequest of readRequests ,
  - Perform readRequest ’s error steps , given e .
-
Let stream be reader . [[stream]] .
-
Assert: stream is not undefined.
-
Set stream . [[disturbed]] to true.
- If stream . [[state]] is "closed", perform readRequest ’s close steps .
- Otherwise, if stream . [[state]] is "errored", perform readRequest ’s error steps given stream . [[storedError]] .
- Otherwise,
  - Assert: stream . [[state]] is "readable".
  - Perform ! stream . [[controller]] . [[PullSteps]] ( readRequest ).
-
Perform ! ReadableStreamReaderGenericRelease ( reader ).
- Let e be a new TypeError exception.
- Perform ! ReadableStreamDefaultReaderErrorReadRequests ( reader , e ).
- If ! IsReadableStreamLocked ( stream ) is true, throw a TypeError exception.
- If stream . [[controller]] does not implement ReadableByteStreamController , throw a TypeError exception.
- Perform ! ReadableStreamReaderGenericInitialize ( reader , stream ).
- Set reader . [[readIntoRequests]] to a new empty list .
- If ! IsReadableStreamLocked ( stream ) is true, throw a TypeError exception.
-
Perform ! ReadableStreamReaderGenericInitialize ( reader , stream ).
-
Set reader . [[readRequests]] to a new empty list .
4.9.4. Default controllers
The following abstract operations support the implementation of the ReadableStreamDefaultController class.
-
Let shouldPull be ! ReadableStreamDefaultControllerShouldCallPull ( controller ).
-
If shouldPull is false, return.
-
If controller . [[pulling]] is true,
-
Set controller . [[pullAgain]] to true.
-
Return.
-
-
Assert: controller . [[pullAgain]] is false.
-
Set controller . [[pulling]] to true.
-
Let pullPromise be the result of performing controller . [[pullAlgorithm]] .
-
Upon fulfillment of pullPromise ,
-
Set controller . [[pulling]] to false.
-
If controller . [[pullAgain]] is true,
-
Set controller . [[pullAgain]] to false.
-
Perform ! ReadableStreamDefaultControllerCallPullIfNeeded ( controller ).
-
-
-
Upon rejection of pullPromise with reason e ,
-
Perform ! ReadableStreamDefaultControllerError ( controller , e ).
-
-
Let stream be controller . [[stream]] .
-
If ! ReadableStreamDefaultControllerCanCloseOrEnqueue ( controller ) is false, return false.
-
If controller . [[started]] is false, return false.
-
If ! IsReadableStreamLocked ( stream ) is true and ! ReadableStreamGetNumReadRequests ( stream ) > 0, return true.
-
Let desiredSize be ! ReadableStreamDefaultControllerGetDesiredSize ( controller ).
-
Assert: desiredSize is not null.
-
If desiredSize > 0, return true.
-
Return false.
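The effect of these checks can be observed from script. The following is a minimal sketch, assuming a runtime that exposes the global ReadableStream and CountQueuingStrategy (e.g. Node.js 18+): pull() keeps being called while the desired size stays positive.

```javascript
// Sketch: with highWaterMark 2, the stream calls pull() repeatedly
// until the queue reaches the high water mark and desiredSize hits 0.
let pulls = 0;
const rs = new ReadableStream(
  {
    pull(controller) {
      pulls++;
      controller.enqueue(pulls); // one chunk per pull
    },
  },
  new CountQueuingStrategy({ highWaterMark: 2 })
);

// Let the queued microtasks run the pull cycle.
await new Promise(resolve => setTimeout(resolve, 0));
console.log(pulls); // 2: the queue was filled up to the high water mark
```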
Clearing the algorithm references below allows the underlying source object to be garbage collected, even if the ReadableStream itself is still referenced.
This is observable using weak references . See tc39/proposal-weakrefs#31 for more detail.
It performs the following steps:
-
Set controller . [[pullAlgorithm]] to undefined.
-
Set controller . [[cancelAlgorithm]] to undefined.
-
Set controller . [[strategySizeAlgorithm]] to undefined.
-
If ! ReadableStreamDefaultControllerCanCloseOrEnqueue ( controller ) is false, return.
-
Let stream be controller . [[stream]] .
-
Set controller . [[closeRequested]] to true.
-
If controller . [[queue]] is empty ,
-
Perform ! ReadableStreamDefaultControllerClearAlgorithms ( controller ).
-
Perform ! ReadableStreamClose ( stream ).
-
-
If ! ReadableStreamDefaultControllerCanCloseOrEnqueue ( controller ) is false, return.
-
Let stream be controller . [[stream]] .
-
If ! IsReadableStreamLocked ( stream ) is true and ! ReadableStreamGetNumReadRequests ( stream ) > 0, perform ! ReadableStreamFulfillReadRequest ( stream , chunk , false).
-
Otherwise,
-
Let result be the result of performing controller . [[strategySizeAlgorithm]] , passing in chunk , and interpreting the result as a completion record .
-
If result is an abrupt completion,
-
Perform ! ReadableStreamDefaultControllerError ( controller , result .[[Value]]).
-
Return result .
-
-
Let chunkSize be result .[[Value]].
-
Let enqueueResult be EnqueueValueWithSize ( controller , chunk , chunkSize ).
-
If enqueueResult is an abrupt completion,
-
Perform ! ReadableStreamDefaultControllerError ( controller , enqueueResult .[[Value]]).
-
Return enqueueResult .
-
-
-
Perform ! ReadableStreamDefaultControllerCallPullIfNeeded ( controller ).
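A consequence of the enqueue steps above: an exception thrown by the queuing strategy's size() function both rethrows out of enqueue() and errors the stream. A sketch, assuming a runtime with the global ReadableStream (the variable names here are ours):

```javascript
// Sketch: a throwing size() errors the stream and rethrows from enqueue().
let savedController;
const rs = new ReadableStream(
  { start(controller) { savedController = controller; } },
  { size() { throw new Error("bad size"); } }
);

let thrown = null;
try {
  savedController.enqueue("chunk"); // size() throws during these steps
} catch (e) {
  thrown = e;
}

// The same exception also errored the stream, so reads now reject with it.
const readError = await rs.getReader().read().then(() => null, e => e);
```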
-
Let stream be controller . [[stream]] .
- If stream . [[state]] is not "readable", return.
-
Perform ! ResetQueue ( controller ).
-
Perform ! ReadableStreamDefaultControllerClearAlgorithms ( controller ).
-
Perform ! ReadableStreamError ( stream , e ).
-
Let state be controller . [[stream]] . [[state]] .
- If state is "errored", return null.
- If state is "closed", return 0.
-
Return controller . [[strategyHWM]] − controller . [[queueTotalSize]] .
This abstract operation is used in the implementation of TransformStream . It performs the following steps:
-
If ! ReadableStreamDefaultControllerShouldCallPull ( controller ) is true, return false.
-
Otherwise, return true.
-
Let state be controller . [[stream]] . [[state]] .
- If controller . [[closeRequested]] is false and state is "readable", return true.
- Otherwise, return false.
The case where controller . [[closeRequested]] is false, but state is not "readable", happens when the stream is errored via controller.error() , or when it is closed without its controller’s controller.close() method ever being called: e.g., if the stream was closed by a call to stream.cancel() .
-
Assert: stream . [[controller]] is undefined.
-
Set controller . [[stream]] to stream .
-
Perform ! ResetQueue ( controller ).
-
Set controller . [[started]] , controller . [[closeRequested]] , controller . [[pullAgain]] , and controller . [[pulling]] to false.
-
Set controller . [[strategySizeAlgorithm]] to sizeAlgorithm and controller . [[strategyHWM]] to highWaterMark .
-
Set controller . [[pullAlgorithm]] to pullAlgorithm .
-
Set controller . [[cancelAlgorithm]] to cancelAlgorithm .
-
Set stream . [[controller]] to controller .
-
Let startResult be the result of performing startAlgorithm . (This might throw an exception.)
-
Let startPromise be a promise resolved with startResult .
-
Upon fulfillment of startPromise ,
-
Set controller . [[started]] to true.
-
Assert: controller . [[pulling]] is false.
-
Assert: controller . [[pullAgain]] is false.
-
Perform ! ReadableStreamDefaultControllerCallPullIfNeeded ( controller ).
-
-
Upon rejection of startPromise with reason r ,
-
Perform ! ReadableStreamDefaultControllerError ( controller , r ).
-
-
Let controller be a new ReadableStreamDefaultController .
-
Let startAlgorithm be an algorithm that returns undefined.
-
Let pullAlgorithm be an algorithm that returns a promise resolved with undefined.
-
Let cancelAlgorithm be an algorithm that returns a promise resolved with undefined.
-
If underlyingSourceDict ["start"] exists , then set startAlgorithm to an algorithm which returns the result of invoking underlyingSourceDict ["start"] with argument list « controller » and callback this value underlyingSource .
- If underlyingSourceDict ["pull"] exists , then set pullAlgorithm to an algorithm which returns the result of invoking underlyingSourceDict ["pull"] with argument list « controller » and callback this value underlyingSource .
- If underlyingSourceDict ["cancel"] exists , then set cancelAlgorithm to an algorithm which takes an argument reason and returns the result of invoking underlyingSourceDict ["cancel"] with argument list « reason » and callback this value underlyingSource .
-
Perform ? SetUpReadableStreamDefaultController ( stream , controller , startAlgorithm , pullAlgorithm , cancelAlgorithm , highWaterMark , sizeAlgorithm ).
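These steps are what the ReadableStream() constructor runs for a JavaScript underlying source object; in particular, the callbacks are invoked with the underlying source as their this value. A sketch:

```javascript
// Sketch: start/pull/cancel from the underlying source dictionary become
// the controller's algorithms; `this` inside them is the source object.
const source = {
  chunks: ["a", "b"],
  start(controller) {
    // `this` is `source`, per the "callback this value" in the steps above.
    for (const c of this.chunks) controller.enqueue(c);
    controller.close();
  },
};

const rs = new ReadableStream(source);
const reader = rs.getReader();
const first = await reader.read();
console.log(first.value, first.done); // a false
```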
4.9.5. Byte stream controllers
-
Let shouldPull be ! ReadableByteStreamControllerShouldCallPull ( controller ).
-
If shouldPull is false, return.
-
If controller . [[pulling]] is true,
-
Set controller . [[pullAgain]] to true.
-
Return.
-
-
Assert: controller . [[pullAgain]] is false.
-
Set controller . [[pulling]] to true.
-
Let pullPromise be the result of performing controller . [[pullAlgorithm]] .
-
Upon fulfillment of pullPromise ,
-
Set controller . [[pulling]] to false.
-
If controller . [[pullAgain]] is true,
-
Set controller . [[pullAgain]] to false.
-
Perform ! ReadableByteStreamControllerCallPullIfNeeded ( controller ).
-
-
-
Upon rejection of pullPromise with reason e ,
-
Perform ! ReadableByteStreamControllerError ( controller , e ).
-
Clearing the algorithm references below allows the underlying byte source object to be garbage collected, even if the ReadableStream itself is still referenced.
This is observable using weak references . See tc39/proposal-weakrefs#31 for more detail.
It performs the following steps:
-
Set controller . [[pullAlgorithm]] to undefined.
-
Set controller . [[cancelAlgorithm]] to undefined.
-
Perform ! ReadableByteStreamControllerInvalidateBYOBRequest ( controller ).
-
Set controller . [[pendingPullIntos]] to a new empty list .
-
Let stream be controller . [[stream]] .
-
If controller . [[closeRequested]] is true or stream . [[state]] is not "readable", return.
-
If controller . [[queueTotalSize]] > 0,
-
Set controller . [[closeRequested]] to true.
-
Return.
-
-
If controller . [[pendingPullIntos]] is not empty,
-
Let firstPendingPullInto be controller . [[pendingPullIntos]] [0].
- If firstPendingPullInto ’s bytes filled > 0,
  - Let e be a new TypeError exception.
  - Perform ! ReadableByteStreamControllerError ( controller , e ).
  - Throw e .
-
Perform ! ReadableByteStreamControllerClearAlgorithms ( controller ).
-
Perform ! ReadableStreamClose ( stream ).
- Assert: stream . [[state]] is not "errored".
- Assert: pullIntoDescriptor ’s reader type is not "none".
-
Let done be false.
- If stream . [[state]] is "closed",
  - Assert: pullIntoDescriptor ’s bytes filled is 0.
  - Set done to true.
-
Let filledView be ! ReadableByteStreamControllerConvertPullIntoDescriptor ( pullIntoDescriptor ).
- If pullIntoDescriptor ’s reader type is "default",
  - Perform ! ReadableStreamFulfillReadRequest ( stream , filledView , done ).
- Otherwise,
  - Assert: pullIntoDescriptor ’s reader type is "byob".
  - Perform ! ReadableStreamFulfillReadIntoRequest ( stream , filledView , done ).
-
Let bytesFilled be pullIntoDescriptor ’s bytes filled .
-
Let elementSize be pullIntoDescriptor ’s element size .
-
Assert: bytesFilled ≤ pullIntoDescriptor ’s byte length .
-
Assert: bytesFilled mod elementSize is 0.
-
Let buffer be ! TransferArrayBuffer ( pullIntoDescriptor ’s buffer ).
-
Return ! Construct ( pullIntoDescriptor ’s view constructor , « buffer , pullIntoDescriptor ’s byte offset , bytesFilled ÷ elementSize »).
-
Let stream be controller . [[stream]] .
-
If controller . [[closeRequested]] is true or stream . [[state]] is not "readable", return.
-
Let buffer be chunk .[[ViewedArrayBuffer]].
-
Let byteOffset be chunk .[[ByteOffset]].
-
Let byteLength be chunk .[[ByteLength]].
-
If ! IsDetachedBuffer ( buffer ) is true, throw a TypeError exception.
-
Let transferredBuffer be ? TransferArrayBuffer ( buffer ).
-
If controller . [[pendingPullIntos]] is not empty ,
-
Let firstPendingPullInto be controller . [[pendingPullIntos]] [0].
-
If ! IsDetachedBuffer ( firstPendingPullInto ’s buffer ) is true, throw a TypeError exception.
- Perform ! ReadableByteStreamControllerInvalidateBYOBRequest ( controller ).
- Set firstPendingPullInto ’s buffer to ! TransferArrayBuffer ( firstPendingPullInto ’s buffer ).
- If firstPendingPullInto ’s reader type is "none", perform ? ReadableByteStreamControllerEnqueueDetachedPullIntoToQueue ( controller , firstPendingPullInto ).
-
-
If ! ReadableStreamHasDefaultReader ( stream ) is true,
-
Perform ! ReadableByteStreamControllerProcessReadRequestsUsingQueue ( controller ).
-
If ! ReadableStreamGetNumReadRequests ( stream ) is 0,
-
Assert: controller . [[pendingPullIntos]] is empty .
-
Perform ! ReadableByteStreamControllerEnqueueChunkToQueue ( controller , transferredBuffer , byteOffset , byteLength ).
-
-
Otherwise,
-
If controller . [[pendingPullIntos]] is not empty ,
-
Assert: controller . [[pendingPullIntos]] [0]'s reader type is "default".
-
Perform ! ReadableByteStreamControllerShiftPendingPullInto ( controller ).
-
-
Let transferredView be ! Construct ( %Uint8Array% , « transferredBuffer , byteOffset , byteLength »).
-
Perform ! ReadableStreamFulfillReadRequest ( stream , transferredView , false).
-
-
Otherwise, if ! ReadableStreamHasBYOBReader ( stream ) is true,
-
Perform ! ReadableByteStreamControllerEnqueueChunkToQueue ( controller , transferredBuffer , byteOffset , byteLength ).
-
Perform ! ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue ( controller ).
-
-
Otherwise,
-
Assert: ! IsReadableStreamLocked ( stream ) is false.
-
Perform ! ReadableByteStreamControllerEnqueueChunkToQueue ( controller , transferredBuffer , byteOffset , byteLength ).
-
-
Perform ! ReadableByteStreamControllerCallPullIfNeeded ( controller ).
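One observable consequence of the TransferArrayBuffer step above is that enqueuing a chunk into a byte stream detaches the chunk's buffer from the producer. A sketch, assuming a runtime with byte stream support (e.g. Node.js 18+):

```javascript
// Sketch: after enqueue(), the producer's view is over a detached buffer.
let lengthAfterEnqueue;
const rs = new ReadableStream({
  type: "bytes",
  start(controller) {
    const chunk = new Uint8Array([1, 2, 3]);
    controller.enqueue(chunk);
    lengthAfterEnqueue = chunk.byteLength; // 0: the buffer was transferred
  },
});

const { value } = await rs.getReader().read();
console.log(lengthAfterEnqueue, value.byteLength); // 0 3
```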
-
Append a new readable byte stream queue entry with buffer buffer , byte offset byteOffset , and byte length byteLength to controller . [[queue]] .
-
Set controller . [[queueTotalSize]] to controller . [[queueTotalSize]] + byteLength .
-
Let cloneResult be CloneArrayBuffer ( buffer , byteOffset , byteLength , %ArrayBuffer% ).
-
If cloneResult is an abrupt completion,
-
Perform ! ReadableByteStreamControllerError ( controller , cloneResult .[[Value]]).
-
Return cloneResult .
-
-
Perform ! ReadableByteStreamControllerEnqueueChunkToQueue ( controller , cloneResult .[[Value]], 0, byteLength ).
-
Assert: pullIntoDescriptor ’s reader type is "none".
-
If pullIntoDescriptor ’s bytes filled > 0, perform ? ReadableByteStreamControllerEnqueueClonedChunkToQueue ( controller , pullIntoDescriptor ’s buffer , pullIntoDescriptor ’s byte offset , pullIntoDescriptor ’s bytes filled ).
-
Perform ! ReadableByteStreamControllerShiftPendingPullInto ( controller ).
-
Let stream be controller . [[stream]] .
-
If stream . [[state]] is not "readable", return.
-
Perform ! ReadableByteStreamControllerClearPendingPullIntos ( controller ).
-
Perform ! ResetQueue ( controller ).
-
Perform ! ReadableByteStreamControllerClearAlgorithms ( controller ).
-
Perform ! ReadableStreamError ( stream , e ).
-
Assert: either controller . [[pendingPullIntos]] is empty , or controller . [[pendingPullIntos]] [0] is pullIntoDescriptor .
-
Assert: controller . [[byobRequest]] is null.
-
Set pullIntoDescriptor ’s bytes filled to bytes filled + size .
-
Let elementSize be pullIntoDescriptor ’s element size .
-
Let currentAlignedBytes be pullIntoDescriptor ’s bytes filled − ( pullIntoDescriptor ’s bytes filled mod elementSize ).
-
Let maxBytesToCopy be min( controller . [[queueTotalSize]] , pullIntoDescriptor ’s byte length − pullIntoDescriptor ’s bytes filled ).
-
Let maxBytesFilled be pullIntoDescriptor ’s bytes filled + maxBytesToCopy .
-
Let maxAlignedBytes be maxBytesFilled − ( maxBytesFilled mod elementSize ).
-
Let totalBytesToCopyRemaining be maxBytesToCopy .
-
Let ready be false.
-
If maxAlignedBytes > currentAlignedBytes ,
-
Set totalBytesToCopyRemaining to maxAlignedBytes − pullIntoDescriptor ’s bytes filled .
-
Set ready to true.
-
-
Let queue be controller . [[queue]] .
-
While totalBytesToCopyRemaining > 0,
-
Let headOfQueue be queue [0].
-
Let bytesToCopy be min( totalBytesToCopyRemaining , headOfQueue ’s byte length ).
-
Let destStart be pullIntoDescriptor ’s byte offset + pullIntoDescriptor ’s bytes filled .
-
Perform ! CopyDataBlockBytes ( pullIntoDescriptor ’s buffer .[[ArrayBufferData]], destStart , headOfQueue ’s buffer .[[ArrayBufferData]], headOfQueue ’s byte offset , bytesToCopy ).
-
If headOfQueue ’s byte length is bytesToCopy ,
-
Remove queue [0].
-
-
Otherwise,
-
Set headOfQueue ’s byte offset to headOfQueue ’s byte offset + bytesToCopy .
-
Set headOfQueue ’s byte length to headOfQueue ’s byte length − bytesToCopy .
-
-
Set controller . [[queueTotalSize]] to controller . [[queueTotalSize]] − bytesToCopy .
-
Perform ! ReadableByteStreamControllerFillHeadPullIntoDescriptor ( controller , bytesToCopy , pullIntoDescriptor ).
-
Set totalBytesToCopyRemaining to totalBytesToCopyRemaining − bytesToCopy .
-
-
If ready is false,
-
Assert: controller . [[queueTotalSize]] is 0.
-
Assert: pullIntoDescriptor ’s bytes filled > 0.
-
Assert: pullIntoDescriptor ’s bytes filled < pullIntoDescriptor ’s element size .
-
-
Return ready .
-
Assert: controller . [[queueTotalSize]] > 0.
-
Let entry be controller . [[queue]] [0].
-
Set controller . [[queueTotalSize]] to controller . [[queueTotalSize]] − entry ’s byte length .
-
Perform ! ReadableByteStreamControllerHandleQueueDrain ( controller ).
-
Let view be ! Construct ( %Uint8Array% , « entry ’s buffer , entry ’s byte offset , entry ’s byte length »).
-
Perform readRequest ’s chunk steps , given view .
-
If controller . [[byobRequest]] is null and controller . [[pendingPullIntos]] is not empty ,
-
Let firstDescriptor be controller . [[pendingPullIntos]] [0].
-
Let view be ! Construct ( %Uint8Array% , « firstDescriptor ’s buffer , firstDescriptor ’s byte offset + firstDescriptor ’s bytes filled , firstDescriptor ’s byte length − firstDescriptor ’s bytes filled »).
- Let byobRequest be a new ReadableStreamBYOBRequest .
-
Set byobRequest . [[controller]] to controller .
-
Set byobRequest . [[view]] to view .
-
Set controller . [[byobRequest]] to byobRequest .
-
-
Return controller . [[byobRequest]] .
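The [[byobRequest]] slot surfaces as controller.byobRequest. A sketch of how an underlying byte source uses it to fill the consumer's buffer directly, assuming BYOB reader support (e.g. Node.js 18+):

```javascript
// Sketch: the BYOB request wraps the unfilled region of the reader's
// buffer; the source writes into it and responds with the byte count.
const rs = new ReadableStream({
  type: "bytes",
  pull(controller) {
    const request = controller.byobRequest; // non-null: a read is pending
    request.view[0] = 42;
    request.respond(1);
  },
});

const reader = rs.getReader({ mode: "byob" });
const { value } = await reader.read(new Uint8Array(4));
console.log(value[0], value.byteLength); // 42 1
```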
-
Let state be controller . [[stream]] . [[state]] .
- If state is "errored", return null.
- If state is "closed", return 0.
-
Return controller . [[strategyHWM]] − controller . [[queueTotalSize]] .
-
Assert: controller . [[stream]] . [[state]] is "readable".
-
If controller . [[queueTotalSize]] is 0 and controller . [[closeRequested]] is true,
-
Perform ! ReadableByteStreamControllerClearAlgorithms ( controller ).
-
Perform ! ReadableStreamClose ( controller . [[stream]] ).
-
-
Otherwise,
-
Perform ! ReadableByteStreamControllerCallPullIfNeeded ( controller ).
-
-
If controller . [[byobRequest]] is null, return.
-
Set controller . [[byobRequest]] . [[controller]] to undefined.
-
Set controller . [[byobRequest]] . [[view]] to null.
-
Set controller . [[byobRequest]] to null.
-
Assert: controller . [[closeRequested]] is false.
-
While controller . [[pendingPullIntos]] is not empty ,
-
If controller . [[queueTotalSize]] is 0, return.
-
Let pullIntoDescriptor be controller . [[pendingPullIntos]] [0].
-
If ! ReadableByteStreamControllerFillPullIntoDescriptorFromQueue ( controller , pullIntoDescriptor ) is true,
-
Perform ! ReadableByteStreamControllerShiftPendingPullInto ( controller ).
-
Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor ( controller . [[stream]] , pullIntoDescriptor ).
-
-
-
Let reader be controller . [[stream]] . [[reader]] .
-
Assert: reader implements ReadableStreamDefaultReader .
-
While reader . [[readRequests]] is not empty ,
-
If controller . [[queueTotalSize]] is 0, return.
-
Let readRequest be reader . [[readRequests]] [0].
-
Remove readRequest from reader . [[readRequests]] .
-
Perform ! ReadableByteStreamControllerFillReadRequestFromQueue ( controller , readRequest ).
-
-
Let stream be controller . [[stream]] .
-
Let elementSize be 1.
-
Let ctor be %DataView% .
- If view has a [[TypedArrayName]] internal slot (i.e., it is not a DataView ),
  - Set elementSize to the element size specified in the typed array constructors table for view .[[TypedArrayName]].
  - Set ctor to the constructor specified in the typed array constructors table for view .[[TypedArrayName]].
-
Let byteOffset be view .[[ByteOffset]].
-
Let byteLength be view .[[ByteLength]].
-
Let bufferResult be TransferArrayBuffer ( view .[[ViewedArrayBuffer]]).
-
If bufferResult is an abrupt completion,
-
Perform readIntoRequest ’s error steps , given bufferResult .[[Value]].
-
Return.
-
-
Let buffer be bufferResult .[[Value]].
-
Let pullIntoDescriptor be a new pull-into descriptor with buffer buffer , buffer byte length buffer .[[ArrayBufferByteLength]], byte offset byteOffset , byte length byteLength , bytes filled 0, element size elementSize , view constructor ctor , and reader type "byob".
-
If controller . [[pendingPullIntos]] is not empty,
-
Append pullIntoDescriptor to controller . [[pendingPullIntos]] .
-
Perform ! ReadableStreamAddReadIntoRequest ( stream , readIntoRequest ).
-
Return.
-
-
If stream . [[state]] is "closed",
-
Let emptyView be ! Construct ( ctor , « pullIntoDescriptor ’s buffer , pullIntoDescriptor ’s byte offset , 0 »).
-
Perform readIntoRequest ’s close steps , given emptyView .
-
Return.
-
-
If controller . [[queueTotalSize]] > 0,
-
If ! ReadableByteStreamControllerFillPullIntoDescriptorFromQueue ( controller , pullIntoDescriptor ) is true,
-
Let filledView be ! ReadableByteStreamControllerConvertPullIntoDescriptor ( pullIntoDescriptor ).
-
Perform ! ReadableByteStreamControllerHandleQueueDrain ( controller ).
-
Perform readIntoRequest ’s chunk steps , given filledView .
-
Return.
-
-
If controller . [[closeRequested]] is true,
-
Let e be a TypeError exception.
-
Perform ! ReadableByteStreamControllerError ( controller , e ).
-
Perform readIntoRequest ’s error steps , given e .
-
Return.
-
-
-
Append pullIntoDescriptor to controller . [[pendingPullIntos]] .
-
Perform ! ReadableStreamAddReadIntoRequest ( stream , readIntoRequest ).
-
Perform ! ReadableByteStreamControllerCallPullIfNeeded ( controller ).
-
Assert: controller . [[pendingPullIntos]] is not empty.
-
Let firstDescriptor be controller . [[pendingPullIntos]] [0].
-
Let state be controller . [[stream]] . [[state]] .
- If state is "closed",
  - If bytesWritten is not 0, throw a TypeError exception.
- Otherwise,
  - Assert: state is "readable".
  - If bytesWritten is 0, throw a TypeError exception.
  - If firstDescriptor ’s bytes filled + bytesWritten > firstDescriptor ’s byte length , throw a RangeError exception.
-
Set firstDescriptor ’s buffer to ! TransferArrayBuffer ( firstDescriptor ’s buffer ).
-
Perform ? ReadableByteStreamControllerRespondInternal ( controller , bytesWritten ).
-
Assert: firstDescriptor ’s bytes filled is 0.
-
If firstDescriptor ’s reader type is "none", perform ! ReadableByteStreamControllerShiftPendingPullInto ( controller ).
-
Let stream be controller . [[stream]] .
-
If ! ReadableStreamHasBYOBReader ( stream ) is true,
-
While ! ReadableStreamGetNumReadIntoRequests ( stream ) > 0,
-
Let pullIntoDescriptor be ! ReadableByteStreamControllerShiftPendingPullInto ( controller ).
-
Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor ( stream , pullIntoDescriptor ).
-
-
-
Assert: pullIntoDescriptor ’s bytes filled + bytesWritten ≤ pullIntoDescriptor ’s byte length .
-
Perform ! ReadableByteStreamControllerFillHeadPullIntoDescriptor ( controller , bytesWritten , pullIntoDescriptor ).
-
If pullIntoDescriptor ’s reader type is "none",
-
Perform ? ReadableByteStreamControllerEnqueueDetachedPullIntoToQueue ( controller , pullIntoDescriptor ).
-
Perform ! ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue ( controller ).
-
Return.
-
-
If pullIntoDescriptor ’s bytes filled < pullIntoDescriptor ’s element size , return.
-
Perform ! ReadableByteStreamControllerShiftPendingPullInto ( controller ).
-
Let remainderSize be pullIntoDescriptor ’s bytes filled mod pullIntoDescriptor ’s element size .
-
If remainderSize > 0,
-
Let end be pullIntoDescriptor ’s byte offset + pullIntoDescriptor ’s bytes filled .
-
Perform ? ReadableByteStreamControllerEnqueueClonedChunkToQueue ( controller , pullIntoDescriptor ’s buffer , end − remainderSize , remainderSize ).
-
-
Set pullIntoDescriptor ’s bytes filled to pullIntoDescriptor ’s bytes filled − remainderSize .
-
Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor ( controller . [[stream]] , pullIntoDescriptor ).
-
Perform ! ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue ( controller ).
-
Let firstDescriptor be controller . [[pendingPullIntos]] [0].
-
Assert: ! CanTransferArrayBuffer ( firstDescriptor ’s buffer ) is true.
-
Perform ! ReadableByteStreamControllerInvalidateBYOBRequest ( controller ).
-
Let state be controller . [[stream]] . [[state]] .
- If state is "closed",
  - Assert: bytesWritten is 0.
  - Perform ! ReadableByteStreamControllerRespondInClosedState ( controller , firstDescriptor ).
- Otherwise,
  - Assert: state is "readable".
  - Assert: bytesWritten > 0.
  - Perform ? ReadableByteStreamControllerRespondInReadableState ( controller , bytesWritten , firstDescriptor ).
-
Perform ! ReadableByteStreamControllerCallPullIfNeeded ( controller ).
-
Assert: controller . [[pendingPullIntos]] is not empty .
-
Assert: ! IsDetachedBuffer ( view .[[ViewedArrayBuffer]]) is false.
-
Let firstDescriptor be controller . [[pendingPullIntos]] [0].
-
Let state be controller . [[stream]] . [[state]] .
- If state is "closed",
  - If view .[[ByteLength]] is not 0, throw a TypeError exception.
- Otherwise,
  - Assert: state is "readable".
  - If view .[[ByteLength]] is 0, throw a TypeError exception.
-
If firstDescriptor ’s byte offset + firstDescriptor ’s bytes filled is not view .[[ByteOffset]], throw a RangeError exception.
- If firstDescriptor ’s buffer byte length is not view .[[ViewedArrayBuffer]].[[ByteLength]], throw a RangeError exception.
- If firstDescriptor ’s bytes filled + view .[[ByteLength]] > firstDescriptor ’s byte length , throw a RangeError exception.
-
Let viewByteLength be view .[[ByteLength]].
-
Set firstDescriptor ’s buffer to ? TransferArrayBuffer ( view .[[ViewedArrayBuffer]]).
-
Perform ? ReadableByteStreamControllerRespondInternal ( controller , viewByteLength ).
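These validation steps back the byobRequest.respondWithNewView() method. A sketch of a source that hands back a fresh view over the same (transferred) buffer rather than calling respond(), assuming a runtime that implements respondWithNewView:

```javascript
// Sketch: respondWithNewView() must pass the offset/length checks above;
// here the new view starts at the same offset and covers 2 filled bytes.
const rs = new ReadableStream({
  type: "bytes",
  pull(controller) {
    const { view } = controller.byobRequest;
    const filled = new Uint8Array(view.buffer, view.byteOffset, 2);
    filled.set([9, 8]);
    controller.byobRequest.respondWithNewView(filled);
  },
});

const reader = rs.getReader({ mode: "byob" });
const { value } = await reader.read(new Uint8Array(4));
console.log([...value]); // [ 9, 8 ]
```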
-
Assert: controller . [[byobRequest]] is null.
-
Let descriptor be controller . [[pendingPullIntos]] [0].
-
Remove descriptor from controller . [[pendingPullIntos]] .
-
Return descriptor .
-
Let stream be controller . [[stream]] .
-
If stream . [[state]] is not "readable", return false.
-
If controller . [[closeRequested]] is true, return false.
-
If controller . [[started]] is false, return false.
-
If ! ReadableStreamHasDefaultReader ( stream ) is true and ! ReadableStreamGetNumReadRequests ( stream ) > 0, return true.
-
If ! ReadableStreamHasBYOBReader ( stream ) is true and ! ReadableStreamGetNumReadIntoRequests ( stream ) > 0, return true.
-
Let desiredSize be ! ReadableByteStreamControllerGetDesiredSize ( controller ).
-
Assert: desiredSize is not null.
-
If desiredSize > 0, return true.
-
Return false.
-
Assert: stream . [[controller]] is undefined.
-
If autoAllocateChunkSize is not undefined,
-
Assert: ! IsInteger ( autoAllocateChunkSize ) is true.
-
Assert: autoAllocateChunkSize is positive.
-
-
Set controller . [[stream]] to stream .
-
Set controller . [[pullAgain]] and controller . [[pulling]] to false.
-
Set controller . [[byobRequest]] to null.
-
Perform ! ResetQueue ( controller ).
-
Set controller . [[closeRequested]] and controller . [[started]] to false.
-
Set controller . [[strategyHWM]] to highWaterMark .
-
Set controller . [[pullAlgorithm]] to pullAlgorithm .
-
Set controller . [[cancelAlgorithm]] to cancelAlgorithm .
-
Set controller . [[autoAllocateChunkSize]] to autoAllocateChunkSize .
-
Set controller . [[pendingPullIntos]] to a new empty list .
-
Set stream . [[controller]] to controller .
-
Let startResult be the result of performing startAlgorithm .
-
Let startPromise be a promise resolved with startResult .
-
Upon fulfillment of startPromise ,
-
Set controller . [[started]] to true.
-
Assert: controller . [[pulling]] is false.
-
Assert: controller . [[pullAgain]] is false.
-
Perform ! ReadableByteStreamControllerCallPullIfNeeded ( controller ).
-
-
Upon rejection of startPromise with reason r ,
-
Perform ! ReadableByteStreamControllerError ( controller , r ).
-
-
Let controller be a new ReadableByteStreamController .
-
Let startAlgorithm be an algorithm that returns undefined.
-
Let pullAlgorithm be an algorithm that returns a promise resolved with undefined.
-
Let cancelAlgorithm be an algorithm that returns a promise resolved with undefined.
-
If underlyingSourceDict ["start"] exists , then set startAlgorithm to an algorithm which returns the result of invoking underlyingSourceDict ["start"] with argument list « controller » and callback this value underlyingSource .
- If underlyingSourceDict ["pull"] exists , then set pullAlgorithm to an algorithm which returns the result of invoking underlyingSourceDict ["pull"] with argument list « controller » and callback this value underlyingSource .
- If underlyingSourceDict ["cancel"] exists , then set cancelAlgorithm to an algorithm which takes an argument reason and returns the result of invoking underlyingSourceDict ["cancel"] with argument list « reason » and callback this value underlyingSource .
- Let autoAllocateChunkSize be underlyingSourceDict ["autoAllocateChunkSize"], if it exists , or undefined otherwise.
- If autoAllocateChunkSize is 0, then throw a TypeError exception.
-
Perform ? SetUpReadableByteStreamController ( stream , controller , startAlgorithm , pullAlgorithm , cancelAlgorithm , highWaterMark , autoAllocateChunkSize ).
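Putting these steps together: a byte stream set up with autoAllocateChunkSize lets even a plain default reader drive BYOB-style fills. A sketch, assuming a runtime with byte stream support (e.g. Node.js 18+):

```javascript
// Sketch: autoAllocateChunkSize makes the stream allocate the buffer,
// so even a default reader's read() produces a byobRequest to fill.
const rs = new ReadableStream({
  type: "bytes",
  autoAllocateChunkSize: 16, // 0 here would throw a TypeError, per the steps
  pull(controller) {
    const request = controller.byobRequest;
    request.view[0] = 7;   // write into the auto-allocated buffer
    request.respond(1);    // report one byte written
  },
});

const { value } = await rs.getReader().read();
console.log(value.byteLength, value[0]); // 1 7
```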
5. Writable streams
5.1. Using writable streams
readableStream.pipeTo(writableStream)
  .then(() => console.log("All data successfully written!"))
  .catch(e => console.error("Something went wrong!", e));
You can also write directly to the stream by acquiring a writer and using its write() and close() methods. Since writable streams queue any incoming writes, and take care internally to forward them to the underlying sink in sequence, you can indiscriminately write to a writable stream without much ceremony:
function writeArrayToStream(array, writableStream) {
  const writer = writableStream.getWriter();
  array.forEach(chunk => writer.write(chunk).catch(() => {}));
  return writer.close();
}

writeArrayToStream([1, 2, 3, 4, 5], writableStream)
  .then(() => console.log("All done!"))
  .catch(e => console.error("Error with the stream: " + e));
Note how we use .catch(() => {}) to suppress any rejections from the write() method; we’ll be notified of any fatal errors via a rejection of the close() method, and leaving them uncaught would cause potential unhandledrejection events and console warnings.
The example above observes the overall success or failure of the stream via the promise returned by the writer’s close() method. That promise will reject if anything goes wrong with the stream: initializing it, writing to it, or closing it. It will fulfill once the stream is successfully closed. Often this is all you care about. However, if you care about the success of writing a specific chunk , you can use the promise returned by the writer’s write() method:
writer.write("i am a chunk of data")
  .then(() => console.log("chunk successfully written!"))
  .catch(e => console.error(e));
What "success" means is up to a given stream instance (or more precisely, its underlying sink ) to decide. For example, for a file stream it could simply mean that the OS has accepted the write, and not necessarily that the chunk has been flushed to disk. Some streams might not be able to give such a signal at all, in which case the returned promise will fulfill immediately.
The desiredSize and ready properties of writable stream writers allow producers to more precisely respond to flow control signals from the stream, to keep memory usage below the stream’s specified high water mark . The following example writes an infinite sequence of random bytes to a stream, using desiredSize to determine how many bytes to generate at a given time, and using ready to wait for the backpressure to subside.
async function writeRandomBytesForever(writableStream) {
  const writer = writableStream.getWriter();
  while (true) {
    await writer.ready;
    const bytes = new Uint8Array(writer.desiredSize);
    crypto.getRandomValues(bytes);
    // Purposefully don't await; awaiting writer.ready is enough.
    writer.write(bytes).catch(() => {});
  }
}

writeRandomBytesForever(myWritableStream).catch(e => console.error("Something broke", e));
Note how we don’t await the promise returned by write(); doing so would be redundant with awaiting the ready promise. Additionally, similar to a previous example, we use the .catch(() => {}) pattern on the promises returned by write(); in this case we’ll be notified about any failures while awaiting the ready promise.
To see why it’s important not to await the promise returned by write(), consider a modification of the above example, where we continue to use the WritableStreamDefaultWriter interface directly, but we don’t control how many bytes we have to write at a given time. In that case, the backpressure -respecting code looks the same:
async function writeSuppliedBytesForever(writableStream, getBytes) {
  const writer = writableStream.getWriter();

  while (true) {
    await writer.ready;

    const bytes = getBytes();
    writer.write(bytes).catch(() => {});
  }
}
Unlike the previous example, where—because we were always writing exactly writer.desiredSize bytes each time—the write() and ready promises were synchronized, in this case it’s quite possible that the ready promise fulfills before the one returned by write() does. Remember, the ready promise fulfills when the desired size becomes positive, which might be before the write succeeds (especially in cases with a larger high water mark ).
In other words, awaiting the return value of write() means you never queue up writes in the stream’s internal queue , instead only executing a write after the previous one succeeds, which can result in low throughput.
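The contrast can be sketched with two illustrative helpers (neither is part of this standard): the first awaits every write() and so never fills the internal queue; the second awaits only ready, letting writes queue up to the high water mark.

```javascript
// Lower throughput: each write is awaited, so at most one write is ever
// outstanding and the underlying sink can sit idle between chunks.
async function writeSequentially(writableStream, chunks) {
  const writer = writableStream.getWriter();
  for (const chunk of chunks) {
    await writer.write(chunk);
  }
  await writer.close();
}

// Higher throughput: only backpressure is awaited, so chunks can queue
// while the sink works; failures surface via close() or writer.closed.
async function writeWithBackpressure(writableStream, chunks) {
  const writer = writableStream.getWriter();
  for (const chunk of chunks) {
    await writer.ready;
    writer.write(chunk).catch(() => {});
  }
  await writer.close();
}
```

Both helpers deliver the same chunks in the same order; they differ only in how full the stream's internal queue is allowed to get.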
5.2. The WritableStream class
The WritableStream class represents a writable stream .
5.2.1. Interface definition
The Web IDL definition for the WritableStream class is given as follows:
[Exposed=*, Transferable]
interface WritableStream {
  constructor(optional object underlyingSink, optional QueuingStrategy strategy = {});

  readonly attribute boolean locked;

  Promise<undefined> abort(optional any reason);
  Promise<undefined> close();
  WritableStreamDefaultWriter getWriter();
};
5.2.2. Internal slots
Instances of WritableStream are created with the internal slots described in the following table:
Internal Slot | Description ( non-normative ) |
---|---|
[[backpressure]] | A boolean indicating the backpressure signal set by the controller |
[[closeRequest]] | The promise returned from the writer’s close() method |
[[controller]] | A WritableStreamDefaultController created with the ability to control the state and queue of this stream |
[[Detached]] | A boolean flag set to true when the stream is transferred |
[[inFlightWriteRequest]] | A slot set to the promise for the current in-flight write operation while the underlying sink 's write algorithm is executing and has not yet fulfilled, used to prevent reentrant calls |
[[inFlightCloseRequest]] | A slot set to the promise for the current in-flight close operation while the underlying sink 's close algorithm is executing and has not yet fulfilled, used to prevent the abort() method from interrupting close |
[[pendingAbortRequest]] | A pending abort request |
[[state]] | A string containing the stream’s current state, used internally; one of "writable", "closed", "erroring", or "errored" |
[[storedError]] | A value indicating how the stream failed, to be given as a failure reason or exception when trying to operate on the stream while in the "errored" state |
[[writer]] | A WritableStreamDefaultWriter instance, if the stream is locked to a writer , or undefined if it is not |
[[writeRequests]] | A list of promises representing the stream’s internal queue of write requests not yet processed by the underlying sink |
The [[inFlightCloseRequest]] slot and [[closeRequest]] slot are mutually exclusive. Similarly, no element will be removed from [[writeRequests]] while [[inFlightWriteRequest]] is not undefined. Implementations can optimize storage for these slots based on these invariants.
A pending abort request is a struct used to track a request to abort the stream before that request is finally processed. It has the following items :
- promise
- A promise returned from WritableStreamAbort
- reason
- A JavaScript value that was passed as the abort reason to WritableStreamAbort
- was already erroring
- A boolean indicating whether or not the stream was in the "erroring" state when WritableStreamAbort was called, which impacts the outcome of the abort request
5.2.3. The underlying sink API
The WritableStream() constructor accepts as its first argument a JavaScript object representing the underlying sink . Such objects can contain any of the following properties:
dictionary UnderlyingSink {
  UnderlyingSinkStartCallback start;
  UnderlyingSinkWriteCallback write;
  UnderlyingSinkCloseCallback close;
  UnderlyingSinkAbortCallback abort;
  any type;
};

callback UnderlyingSinkStartCallback = any (WritableStreamDefaultController controller);
callback UnderlyingSinkWriteCallback = Promise<undefined> (any chunk, WritableStreamDefaultController controller);
callback UnderlyingSinkCloseCallback = Promise<undefined> ();
callback UnderlyingSinkAbortCallback = Promise<undefined> (optional any reason);
- start(controller), of type UnderlyingSinkStartCallback
- A function that is called immediately during creation of the WritableStream . Typically this is used to acquire access to the underlying sink resource being represented.
If this setup process is asynchronous, it can return a promise to signal success or failure; a rejected promise will error the stream. Any thrown exceptions will be re-thrown by the WritableStream() constructor.
- write(chunk, controller), of type UnderlyingSinkWriteCallback
- A function that is called when a new chunk of data is ready to be written to the underlying sink . The stream implementation guarantees that this function will be called only after previous writes have succeeded, and never before start() has succeeded or after close() or abort() have been called.
This function is used to actually send the data to the resource presented by the underlying sink , for example by calling a lower-level API.
If the process of writing data is asynchronous, and communicates success or failure signals back to its user, then this function can return a promise to signal success or failure. This promise return value will be communicated back to the caller of writer.write(), so they can monitor that individual write. Throwing an exception is treated the same as returning a rejected promise.
Note that such signals are not always available; compare e.g. § 10.6 A writable stream with no backpressure or success signals with § 10.7 A writable stream with backpressure and success signals . In such cases, it’s best to not return anything.
The promise potentially returned by this function also governs whether the given chunk counts as written for the purposes of computing the desired size to fill the stream’s internal queue . That is, during the time it takes the promise to settle, writer.desiredSize will stay at its previous value, only increasing to signal the desire for more chunks once the write succeeds.
Finally, the promise potentially returned by this function is used to ensure that well-behaved producers do not attempt to mutate the chunk before it has been fully processed. (This is not guaranteed by any specification machinery, but instead is an informal contract between producers and the underlying sink .)
- close(), of type UnderlyingSinkCloseCallback
- A function that is called after the producer signals, via writer.close(), that they are done writing chunks to the stream, and subsequently all queued-up writes have successfully completed.
This function can perform any actions necessary to finalize or flush writes to the underlying sink , and release access to any held resources.
If the shutdown process is asynchronous, the function can return a promise to signal success or failure; the result will be communicated via the return value of the called writer.close() method. Additionally, a rejected promise will error the stream, instead of letting it close successfully. Throwing an exception is treated the same as returning a rejected promise.
- abort(reason), of type UnderlyingSinkAbortCallback
- A function that is called after the producer signals, via stream.abort() or writer.abort(), that they wish to abort the stream. It takes as its argument the same value as was passed to those methods by the producer.
Writable streams can additionally be aborted under certain conditions during piping ; see the definition of the pipeTo() method for more details.
This function can clean up any held resources, much like close(), but perhaps with some custom handling.
If the shutdown process is asynchronous, the function can return a promise to signal success or failure; the result will be communicated via the return value of the called writer.abort() method. Throwing an exception is treated the same as returning a rejected promise. Regardless, the stream will be errored with a new TypeError indicating that it was aborted.
- type, of type any
- This property is reserved for future use, so any attempts to supply a value will throw an exception.
The controller argument passed to start() and write() is an instance of WritableStreamDefaultController , and has the ability to error the stream. This is mainly used for bridging the gap with non-promise-based APIs, as seen for example in § 10.6 A writable stream with no backpressure or success signals .
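Putting the pieces above together, a complete (if simplistic) underlying sink might look like the following sketch; the in-memory destination array is an illustrative stand-in for a real resource such as a file handle or socket:

```javascript
// Creates a WritableStream whose underlying sink appends each chunk to
// `destination`. The queuing strategy is illustrative: up to 4 chunks
// may be queued before backpressure is signaled.
function createArraySinkStream(destination) {
  return new WritableStream({
    start(controller) {
      // Acquire the underlying resource here; a rejected promise or a
      // later controller.error() call would error the stream.
    },
    write(chunk) {
      destination.push(chunk); // synchronous write: no promise needed
    },
    close() {
      // Finalize or flush here; a rejected promise would error the stream.
    },
    abort(reason) {
      // Release held resources; `reason` is whatever the producer passed.
    }
  }, new CountQueuingStrategy({ highWaterMark: 4 }));
}
```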
5.2.4. Constructor, methods, and properties
- stream = new WritableStream(underlyingSink[, strategy])
- Creates a new WritableStream wrapping the provided underlying sink . See § 5.2.3 The underlying sink API for more details on the underlyingSink argument.
The strategy argument represents the stream’s queuing strategy , as described in § 7.1 The queuing strategy API . If it is not provided, the default behavior will be the same as a CountQueuingStrategy with a high water mark of 1.
- isLocked = stream.locked
- Returns whether or not the writable stream is locked to a writer .
- await stream.abort([ reason ])
- Aborts the stream, signaling that the producer can no longer successfully write to the stream and it is to be immediately moved to an errored state, with any queued-up writes discarded. This will also execute any abort mechanism of the underlying sink .
The returned promise will fulfill if the stream shuts down successfully, or reject if the underlying sink signaled that there was an error doing so. Additionally, it will reject with a TypeError (without attempting to cancel the stream) if the stream is currently locked .
- await stream.close()
- Closes the stream. The underlying sink will finish processing any previously-written chunks , before invoking its close behavior. During this time any further attempts to write will fail (without erroring the stream).
The method returns a promise that will fulfill if all remaining chunks are successfully written and the stream successfully closes, or rejects if an error is encountered during this process. Additionally, it will reject with a TypeError (without attempting to cancel the stream) if the stream is currently locked .
- writer = stream.getWriter()
- Creates a writer (an instance of WritableStreamDefaultWriter ) and locks the stream to the new writer. While the stream is locked, no other writer can be acquired until this one is released .
This functionality is especially useful for creating abstractions that desire the ability to write to a stream without interruption or interleaving. By getting a writer for the stream, you can ensure nobody else can write at the same time, which would cause the resulting written data to be unpredictable and probably useless.
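The locking behavior can be sketched with an illustrative helper (not part of this standard) that guarantees release of the lock even if the wrapped function throws:

```javascript
// Runs `fn` with exclusive write access: while the writer is held, any
// other attempt to acquire one throws, so writes cannot interleave.
function withExclusiveWriter(stream, fn) {
  const writer = stream.getWriter(); // locks the stream
  try {
    return fn(writer);
  } finally {
    writer.releaseLock(); // the stream becomes acquirable again
  }
}
```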
The new WritableStream(underlyingSink, strategy) constructor steps are:
- If underlyingSink is missing, set it to null.
- Let underlyingSinkDict be underlyingSink , converted to an IDL value of type UnderlyingSink .
We cannot declare the underlyingSink argument as having the UnderlyingSink type directly, because doing so would lose the reference to the original object. We need to retain the object so we can invoke the various methods on it.
- If underlyingSinkDict ["type"] exists , throw a RangeError exception.
This is to allow us to add new potential types in the future, without backward-compatibility concerns.
- Perform ! InitializeWritableStream ( this ).
- Let sizeAlgorithm be ! ExtractSizeAlgorithm ( strategy ).
- Let highWaterMark be ? ExtractHighWaterMark ( strategy , 1).
- Perform ? SetUpWritableStreamDefaultControllerFromUnderlyingSink ( this , underlyingSink , underlyingSinkDict , highWaterMark , sizeAlgorithm ).
The locked getter steps are:
-
Return ! IsWritableStreamLocked ( this ).
The abort(reason) method steps are:
- If ! IsWritableStreamLocked ( this ) is true, return a promise rejected with a TypeError exception.
- Return ! WritableStreamAbort ( this , reason ).
The close() method steps are:
- If ! IsWritableStreamLocked ( this ) is true, return a promise rejected with a TypeError exception.
- If ! WritableStreamCloseQueuedOrInFlight ( this ) is true, return a promise rejected with a TypeError exception.
- Return ! WritableStreamClose ( this ).
The getWriter() method steps are:
-
Return ? AcquireWritableStreamDefaultWriter ( this ).
5.2.5. Transfer via postMessage()
- destination.postMessage(ws, { transfer: [ws] })
- Sends a WritableStream to another frame, window, or worker.
The transferred stream can be used exactly like the original. The original will become locked and no longer directly usable.
WritableStream objects are transferable objects . Their transfer steps , given value and dataHolder , are:
- If ! IsWritableStreamLocked ( value ) is true, throw a "DataCloneError" DOMException .
- Let port1 be a new MessagePort in the current Realm .
- Let port2 be a new MessagePort in the current Realm .
- Entangle port1 and port2 .
- Let readable be a new ReadableStream in the current Realm .
- Perform ! SetUpCrossRealmTransformReadable ( readable , port1 ).
- Let promise be ! ReadableStreamPipeTo ( readable , value , false, false, false).
- Set promise .[[PromiseIsHandled]] to true.
- Set dataHolder .[[port]] to ! StructuredSerializeWithTransfer ( port2 , « port2 »).

Their transfer-receiving steps , given dataHolder and value , are:

- Let deserializedRecord be ! StructuredDeserializeWithTransfer ( dataHolder .[[port]], the current Realm ).
- Let port be deserializedRecord .[[Deserialized]].
- Perform ! SetUpCrossRealmTransformWritable ( value , port ).
5.3. The WritableStreamDefaultWriter class
The WritableStreamDefaultWriter class represents a writable stream writer designed to be vended by a WritableStream instance.
5.3.1. Interface definition
The Web IDL definition for the WritableStreamDefaultWriter class is given as follows:
[Exposed=*]
interface WritableStreamDefaultWriter {
  constructor(WritableStream stream);

  readonly attribute Promise<undefined> closed;
  readonly attribute unrestricted double? desiredSize;
  readonly attribute Promise<undefined> ready;

  Promise<undefined> abort(optional any reason);
  Promise<undefined> close();
  undefined releaseLock();
  Promise<undefined> write(optional any chunk);
};
5.3.2. Internal slots
Instances of WritableStreamDefaultWriter are created with the internal slots described in the following table:
Internal Slot | Description ( non-normative ) |
---|---|
[[closedPromise]] | A promise returned by the writer’s closed getter |
[[readyPromise]] | A promise returned by the writer’s ready getter |
[[stream]] | A WritableStream instance that owns this writer |
5.3.3. Constructor, methods, and properties
- writer = new WritableStreamDefaultWriter(stream)
- This is equivalent to calling stream.getWriter().
- await writer.closed
- Returns a promise that will be fulfilled when the stream becomes closed, or rejected if the stream ever errors or the writer’s lock is released before the stream finishes closing.
- desiredSize = writer.desiredSize
- Returns the desired size to fill the stream’s internal queue . It can be negative, if the queue is over-full. A producer can use this information to determine the right amount of data to write.
It will be null if the stream cannot be successfully written to (due to either being errored, or having an abort queued up). It will return zero if the stream is closed. And the getter will throw an exception if invoked when the writer’s lock is released .
- await writer.ready
- Returns a promise that will be fulfilled when the desired size to fill the stream’s internal queue transitions from non-positive to positive, signaling that it is no longer applying backpressure . Once the desired size dips back to zero or below, the getter will return a new promise that stays pending until the next transition.
If the stream becomes errored or aborted, or the writer’s lock is released , the returned promise will become rejected.
- await writer.abort([ reason ])
- If the writer is active , behaves the same as stream.abort(reason).
- await writer.close()
- If the writer is active , behaves the same as stream.close().
- writer.releaseLock()
- Releases the writer’s lock on the corresponding stream. After the lock is released, the writer is no longer active . If the associated stream is errored when the lock is released, the writer will appear errored in the same way from now on; otherwise, the writer will appear closed.
Note that the lock can still be released even if some ongoing writes have not yet finished (i.e. even if the promises returned from previous calls to write() have not yet settled). It’s not necessary to hold the lock on the writer for the duration of the write; the lock instead simply prevents other producers from writing in an interleaved manner.
- await writer.write(chunk)
- Writes the given chunk to the writable stream, by waiting until any previous writes have finished successfully, and then sending the chunk to the underlying sink 's write() method. It will return a promise that fulfills with undefined upon a successful write, or rejects if the write fails or the stream becomes errored before the writing process is initiated.
Note that what "success" means is up to the underlying sink ; it might indicate simply that the chunk has been accepted, and not necessarily that it is safely saved to its ultimate destination.
If chunk is mutable, producers are advised to avoid mutating it after passing it to write(), until after the promise returned by write() settles. This ensures that the underlying sink receives and processes the same value that was passed in.
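One way for a producer to follow this advice while still reusing its own buffer is to hand the stream a copy; the helper below is an illustrative sketch, not part of this standard:

```javascript
// Writes a snapshot of `typedArray` so the caller may freely mutate the
// original afterwards; the underlying sink sees a stable copy.
function writeSnapshot(writer, typedArray) {
  return writer.write(typedArray.slice());
}
```

This trades an extra allocation and copy per chunk for the freedom to reuse the original buffer immediately.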
The new WritableStreamDefaultWriter(stream) constructor steps are:
-
Perform ? SetUpWritableStreamDefaultWriter ( this , stream ).
The closed getter steps are:
-
Return this . [[closedPromise]] .
The desiredSize getter steps are:
- If this . [[stream]] is undefined, throw a TypeError exception.
- Return ! WritableStreamDefaultWriterGetDesiredSize ( this ).
The ready getter steps are:
-
Return this . [[readyPromise]] .
The abort(reason) method steps are:
- If this . [[stream]] is undefined, return a promise rejected with a TypeError exception.
- Return ! WritableStreamDefaultWriterAbort ( this , reason ).
The close() method steps are:
- Let stream be this . [[stream]] .
- If stream is undefined, return a promise rejected with a TypeError exception.
- If ! WritableStreamCloseQueuedOrInFlight ( stream ) is true, return a promise rejected with a TypeError exception.
- Return ! WritableStreamDefaultWriterClose ( this ).
The releaseLock() method steps are:
-
Let stream be this . [[stream]] .
-
If stream is undefined, return.
-
Assert: stream . [[writer]] is not undefined.
-
Perform ! WritableStreamDefaultWriterRelease ( this ).
The write(chunk) method steps are:
- If this . [[stream]] is undefined, return a promise rejected with a TypeError exception.
- Return ! WritableStreamDefaultWriterWrite ( this , chunk ).
5.4. The WritableStreamDefaultController class
The WritableStreamDefaultController class has methods that allow control of a WritableStream 's state. When constructing a WritableStream , the underlying sink is given a corresponding WritableStreamDefaultController instance to manipulate.
5.4.1. Interface definition
The Web IDL definition for the WritableStreamDefaultController class is given as follows:
[Exposed=*]
interface WritableStreamDefaultController {
  readonly attribute AbortSignal signal;

  undefined error(optional any e);
};
5.4.2. Internal slots
Instances of WritableStreamDefaultController are created with the internal slots described in the following table:
Internal Slot | Description ( non-normative ) |
---|---|
[[abortAlgorithm]] | A promise-returning algorithm, taking one argument (the abort reason), which communicates a requested abort to the underlying sink |
[[abortController]] | An AbortController that can be used to abort the pending write or close operation when the stream is aborted |
[[closeAlgorithm]] | A promise-returning algorithm which communicates a requested close to the underlying sink |
[[queue]] | A list representing the stream’s internal queue of chunks |
[[queueTotalSize]] | The total size of all the chunks stored in [[queue]] (see § 8.1 Queue-with-sizes ) |
[[started]] | A boolean flag indicating whether the underlying sink has finished starting |
[[strategyHWM]] | A number supplied by the creator of the stream as part of the stream’s queuing strategy , indicating the point at which the stream will apply backpressure to its underlying sink |
[[strategySizeAlgorithm]] | An algorithm to calculate the size of enqueued chunks , as part of the stream’s queuing strategy |
[[stream]] | The WritableStream instance controlled |
[[writeAlgorithm]] | A promise-returning algorithm, taking one argument (the chunk to write), which writes data to the underlying sink |
The close sentinel is a unique value enqueued into [[queue]] , in lieu of a chunk , to signal that the stream is closed. It is only used internally, and is never exposed to web developers.
5.4.3. Methods and properties
-
controller .
signal
-
An AbortSignal that can be used to abort the pending write or close operation when the stream is aborted .
-
controller .
error
( e ) -
Closes the controlled writable stream, making all future interactions with it fail with the given error e .
This method is rarely used, since usually it suffices to return a rejected promise from one of the underlying sink 's methods. However, it can be useful for suddenly shutting down a stream in response to an event outside the normal lifecycle of interactions with the underlying sink .
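As a sketch of such bridging, an underlying sink might watch an event-based API and error the stream from outside the normal write lifecycle; the connection object and its onerror hook here are illustrative assumptions, not a real API:

```javascript
// Errors the stream when an external failure event fires, outside the
// normal start/write/close lifecycle. `connection` is a hypothetical
// event-based transport with send() and an onerror callback slot.
function createConnectionStream(connection) {
  return new WritableStream({
    start(controller) {
      connection.onerror = err => controller.error(err); // out-of-band failure
    },
    write(chunk) {
      connection.send(chunk);
    }
  });
}
```

After the error fires, all further interactions with the stream (and the writer's closed promise) reject with the given error.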
The signal getter steps are:
-
Return this . [[abortController]] 's signal .
The error(e) method steps are:
- Let state be this . [[stream]] . [[state]] .
- If state is not "writable", return.
- Perform ! WritableStreamDefaultControllerError ( this , e ).
5.4.4. Internal methods
The following are internal methods implemented by each WritableStreamDefaultController instance. The writable stream implementation will call into these.
The reason these are in method form, instead of as abstract operations, is to make it clear that the writable stream implementation is decoupled from the controller implementation, and could in the future be expanded with other controllers, as long as those controllers implemented such internal methods. A similar scenario is seen for readable streams (see § 4.9.2 Interfacing with controllers ), where there actually are multiple controller types and as such the counterpart internal methods are used polymorphically.
[[AbortSteps]]( reason ) implements the [[AbortSteps]] contract. It performs the following steps:
- Let result be the result of performing this . [[abortAlgorithm]] , passing reason .
- Perform ! WritableStreamDefaultControllerClearAlgorithms ( this ).
- Return result .
[[ErrorSteps]]() implements the [[ErrorSteps]] contract. It performs the following steps:
- Perform ! ResetQueue ( this ).
5.5. Abstract operations
5.5.1. Working with writable streams
The following abstract operations operate on WritableStream instances at a higher level.
AcquireWritableStreamDefaultWriter( stream ) performs the following steps:
- Let writer be a new WritableStreamDefaultWriter .
- Perform ? SetUpWritableStreamDefaultWriter ( writer , stream ).
- Return writer .
CreateWritableStream( startAlgorithm , writeAlgorithm , closeAlgorithm , abortAlgorithm , highWaterMark , sizeAlgorithm ) performs the following steps:
- Assert: ! IsNonNegativeNumber ( highWaterMark ) is true.
- Let stream be a new WritableStream .
- Perform ! InitializeWritableStream ( stream ).
- Let controller be a new WritableStreamDefaultController .
- Perform ? SetUpWritableStreamDefaultController ( stream , controller , startAlgorithm , writeAlgorithm , closeAlgorithm , abortAlgorithm , highWaterMark , sizeAlgorithm ).
- Return stream .
This abstract operation will throw an exception if and only if the supplied startAlgorithm throws.
InitializeWritableStream( stream ) performs the following steps:
- Set stream . [[state]] to "writable".
- Set stream . [[storedError]] , stream . [[writer]] , stream . [[controller]] , stream . [[inFlightWriteRequest]] , stream . [[closeRequest]] , stream . [[inFlightCloseRequest]] , and stream . [[pendingAbortRequest]] to undefined.
- Set stream . [[writeRequests]] to a new empty list .
- Set stream . [[backpressure]] to false.
IsWritableStreamLocked( stream ) performs the following steps:
- If stream . [[writer]] is undefined, return false.
- Return true.
SetUpWritableStreamDefaultWriter( writer , stream ) performs the following steps:
- If ! IsWritableStreamLocked ( stream ) is true, throw a TypeError exception.
- Set writer . [[stream]] to stream .
- Set stream . [[writer]] to writer .
- Let state be stream . [[state]] .
- If state is "writable",
  - If ! WritableStreamCloseQueuedOrInFlight ( stream ) is false and stream . [[backpressure]] is true, set writer . [[readyPromise]] to a new promise .
  - Otherwise, set writer . [[readyPromise]] to a promise resolved with undefined.
  - Set writer . [[closedPromise]] to a new promise .
- Otherwise, if state is "erroring",
  - Set writer . [[readyPromise]] to a promise rejected with stream . [[storedError]] .
  - Set writer . [[readyPromise]] .[[PromiseIsHandled]] to true.
  - Set writer . [[closedPromise]] to a new promise .
- Otherwise, if state is "closed",
  - Set writer . [[readyPromise]] to a promise resolved with undefined.
  - Set writer . [[closedPromise]] to a promise resolved with undefined.
- Otherwise,
  - Assert: state is "errored".
  - Let storedError be stream . [[storedError]] .
  - Set writer . [[readyPromise]] to a promise rejected with storedError .
  - Set writer . [[readyPromise]] .[[PromiseIsHandled]] to true.
  - Set writer . [[closedPromise]] to a promise rejected with storedError .
  - Set writer . [[closedPromise]] .[[PromiseIsHandled]] to true.
WritableStreamAbort( stream , reason ) performs the following steps:
- If stream . [[state]] is "closed" or "errored", return a promise resolved with undefined.
- Signal abort on stream . [[controller]] . [[abortController]] with reason .
- Let state be stream . [[state]] .
- If state is "closed" or "errored", return a promise resolved with undefined.
We re-check the state because signaling abort runs author code and that might have changed the state.
- If stream . [[pendingAbortRequest]] is not undefined, return stream . [[pendingAbortRequest]] 's promise .
- Assert: state is "writable" or "erroring".
- Let wasAlreadyErroring be false.
- If state is "erroring",
  - Set wasAlreadyErroring to true.
  - Set reason to undefined.
- Let promise be a new promise .
- Set stream . [[pendingAbortRequest]] to a new pending abort request whose promise is promise , reason is reason , and was already erroring is wasAlreadyErroring .
- If wasAlreadyErroring is false, perform ! WritableStreamStartErroring ( stream , reason ).
- Return promise .
WritableStreamClose( stream ) performs the following steps:
- Let state be stream . [[state]] .
- If state is "closed" or "errored", return a promise rejected with a TypeError exception.
- Assert: state is "writable" or "erroring".
- Assert: ! WritableStreamCloseQueuedOrInFlight ( stream ) is false.
- Let promise be a new promise .
- Set stream . [[closeRequest]] to promise .
- Let writer be stream . [[writer]] .
- If writer is not undefined, and stream . [[backpressure]] is true, and state is "writable", resolve writer . [[readyPromise]] with undefined.
- Perform ! WritableStreamDefaultControllerClose ( stream . [[controller]] ).
- Return promise .
5.5.2. Interfacing with controllers
To allow future flexibility to add different writable stream behaviors (similar to the distinction between default readable streams and readable byte streams ), much of the internal state of a writable stream is encapsulated by the WritableStreamDefaultController class. Each controller class defines two internal methods, which are called by the WritableStream algorithms:
- [[AbortSteps]]( reason )
- The controller’s steps that run in reaction to the stream being aborted , used to clean up the state stored in the controller and inform the underlying sink .
- [[ErrorSteps]]()
- The controller’s steps that run in reaction to the stream being errored, used to clean up the state stored in the controller.
(These are defined as internal methods, instead of as abstract operations, so that they can be called polymorphically by the WritableStream algorithms, without having to branch on which type of controller is present. This is a bit theoretical for now, given that only WritableStreamDefaultController exists so far.)
The rest of this section concerns abstract operations that go in the other direction: they are used by the controller implementation to affect its associated WritableStream object. This translates internal state changes of the controller into developer-facing results visible through the WritableStream 's public API.
WritableStreamAddWriteRequest( stream ) performs the following steps:
- Assert: ! IsWritableStreamLocked ( stream ) is true.
- Assert: stream . [[state]] is "writable".
- Let promise be a new promise .
- Append promise to stream . [[writeRequests]] .
- Return promise .
WritableStreamCloseQueuedOrInFlight( stream ) performs the following steps:
- If stream . [[closeRequest]] is undefined and stream . [[inFlightCloseRequest]] is undefined, return false.
- Return true.
WritableStreamDealWithRejection( stream , error ) performs the following steps:
- Let state be stream . [[state]] .
- If state is "writable",
  - Perform ! WritableStreamStartErroring ( stream , error ).
  - Return.
- Assert: state is "erroring".
- Perform ! WritableStreamFinishErroring ( stream ).
WritableStreamFinishErroring( stream ) performs the following steps:
- Assert: stream . [[state]] is "erroring".
- Assert: ! WritableStreamHasOperationMarkedInFlight ( stream ) is false.
- Set stream . [[state]] to "errored".
- Perform ! stream . [[controller]] . [[ErrorSteps]] ().
- Let storedError be stream . [[storedError]] .
- For each writeRequest of stream . [[writeRequests]] :
  - Reject writeRequest with storedError .
- Set stream . [[writeRequests]] to an empty list .
- If stream . [[pendingAbortRequest]] is undefined,
  - Perform ! WritableStreamRejectCloseAndClosedPromiseIfNeeded ( stream ).
  - Return.
- Let abortRequest be stream . [[pendingAbortRequest]] .
- Set stream . [[pendingAbortRequest]] to undefined.
- If abortRequest ’s was already erroring is true,
  - Reject abortRequest ’s promise with storedError .
  - Perform ! WritableStreamRejectCloseAndClosedPromiseIfNeeded ( stream ).
  - Return.
- Let promise be ! stream . [[controller]] . [[AbortSteps]] ( abortRequest ’s reason ).
- Upon fulfillment of promise ,
  - Resolve abortRequest ’s promise with undefined.
  - Perform ! WritableStreamRejectCloseAndClosedPromiseIfNeeded ( stream ).
- Upon rejection of promise with reason reason ,
  - Reject abortRequest ’s promise with reason .
  - Perform ! WritableStreamRejectCloseAndClosedPromiseIfNeeded ( stream ).
WritableStreamFinishInFlightClose( stream ) performs the following steps:
- Assert: stream . [[inFlightCloseRequest]] is not undefined.
- Resolve stream . [[inFlightCloseRequest]] with undefined.
- Set stream . [[inFlightCloseRequest]] to undefined.
- Let state be stream . [[state]] .
- Assert: stream . [[state]] is "writable" or "erroring".
- If state is "erroring",
  - Set stream . [[storedError]] to undefined.
  - If stream . [[pendingAbortRequest]] is not undefined,
    - Resolve stream . [[pendingAbortRequest]] 's promise with undefined.
    - Set stream . [[pendingAbortRequest]] to undefined.
- Set stream . [[state]] to "closed".
- Let writer be stream . [[writer]] .
- If writer is not undefined, resolve writer . [[closedPromise]] with undefined.
- Assert: stream . [[pendingAbortRequest]] is undefined.
- Assert: stream . [[storedError]] is undefined.
WritableStreamFinishInFlightCloseWithError( stream , error ) performs the following steps:
- Assert: stream . [[inFlightCloseRequest]] is not undefined.
- Reject stream . [[inFlightCloseRequest]] with error .
- Set stream . [[inFlightCloseRequest]] to undefined.
- Assert: stream . [[state]] is "writable" or "erroring".
- If stream . [[pendingAbortRequest]] is not undefined,
  - Reject stream . [[pendingAbortRequest]] 's promise with error .
  - Set stream . [[pendingAbortRequest]] to undefined.
- Perform ! WritableStreamDealWithRejection ( stream , error ).
WritableStreamFinishInFlightWrite( stream ) performs the following steps:
- Assert: stream . [[inFlightWriteRequest]] is not undefined.
- Resolve stream . [[inFlightWriteRequest]] with undefined.
- Set stream . [[inFlightWriteRequest]] to undefined.
-
Assert: stream . [[inFlightWriteRequest]] is not undefined.
-
Reject stream . [[inFlightWriteRequest]] with error .
-
Set stream . [[inFlightWriteRequest]] to undefined.
-
Assert: stream . [[state]] is "writable" or "erroring".
-
Perform ! WritableStreamDealWithRejection ( stream , error ).
-
If stream . [[inFlightWriteRequest]] is undefined and stream . [[inFlightCloseRequest]] is undefined, return false.
-
Return true.
-
Assert: stream . [[inFlightCloseRequest]] is undefined.
-
Assert: stream . [[closeRequest]] is not undefined.
-
Set stream . [[inFlightCloseRequest]] to stream . [[closeRequest]] .
-
Set stream . [[closeRequest]] to undefined.
-
Assert: stream . [[inFlightWriteRequest]] is undefined.
-
Assert: stream . [[writeRequests]] is not empty.
-
Let writeRequest be stream . [[writeRequests]] [0].
-
Remove writeRequest from stream . [[writeRequests]] .
-
Set stream . [[inFlightWriteRequest]] to writeRequest .
-
Assert: stream . [[state]] is "errored".
-
If stream . [[closeRequest]] is not undefined,
-
Assert: stream . [[inFlightCloseRequest]] is undefined.
-
Reject stream . [[closeRequest]] with stream . [[storedError]] .
-
Set stream . [[closeRequest]] to undefined.
-
-
Let writer be stream . [[writer]] .
-
If writer is not undefined,
-
Reject writer . [[closedPromise]] with stream . [[storedError]] .
-
Set writer . [[closedPromise]] .[[PromiseIsHandled]] to true.
-
-
Assert: stream . [[storedError]] is undefined.
-
Assert: stream . [[state]] is "writable".
-
Let controller be stream . [[controller]] .
-
Assert: controller is not undefined.
-
Set stream . [[state]] to "erroring".
-
Set stream . [[storedError]] to reason .
-
Let writer be stream . [[writer]] .
-
If writer is not undefined, perform ! WritableStreamDefaultWriterEnsureReadyPromiseRejected ( writer , reason ).
-
If ! WritableStreamHasOperationMarkedInFlight ( stream ) is false and controller . [[started]] is true, perform ! WritableStreamFinishErroring ( stream ).
-
Assert: stream . [[state]] is "writable".
-
Assert: ! WritableStreamCloseQueuedOrInFlight ( stream ) is false.
-
Let writer be stream . [[writer]] .
-
If writer is not undefined and backpressure is not stream . [[backpressure]] ,
-
If backpressure is true, set writer . [[readyPromise]] to a new promise .
-
Otherwise,
-
Assert: backpressure is false.
-
Resolve writer . [[readyPromise]] with undefined.
-
-
-
Set stream . [[backpressure]] to backpressure .
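These backpressure updates are developer-observable through a writer's ready promise, which the steps above replace (backpressure begins) or resolve (backpressure ends). A non-normative sketch, runnable in any engine that exposes the Streams API globally (such as Node.js 18+):

```javascript
// Non-normative sketch: [[backpressure]] changes surface as the
// writer's ready promise being replaced or resolved.
async function demo() {
  const received = [];
  const ws = new WritableStream(
    { write(chunk) { received.push(chunk); } },
    new CountQueuingStrategy({ highWaterMark: 1 })
  );
  const writer = ws.getWriter();

  await writer.ready; // resolved: the queue has room
  writer.write("a");  // queue is now full, so backpressure begins
  await writer.ready; // resolves again once the sink consumes "a"
  await writer.close();
  return received;
}
```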
5.5.3. Writers
The following abstract operations support the implementation and manipulation of WritableStreamDefaultWriter instances.
-
Let stream be writer . [[stream]] .
-
Assert: stream is not undefined.
-
Return ! WritableStreamAbort ( stream , reason ).
-
Let stream be writer . [[stream]] .
-
Assert: stream is not undefined.
-
Return ! WritableStreamClose ( stream ).
-
Let stream be writer . [[stream]] .
-
Assert: stream is not undefined.
-
Let state be stream . [[state]] .
-
If ! WritableStreamCloseQueuedOrInFlight ( stream ) is true or state is "closed", return a promise resolved with undefined.
-
If state is "errored", return a promise rejected with stream . [[storedError]] .
-
Assert: state is "writable" or "erroring".
-
Return ! WritableStreamDefaultWriterClose ( writer ).
This abstract operation helps implement the error propagation semantics of ReadableStream 's pipeTo() .
-
If writer . [[closedPromise]] .[[PromiseState]] is "pending", reject writer . [[closedPromise]] with error .
-
Otherwise, set writer . [[closedPromise]] to a promise rejected with error .
-
Set writer . [[closedPromise]] .[[PromiseIsHandled]] to true.
-
If writer . [[readyPromise]] .[[PromiseState]] is "pending", reject writer . [[readyPromise]] with error .
-
Otherwise, set writer . [[readyPromise]] to a promise rejected with error .
-
Set writer . [[readyPromise]] .[[PromiseIsHandled]] to true.
-
Let stream be writer . [[stream]] .
-
Let state be stream . [[state]] .
-
If state is "errored" or "erroring", return null.
-
If state is "closed", return 0.
-
Return ! WritableStreamDefaultControllerGetDesiredSize ( stream . [[controller]] ).
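These three cases are observable via the desiredSize getter on a writer. A non-normative sketch, assuming the default queuing strategy (whose high water mark is 1):

```javascript
// Non-normative sketch of the desiredSize cases above: null when
// errored, 0 when closed, and otherwise the controller's desired
// size (high water mark minus queue size).
async function observeDesiredSizes() {
  const closedStream = new WritableStream({}); // default high water mark: 1
  const writer = closedStream.getWriter();
  const whileWritable = writer.desiredSize;    // 1: "writable", empty queue
  await writer.close();
  const whenClosed = writer.desiredSize;       // 0: state is "closed"

  const abortedStream = new WritableStream({});
  const writer2 = abortedStream.getWriter();
  await writer2.abort(new TypeError("boom"));
  const whenErrored = writer2.desiredSize;     // null: state is "errored"

  return [whileWritable, whenClosed, whenErrored];
}
```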
-
Let stream be writer . [[stream]] .
-
Assert: stream is not undefined.
-
Assert: stream . [[writer]] is writer .
-
Let releasedError be a new TypeError .
-
Perform ! WritableStreamDefaultWriterEnsureReadyPromiseRejected ( writer , releasedError ).
-
Perform ! WritableStreamDefaultWriterEnsureClosedPromiseRejected ( writer , releasedError ).
-
Set stream . [[writer]] to undefined.
-
Set writer . [[stream]] to undefined.
-
Let stream be writer . [[stream]] .
-
Assert: stream is not undefined.
-
Let controller be stream . [[controller]] .
-
Let chunkSize be ! WritableStreamDefaultControllerGetChunkSize ( controller , chunk ).
-
If stream is not equal to writer . [[stream]] , return a promise rejected with a TypeError exception.
-
Let state be stream . [[state]] .
-
If state is "errored", return a promise rejected with stream . [[storedError]] .
-
If ! WritableStreamCloseQueuedOrInFlight ( stream ) is true or state is "closed", return a promise rejected with a TypeError exception indicating that the stream is closing or closed.
-
If state is "erroring", return a promise rejected with stream . [[storedError]] .
-
Assert: state is "writable".
-
Let promise be ! WritableStreamAddWriteRequest ( stream ).
-
Perform ! WritableStreamDefaultControllerWrite ( controller , chunk , chunkSize ).
-
Return promise .
5.5.4. Default controllers
The following abstract operations support the implementation of the WritableStreamDefaultController class.
-
Assert: stream implements WritableStream .
-
Assert: stream . [[controller]] is undefined.
-
Set controller . [[stream]] to stream .
-
Set stream . [[controller]] to controller .
-
Perform ! ResetQueue ( controller ).
-
Set controller . [[abortController]] to a new AbortController .
-
Set controller . [[started]] to false.
-
Set controller . [[strategySizeAlgorithm]] to sizeAlgorithm .
-
Set controller . [[strategyHWM]] to highWaterMark .
-
Set controller . [[writeAlgorithm]] to writeAlgorithm .
-
Set controller . [[closeAlgorithm]] to closeAlgorithm .
-
Set controller . [[abortAlgorithm]] to abortAlgorithm .
-
Let backpressure be ! WritableStreamDefaultControllerGetBackpressure ( controller ).
-
Perform ! WritableStreamUpdateBackpressure ( stream , backpressure ).
-
Let startResult be the result of performing startAlgorithm . (This may throw an exception.)
-
Let startPromise be a promise resolved with startResult .
-
Upon fulfillment of startPromise ,
-
Assert: stream . [[state]] is "writable" or "erroring".
-
Set controller . [[started]] to true.
-
Perform ! WritableStreamDefaultControllerAdvanceQueueIfNeeded ( controller ).
-
-
Upon rejection of startPromise with reason r ,
-
Assert: stream . [[state]] is "writable" or "erroring".
-
Set controller . [[started]] to true.
-
Perform ! WritableStreamDealWithRejection ( stream , r ).
-
-
Let controller be a new WritableStreamDefaultController .
-
Let startAlgorithm be an algorithm that returns undefined.
-
Let writeAlgorithm be an algorithm that returns a promise resolved with undefined.
-
Let closeAlgorithm be an algorithm that returns a promise resolved with undefined.
-
Let abortAlgorithm be an algorithm that returns a promise resolved with undefined.
-
If underlyingSinkDict ["start"] exists , then set startAlgorithm to an algorithm which returns the result of invoking underlyingSinkDict ["start"] with argument list « controller » and callback this value underlyingSink .
-
If underlyingSinkDict ["write"] exists , then set writeAlgorithm to an algorithm which takes an argument chunk and returns the result of invoking underlyingSinkDict ["write"] with argument list « chunk , controller » and callback this value underlyingSink .
-
If underlyingSinkDict ["close"] exists , then set closeAlgorithm to an algorithm which returns the result of invoking underlyingSinkDict ["close"] with argument list «» and callback this value underlyingSink .
-
If underlyingSinkDict ["abort"] exists , then set abortAlgorithm to an algorithm which takes an argument reason and returns the result of invoking underlyingSinkDict ["abort"] with argument list « reason » and callback this value underlyingSink .
-
Perform ? SetUpWritableStreamDefaultController ( stream , controller , startAlgorithm , writeAlgorithm , closeAlgorithm , abortAlgorithm , highWaterMark , sizeAlgorithm ).
-
Let stream be controller . [[stream]] .
-
If controller . [[started]] is false, return.
-
If stream . [[inFlightWriteRequest]] is not undefined, return.
-
Let state be stream . [[state]] .
-
Assert: state is not "closed" or "errored".
-
If state is "erroring",
-
Perform ! WritableStreamFinishErroring ( stream ).
-
Return.
-
-
If controller . [[queue]] is empty, return.
-
Let value be ! PeekQueueValue ( controller ).
-
If value is the close sentinel , perform ! WritableStreamDefaultControllerProcessClose ( controller ).
-
Otherwise, perform ! WritableStreamDefaultControllerProcessWrite ( controller , value ).
This abstract operation is called once the stream is closed or errored and the algorithms will not be executed any more. By removing the algorithm references it permits the underlying sink object to be garbage collected even if the WritableStream itself is still referenced.
This is observable using weak references . See tc39/proposal-weakrefs#31 for more detail.
It performs the following steps:
-
Set controller . [[writeAlgorithm]] to undefined.
-
Set controller . [[closeAlgorithm]] to undefined.
-
Set controller . [[abortAlgorithm]] to undefined.
-
Set controller . [[strategySizeAlgorithm]] to undefined.
This algorithm will be performed multiple times in some edge cases. After the first time it will do nothing.
-
Perform ! EnqueueValueWithSize ( controller , close sentinel , 0).
-
Perform ! WritableStreamDefaultControllerAdvanceQueueIfNeeded ( controller ).
-
Let stream be controller . [[stream]] .
-
Assert: stream . [[state]] is "writable".
-
Perform ! WritableStreamDefaultControllerClearAlgorithms ( controller ).
-
Perform ! WritableStreamStartErroring ( stream , error ).
-
If controller . [[stream]] . [[state]] is "writable", perform ! WritableStreamDefaultControllerError ( controller , error ).
-
Let desiredSize be ! WritableStreamDefaultControllerGetDesiredSize ( controller ).
-
Return true if desiredSize ≤ 0, or false otherwise.
-
Let returnValue be the result of performing controller . [[strategySizeAlgorithm]] , passing in chunk , and interpreting the result as a completion record .
-
If returnValue is an abrupt completion,
-
Perform ! WritableStreamDefaultControllerErrorIfNeeded ( controller , returnValue .[[Value]]).
-
Return 1.
-
-
Return returnValue .[[Value]].
-
Return controller . [[strategyHWM]] − controller . [[queueTotalSize]] .
-
Let stream be controller . [[stream]] .
-
Perform ! WritableStreamMarkCloseRequestInFlight ( stream ).
-
Perform ! DequeueValue ( controller ).
-
Assert: controller . [[queue]] is empty.
-
Let sinkClosePromise be the result of performing controller . [[closeAlgorithm]] .
-
Perform ! WritableStreamDefaultControllerClearAlgorithms ( controller ).
-
Upon fulfillment of sinkClosePromise ,
-
Perform ! WritableStreamFinishInFlightClose ( stream ).
-
-
Upon rejection of sinkClosePromise with reason reason ,
-
Perform ! WritableStreamFinishInFlightCloseWithError ( stream , reason ).
-
-
Let stream be controller . [[stream]] .
-
Perform ! WritableStreamMarkFirstWriteRequestInFlight ( stream ).
-
Let sinkWritePromise be the result of performing controller . [[writeAlgorithm]] , passing in chunk .
-
Upon fulfillment of sinkWritePromise ,
-
Perform ! WritableStreamFinishInFlightWrite ( stream ).
-
Let state be stream . [[state]] .
-
Assert: state is "writable" or "erroring".
-
Perform ! DequeueValue ( controller ).
-
If ! WritableStreamCloseQueuedOrInFlight ( stream ) is false and state is "writable",
-
Let backpressure be ! WritableStreamDefaultControllerGetBackpressure ( controller ).
-
Perform ! WritableStreamUpdateBackpressure ( stream , backpressure ).
-
-
Perform ! WritableStreamDefaultControllerAdvanceQueueIfNeeded ( controller ).
-
-
Upon rejection of sinkWritePromise with reason ,
-
If stream . [[state]] is "writable", perform ! WritableStreamDefaultControllerClearAlgorithms ( controller ).
-
Perform ! WritableStreamFinishInFlightWriteWithError ( stream , reason ).
-
-
Let enqueueResult be EnqueueValueWithSize ( controller , chunk , chunkSize ).
-
If enqueueResult is an abrupt completion,
-
Perform ! WritableStreamDefaultControllerErrorIfNeeded ( controller , enqueueResult .[[Value]]).
-
Return.
-
-
Let stream be controller . [[stream]] .
-
If ! WritableStreamCloseQueuedOrInFlight ( stream ) is false and stream . [[state]] is "writable",
-
Let backpressure be ! WritableStreamDefaultControllerGetBackpressure ( controller ).
-
Perform ! WritableStreamUpdateBackpressure ( stream , backpressure ).
-
-
Perform ! WritableStreamDefaultControllerAdvanceQueueIfNeeded ( controller ).
6. Transform streams
6.1. Using transform streams
readableStream
  .pipeThrough(transformStream)
  .pipeTo(writableStream)
  .then(() => console.log("All data successfully transformed!"))
  .catch(e => console.error("Something went wrong!", e));
The readable and writable properties of a transform stream can be used directly to access the usual interfaces of a readable stream and writable stream . In this example we supply data to the writable side of the stream using its writer interface. The readable side is then piped to anotherWritableStream .
const writer = transformStream.writable.getWriter();
writer.write("input chunk");
transformStream.readable.pipeTo(anotherWritableStream);
The fetch() API accepts a readable stream request body , but it can be more convenient to write data for uploading via a writable stream interface. Using an identity transform stream addresses this:
const { writable, readable } = new TransformStream();
fetch("...", { body: readable }).then(response => /* ... */);

const writer = writable.getWriter();
writer.write(new Uint8Array([0x73, 0x74, 0x72, 0x65, 0x61, 0x6D, 0x73, 0x21]));
writer.close();
Another use of identity transform streams is to add additional buffering to a pipe . In this example we add extra buffering between readableStream and writableStream .
const writableStrategy = new ByteLengthQueuingStrategy({ highWaterMark: 1024 * 1024 });
readableStream
  .pipeThrough(new TransformStream(undefined, writableStrategy))
  .pipeTo(writableStream);
6.2. The TransformStream class
The TransformStream class is a concrete instance of the general transform stream concept.
6.2.1. Interface definition
The Web IDL definition for the TransformStream class is given as follows:
[Exposed=*, Transferable]
interface TransformStream {
  constructor(optional object transformer,
              optional QueuingStrategy writableStrategy = {},
              optional QueuingStrategy readableStrategy = {});

  readonly attribute ReadableStream readable;
  readonly attribute WritableStream writable;
};
6.2.2. Internal slots
Instances of TransformStream are created with the internal slots described in the following table:
Internal Slot | Description ( non-normative ) |
---|---|
[[backpressure]] | Whether there was backpressure on [[readable]] the last time it was observed |
[[backpressureChangePromise]] | A promise which is fulfilled and replaced every time the value of [[backpressure]] changes |
[[controller]] | A TransformStreamDefaultController created with the ability to control [[readable]] and [[writable]] |
[[Detached]] | A boolean flag set to true when the stream is transferred |
[[readable]] | The ReadableStream instance controlled by this object |
[[writable]] | The WritableStream instance controlled by this object |
6.2.3. The transformer API
The TransformStream() constructor accepts as its first argument a JavaScript object representing the transformer . Such objects can contain any of the following methods:
dictionary Transformer {
  TransformerStartCallback start;
  TransformerTransformCallback transform;
  TransformerFlushCallback flush;
  TransformerCancelCallback cancel;
  any readableType;
  any writableType;
};

callback TransformerStartCallback = any (TransformStreamDefaultController controller);
callback TransformerFlushCallback = Promise<undefined> (TransformStreamDefaultController controller);
callback TransformerTransformCallback = Promise<undefined> (any chunk, TransformStreamDefaultController controller);
callback TransformerCancelCallback = Promise<undefined> (any reason);
-
start( controller ) , of type TransformerStartCallback
-
A function that is called immediately during creation of the TransformStream .
Typically this is used to enqueue prefix chunks , using controller.enqueue() . Those chunks will be read from the readable side but don’t depend on any writes to the writable side .
If this initial process is asynchronous, for example because it takes some effort to acquire the prefix chunks, the function can return a promise to signal success or failure; a rejected promise will error the stream. Any thrown exceptions will be re-thrown by the TransformStream() constructor.
-
transform( chunk , controller ) , of type TransformerTransformCallback
-
A function called when a new chunk originally written to the writable side is ready to be transformed. The stream implementation guarantees that this function will be called only after previous transforms have succeeded, and never before start() has completed or after flush() has been called.
This function performs the actual transformation work of the transform stream. It can enqueue the results using controller.enqueue() . This permits a single chunk written to the writable side to result in zero or multiple chunks on the readable side , depending on how many times controller.enqueue() is called. § 10.9 A transform stream that replaces template tags demonstrates this by sometimes enqueuing zero chunks.
If the process of transforming is asynchronous, this function can return a promise to signal success or failure of the transformation. A rejected promise will error both the readable and writable sides of the transform stream.
The promise potentially returned by this function is used to ensure that well-behaved producers do not attempt to mutate the chunk before it has been fully transformed. (This is not guaranteed by any specification machinery, but instead is an informal contract between producers and the transformer .)
If no transform() method is supplied, the identity transform is used, which enqueues chunks unchanged from the writable side to the readable side.
-
flush( controller ) , of type TransformerFlushCallback
-
A function called after all chunks written to the writable side have been transformed by successfully passing through transform() , and the writable side is about to be closed.
Typically this is used to enqueue suffix chunks to the readable side , before that too becomes closed. An example can be seen in § 10.9 A transform stream that replaces template tags .
If the flushing process is asynchronous, the function can return a promise to signal success or failure; the result will be communicated to the caller of stream.writable.write() . Additionally, a rejected promise will error both the readable and writable sides of the stream. Throwing an exception is treated the same as returning a rejected promise.
(Note that there is no need to call controller.terminate() inside flush() ; the stream is already in the process of successfully closing down, and terminating it would be counterproductive.)
-
cancel( reason ) , of type TransformerCancelCallback
-
A function called when the readable side is cancelled, or when the writable side is aborted.
Typically this is used to clean up underlying transformer resources when the stream is aborted or cancelled.
If the cancellation process is asynchronous, the function can return a promise to signal success or failure; the result will be communicated to the caller of stream.writable.abort() or stream.readable.cancel() . Throwing an exception is treated the same as returning a rejected promise.
(Note that there is no need to call controller.terminate() inside cancel() ; the stream is already in the process of cancelling/aborting, and terminating it would be counterproductive.)
-
readableType , of type any
-
This property is reserved for future use, so any attempts to supply a value will throw an exception.
-
writableType , of type any
-
This property is reserved for future use, so any attempts to supply a value will throw an exception.
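The callbacks described above compose as in the following non-normative sketch, which enqueues a prefix chunk in start(), uppercases each written chunk in transform(), and appends a suffix chunk in flush() (the names upperCaser and runUpperCaser are illustrative):

```javascript
const upperCaser = new TransformStream({
  start(controller) {
    controller.enqueue(">> begin"); // prefix chunk
  },
  transform(chunk, controller) {
    controller.enqueue(String(chunk).toUpperCase());
  },
  flush(controller) {
    controller.enqueue("<< end");   // suffix chunk
  }
});

async function runUpperCaser() {
  const writer = upperCaser.writable.getWriter();
  const chunks = [];
  // Start consuming the readable side first, so the writes below are
  // not blocked indefinitely by backpressure.
  const reading = (async () => {
    for await (const chunk of upperCaser.readable) chunks.push(chunk);
  })();
  await writer.write("hello");
  await writer.write("streams");
  await writer.close();
  await reading;
  return chunks;
}
```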
The controller object passed to start() , transform() , and flush() is an instance of TransformStreamDefaultController , and has the ability to enqueue chunks to the readable side , or to terminate or error the stream.
6.2.4. Constructor and properties
-
stream = new TransformStream ([ transformer [, writableStrategy [, readableStrategy ]]])
-
Creates a new
TransformStream
wrapping the provided transformer . See § 6.2.3 The transformer API for more details on the transformer argument.If no transformer argument is supplied, then the result will be an identity transform stream . See this example for some cases where that can be useful.
The writableStrategy and readableStrategy arguments are the queuing strategy objects for the writable and readable sides respectively. These are used in the construction of the WritableStream and ReadableStream objects and can be used to add buffering to a TransformStream , in order to smooth out variations in the speed of the transformation, or to increase the amount of buffering in a pipe . If they are not provided, the default behavior will be the same as a CountQueuingStrategy , with respective high water marks of 1 and 0.
-
readable = stream . readable
-
Returns a ReadableStream representing the readable side of this transform stream.
-
writable = stream . writable
-
Returns a WritableStream representing the writable side of this transform stream.
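For instance, the writableStrategy argument can give the writable side a larger queue. A non-normative sketch, assuming a CountQueuingStrategy with a high water mark of 4 (since no transformer is supplied, this is an identity transform stream):

```javascript
// Non-normative sketch: an identity transform stream whose writable
// side can buffer up to 4 chunks before signaling backpressure.
const ts = new TransformStream(
  undefined,                                     // identity transform
  new CountQueuingStrategy({ highWaterMark: 4 }) // writableStrategy
);

const writer = ts.writable.getWriter();
const roomBefore = writer.desiredSize; // 4: queue is empty
writer.write("a");                     // not awaited; chunk is buffered
const roomAfter = writer.desiredSize;  // 3: one chunk is queued
```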
The new TransformStream( transformer , writableStrategy , readableStrategy ) constructor steps are:
-
If transformer is missing, set it to null.
-
Let transformerDict be transformer , converted to an IDL value of type Transformer .
We cannot declare the transformer argument as having the Transformer type directly, because doing so would lose the reference to the original object. We need to retain the object so we can invoke the various methods on it.
-
If transformerDict ["readableType"] exists , throw a RangeError exception.
-
If transformerDict ["writableType"] exists , throw a RangeError exception.
-
Let readableHighWaterMark be ? ExtractHighWaterMark ( readableStrategy , 0).
-
Let readableSizeAlgorithm be ! ExtractSizeAlgorithm ( readableStrategy ).
-
Let writableHighWaterMark be ? ExtractHighWaterMark ( writableStrategy , 1).
-
Let writableSizeAlgorithm be ! ExtractSizeAlgorithm ( writableStrategy ).
-
Let startPromise be a new promise .
-
Perform ! InitializeTransformStream ( this , startPromise , writableHighWaterMark , writableSizeAlgorithm , readableHighWaterMark , readableSizeAlgorithm ).
-
Perform ? SetUpTransformStreamDefaultControllerFromTransformer ( this , transformer , transformerDict ).
-
If transformerDict ["start"] exists , then resolve startPromise with the result of invoking transformerDict ["start"] with argument list « this . [[controller]] » and callback this value transformer .
-
Otherwise, resolve startPromise with undefined.
The readable getter steps are:
-
Return this . [[readable]] .
The writable getter steps are:
-
Return this . [[writable]] .
6.2.5. Transfer via postMessage()
-
destination.postMessage(ts, { transfer: [ts] });
-
Sends a TransformStream to another frame, window, or worker.
The transferred stream can be used exactly like the original. Its readable and writable sides will become locked and no longer directly usable.
TransformStream objects are transferable objects . Their transfer steps , given value and dataHolder , are:
-
Let readable be value . [[readable]] .
-
Let writable be value . [[writable]] .
-
If ! IsReadableStreamLocked ( readable ) is true, throw a "DataCloneError" DOMException .
-
If ! IsWritableStreamLocked ( writable ) is true, throw a "DataCloneError" DOMException .
-
Set dataHolder .[[readable]] to ! StructuredSerializeWithTransfer ( readable , « readable »).
-
Set dataHolder .[[writable]] to ! StructuredSerializeWithTransfer ( writable , « writable »).
-
Let readableRecord be ! StructuredDeserializeWithTransfer ( dataHolder .[[readable]], the current Realm ).
-
Let writableRecord be ! StructuredDeserializeWithTransfer ( dataHolder .[[writable]], the current Realm ).
-
Set value . [[readable]] to readableRecord .[[Deserialized]].
-
Set value . [[writable]] to writableRecord .[[Deserialized]].
-
Set value . [[backpressure]] , value . [[backpressureChangePromise]] , and value . [[controller]] to undefined.
The [[backpressure]] , [[backpressureChangePromise]] , and [[controller]] slots are not used in a transferred TransformStream .
6.3. The TransformStreamDefaultController class
The TransformStreamDefaultController class has methods that allow manipulation of the associated ReadableStream and WritableStream . When constructing a TransformStream , the transformer object is given a corresponding TransformStreamDefaultController instance to manipulate.
6.3.1. Interface definition
The Web IDL definition for the TransformStreamDefaultController class is given as follows:
[Exposed=*]
interface TransformStreamDefaultController {
  readonly attribute unrestricted double? desiredSize;

  undefined enqueue(optional any chunk);
  undefined error(optional any reason);
  undefined terminate();
};
6.3.2. Internal slots
Instances of TransformStreamDefaultController are created with the internal slots described in the following table:
Internal Slot | Description ( non-normative ) |
---|---|
[[cancelAlgorithm]] | A promise-returning algorithm, taking one argument (the reason for cancellation), which communicates a requested cancellation to the transformer |
[[finishPromise]] | A promise which resolves on completion of either the [[cancelAlgorithm]] or the [[flushAlgorithm]] . If this field is unpopulated (that is, undefined), then neither of those algorithms have been invoked yet |
[[flushAlgorithm]] | A promise-returning algorithm which communicates a requested close to the transformer |
[[stream]] | The TransformStream instance controlled |
[[transformAlgorithm]] | A promise-returning algorithm, taking one argument (the chunk to transform), which requests the transformer perform its transformation |
6.3.3. Methods and properties
-
desiredSize = controller . desiredSize
-
Returns the desired size to fill the readable side’s internal queue . It can be negative, if the queue is over-full.
-
controller . enqueue ( chunk )
-
Enqueues the given chunk chunk in the readable side of the controlled transform stream.
-
controller . error ( e )
-
Errors both the readable side and the writable side of the controlled transform stream, making all future interactions with it fail with the given error e . Any chunks queued for transformation will be discarded.
-
controller . terminate ()
-
Closes the readable side and errors the writable side of the controlled transform stream. This is useful when the transformer only needs to consume a portion of the chunks written to the writable side .
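terminate() makes it straightforward to build a transform that consumes only a prefix of its input. A non-normative sketch (the takeFirst and firstTwo names are illustrative, not part of the standard):

```javascript
// Non-normative sketch: pass through the first n chunks, then call
// terminate(), which closes the readable side and errors the
// writable side of the transform stream.
function takeFirst(n) {
  let seen = 0;
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk);
      if (++seen === n) controller.terminate();
    }
  });
}

async function firstTwo() {
  const source = new ReadableStream({
    start(c) {
      for (const x of ["a", "b", "c", "d"]) c.enqueue(x);
      c.close();
    }
  });
  const chunks = [];
  for await (const chunk of source.pipeThrough(takeFirst(2))) {
    chunks.push(chunk);
  }
  return chunks;
}
```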
The desiredSize getter steps are:
-
Let readableController be this . [[stream]] . [[readable]] . [[controller]] .
-
Return ! ReadableStreamDefaultControllerGetDesiredSize ( readableController ).
The enqueue( chunk ) method steps are:
-
Perform ? TransformStreamDefaultControllerEnqueue ( this , chunk ).
The error( e ) method steps are:
-
Perform ? TransformStreamDefaultControllerError ( this , e ).
The terminate() method steps are:
-
Perform ? TransformStreamDefaultControllerTerminate ( this ).
6.4. Abstract operations
6.4.1. Working with transform streams
The following abstract operations operate on TransformStream instances at a higher level.
-
Let startAlgorithm be an algorithm that returns startPromise .
-
Let writeAlgorithm be the following steps, taking a chunk argument:
-
Return ! TransformStreamDefaultSinkWriteAlgorithm ( stream , chunk ).
-
-
Let abortAlgorithm be the following steps, taking a reason argument:
-
Return ! TransformStreamDefaultSinkAbortAlgorithm ( stream , reason ).
-
-
Let closeAlgorithm be the following steps:
-
Return ! TransformStreamDefaultSinkCloseAlgorithm ( stream ).
-
-
Set stream . [[writable]] to ! CreateWritableStream ( startAlgorithm , writeAlgorithm , closeAlgorithm , abortAlgorithm , writableHighWaterMark , writableSizeAlgorithm ).
-
Let pullAlgorithm be the following steps:
-
Return ! TransformStreamDefaultSourcePullAlgorithm ( stream ).
-
-
Let cancelAlgorithm be the following steps, taking a reason argument:
-
Return ! TransformStreamDefaultSourceCancelAlgorithm ( stream , reason ).
-
-
Set stream . [[readable]] to ! CreateReadableStream ( startAlgorithm , pullAlgorithm , cancelAlgorithm , readableHighWaterMark , readableSizeAlgorithm ).
-
Set stream . [[backpressure]] and stream . [[backpressureChangePromise]] to undefined.
The [[backpressure]] slot is set to undefined so that it can be initialized by TransformStreamSetBackpressure . Alternatively, implementations can use a strictly boolean value for [[backpressure]] and change the way it is initialized. This will not be visible to user code so long as the initialization is correctly completed before the transformer’s start() method is called.
-
Perform ! TransformStreamSetBackpressure ( stream , true).
-
Set stream . [[controller]] to undefined.
-
Perform ! ReadableStreamDefaultControllerError ( stream . [[readable]] . [[controller]] , e ).
-
Perform ! TransformStreamErrorWritableAndUnblockWrite ( stream , e ).
This operation works correctly when one or both sides are already errored. As a result, calling algorithms do not need to check stream states when responding to an error condition.
-
Perform ! TransformStreamDefaultControllerClearAlgorithms ( stream . [[controller]] ).
-
Perform ! WritableStreamDefaultControllerErrorIfNeeded ( stream . [[writable]] . [[controller]] , e ).
-
Perform ! TransformStreamUnblockWrite ( stream ).
-
Assert: stream . [[backpressure]] is not backpressure .
-
If stream . [[backpressureChangePromise]] is not undefined, resolve stream . [[backpressureChangePromise]] with undefined.
-
Set stream . [[backpressureChangePromise]] to a new promise .
-
Set stream . [[backpressure]] to backpressure .
-
If stream . [[backpressure]] is true, perform ! TransformStreamSetBackpressure ( stream , false).
The TransformStreamDefaultSinkWriteAlgorithm abstract operation could be waiting for the promise stored in the [[backpressureChangePromise]] slot to resolve. The call to TransformStreamSetBackpressure ensures that the promise always resolves.
6.4.2. Default controllers
The following abstract operations support the implementation of the TransformStreamDefaultController class.
-
Assert: stream implements TransformStream .
-
Assert: stream . [[controller]] is undefined.
-
Set controller . [[stream]] to stream .
-
Set stream . [[controller]] to controller .
-
Set controller . [[transformAlgorithm]] to transformAlgorithm .
-
Set controller . [[flushAlgorithm]] to flushAlgorithm .
-
Set controller . [[cancelAlgorithm]] to cancelAlgorithm .
-
Let controller be a new TransformStreamDefaultController .
-
Let transformAlgorithm be the following steps, taking a chunk argument:
-
Let result be TransformStreamDefaultControllerEnqueue ( controller , chunk ).
-
If result is an abrupt completion, return a promise rejected with result .[[Value]].
-
Otherwise, return a promise resolved with undefined.
-
-
Let flushAlgorithm be an algorithm which returns a promise resolved with undefined.
-
Let cancelAlgorithm be an algorithm which returns a promise resolved with undefined.
-
If transformerDict ["transform"] exists , set transformAlgorithm to an algorithm which takes an argument chunk and returns the result of invoking transformerDict ["transform"] with argument list « chunk , controller » and callback this value transformer .
-
If transformerDict ["flush"] exists , set flushAlgorithm to an algorithm which returns the result of invoking transformerDict ["flush"] with argument list « controller » and callback this value transformer .
-
If transformerDict ["cancel"] exists , set cancelAlgorithm to an algorithm which takes an argument reason and returns the result of invoking transformerDict ["cancel"] with argument list « reason » and callback this value transformer .
-
Perform ! SetUpTransformStreamDefaultController ( stream , controller , transformAlgorithm , flushAlgorithm , cancelAlgorithm ).
This permits the transformer object to be garbage collected, even while the TransformStream itself is still referenced.
This is observable using weak references . See tc39/proposal-weakrefs#31 for more detail.
It performs the following steps:
-
Set controller . [[transformAlgorithm]] to undefined.
-
Set controller . [[flushAlgorithm]] to undefined.
-
Set controller . [[cancelAlgorithm]] to undefined.
-
Let stream be controller . [[stream]] .
-
Let readableController be stream . [[readable]] . [[controller]] .
-
If ! ReadableStreamDefaultControllerCanCloseOrEnqueue ( readableController ) is false, throw a
TypeError
exception. -
Let enqueueResult be ReadableStreamDefaultControllerEnqueue ( readableController , chunk ).
-
If enqueueResult is an abrupt completion,
-
Perform ! TransformStreamErrorWritableAndUnblockWrite ( stream , enqueueResult .[[Value]]).
-
Throw stream . [[readable]] . [[storedError]] .
-
-
Let backpressure be ! ReadableStreamDefaultControllerHasBackpressure ( readableController ).
-
If backpressure is not stream . [[backpressure]] ,
-
Assert: backpressure is true.
-
Perform ! TransformStreamSetBackpressure ( stream , true).
-
-
Perform ! TransformStreamError ( controller . [[stream]] , e ).
-
Let transformPromise be the result of performing controller . [[transformAlgorithm]] , passing chunk .
-
Return the result of reacting to transformPromise with the following rejection steps given the argument r :
-
Perform ! TransformStreamError ( controller . [[stream]] , r ).
-
Throw r .
-
-
Let stream be controller . [[stream]] .
-
Let readableController be stream . [[readable]] . [[controller]] .
-
Perform ! ReadableStreamDefaultControllerClose ( readableController ).
-
Let error be a
TypeError
exception indicating that the stream has been terminated. -
Perform ! TransformStreamErrorWritableAndUnblockWrite ( stream , error ).
6.4.3. Default sinks
The following abstract operations are used to implement the underlying sink for the writable side of transform streams .
-
Assert: stream . [[writable]] . [[state]] is "
writable
". -
Let controller be stream . [[controller]] .
-
If stream . [[backpressure]] is true,
-
Let backpressureChangePromise be stream . [[backpressureChangePromise]] .
-
Assert: backpressureChangePromise is not undefined.
-
Return the result of reacting to backpressureChangePromise with the following fulfillment steps:
-
Let writable be stream . [[writable]] .
-
Let state be writable . [[state]] .
-
If state is "
erroring
", throw writable . [[storedError]] . -
Assert: state is "
writable
". -
Return ! TransformStreamDefaultControllerPerformTransform ( controller , chunk ).
-
-
-
Return ! TransformStreamDefaultControllerPerformTransform ( controller , chunk ).
-
Let controller be stream . [[controller]] .
-
If controller . [[finishPromise]] is not undefined, return controller . [[finishPromise]] .
-
Let readable be stream . [[readable]] .
-
Let controller . [[finishPromise]] be a new promise.
-
Let cancelPromise be the result of performing controller . [[cancelAlgorithm]] , passing reason .
-
Perform ! TransformStreamDefaultControllerClearAlgorithms ( controller ).
-
React to cancelPromise :
-
If cancelPromise was fulfilled, then:
-
If readable . [[state]] is "
errored
", reject controller . [[finishPromise]] with readable . [[storedError]] . -
Otherwise:
-
Perform ! ReadableStreamDefaultControllerError ( readable . [[controller]] , reason ).
-
Resolve controller . [[finishPromise]] with undefined.
-
-
-
If cancelPromise was rejected with reason r , then:
-
Perform ! ReadableStreamDefaultControllerError ( readable . [[controller]] , r ).
-
Reject controller . [[finishPromise]] with r .
-
-
-
Return controller . [[finishPromise]] .
-
Let controller be stream . [[controller]] .
-
If controller . [[finishPromise]] is not undefined, return controller . [[finishPromise]] .
-
Let readable be stream . [[readable]] .
-
Let controller . [[finishPromise]] be a new promise.
-
Let flushPromise be the result of performing controller . [[flushAlgorithm]] .
-
Perform ! TransformStreamDefaultControllerClearAlgorithms ( controller ).
-
React to flushPromise :
-
If flushPromise was fulfilled, then:
-
If readable . [[state]] is "
errored
", reject controller . [[finishPromise]] with readable . [[storedError]] . -
Otherwise:
-
Perform ! ReadableStreamDefaultControllerClose ( readable . [[controller]] ).
-
Resolve controller . [[finishPromise]] with undefined.
-
-
-
If flushPromise was rejected with reason r , then:
-
Perform ! ReadableStreamDefaultControllerError ( readable . [[controller]] , r ).
-
Reject controller . [[finishPromise]] with r .
-
-
-
Return controller . [[finishPromise]] .
6.4.4. Default sources
The following abstract operation is used to implement the underlying source for the readable side of transform streams .
-
Let controller be stream . [[controller]] .
-
If controller . [[finishPromise]] is not undefined, return controller . [[finishPromise]] .
-
Let writable be stream . [[writable]] .
-
Let controller . [[finishPromise]] be a new promise.
-
Let cancelPromise be the result of performing controller . [[cancelAlgorithm]] , passing reason .
-
Perform ! TransformStreamDefaultControllerClearAlgorithms ( controller ).
-
React to cancelPromise :
-
If cancelPromise was fulfilled, then:
-
If writable . [[state]] is "
errored
", reject controller . [[finishPromise]] with writable . [[storedError]] . -
Otherwise:
-
Perform ! WritableStreamDefaultControllerErrorIfNeeded ( writable . [[controller]] , reason ).
-
Perform ! TransformStreamUnblockWrite ( stream ).
-
Resolve controller . [[finishPromise]] with undefined.
-
-
-
If cancelPromise was rejected with reason r , then:
-
Perform ! WritableStreamDefaultControllerErrorIfNeeded ( writable . [[controller]] , r ).
-
Perform ! TransformStreamUnblockWrite ( stream ).
-
Reject controller . [[finishPromise]] with r .
-
-
-
Return controller . [[finishPromise]] .
-
Assert: stream . [[backpressure]] is true.
-
Assert: stream . [[backpressureChangePromise]] is not undefined.
-
Perform ! TransformStreamSetBackpressure ( stream , false).
-
Return stream . [[backpressureChangePromise]] .
7. Queuing strategies
7.1. The queuing strategy API
The ReadableStream(), WritableStream(), and TransformStream() constructors all accept at least one argument representing an appropriate queuing strategy for the stream being created. Such objects contain the following properties:
dictionary QueuingStrategy {
  unrestricted double highWaterMark;
  QueuingStrategySize size;
};

callback QueuingStrategySize = unrestricted double (any chunk);
-
highWaterMark
, of type unrestricted double -
A non-negative number indicating the high water mark of the stream using this queuing strategy.
-
size( chunk )
(non-byte streams only), of type QueuingStrategySize -
A function that computes and returns the finite non-negative size of the given chunk value.
The result is used to determine backpressure, manifesting via the appropriate desiredSize property: either defaultController.desiredSize, byteController.desiredSize, or writer.desiredSize, depending on where the queuing strategy is being used. For readable streams, it also governs when the underlying source's pull() method is called.
This function has to be idempotent and not cause side effects; very strange results can occur otherwise.
For readable byte streams, this function is not used, as chunks are always measured in bytes.
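For illustration, any plain object with these two properties can serve as a queuing strategy. The following sketch measures string chunks by their length; the chunk values and high water mark are our own illustrative choices, not prescribed by the standard:

```javascript
// A queuing strategy is just an object with highWaterMark and size().
// Here chunks are strings, measured by their length (illustrative values).
const strategy = {
  highWaterMark: 1024,
  size(chunk) {
    return chunk.length;
  },
};

const stream = new ReadableStream(
  {
    start(controller) {
      controller.enqueue("hello"); // size 5, well under the high water mark
      controller.close();
    },
  },
  strategy
);
```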
Any object with these properties can be used when a queuing strategy object is expected. However, we provide two built-in queuing strategy classes that provide a common vocabulary for certain cases: ByteLengthQueuingStrategy and CountQueuingStrategy. They both make use of the following Web IDL fragment for their constructors:
dictionary QueuingStrategyInit {
  required unrestricted double highWaterMark;
};
7.2. The ByteLengthQueuingStrategy class
A common queuing strategy when dealing with bytes is to wait until the accumulated byteLength properties of the incoming chunks reaches a specified high-water mark. As such, this is provided as a built-in queuing strategy that can be used when constructing streams.
const stream = new ReadableStream({ ... }, new ByteLengthQueuingStrategy({ highWaterMark: 16 * 1024 }));
In this case, 16 KiB worth of chunks can be enqueued by the readable stream’s underlying source before the readable stream implementation starts sending backpressure signals to the underlying source.
const stream = new WritableStream({ ... }, new ByteLengthQueuingStrategy({ highWaterMark: 32 * 1024 }));
In this case, 32 KiB worth of chunks can be accumulated in the writable stream’s internal queue, waiting for previous writes to the underlying sink to finish, before the writable stream starts sending backpressure signals to any producers .
It is not necessary to use ByteLengthQueuingStrategy with readable byte streams, as they always measure chunks in bytes. Attempting to construct a byte stream with a ByteLengthQueuingStrategy will fail.
7.2.1. Interface definition
The Web IDL definition for the ByteLengthQueuingStrategy class is given as follows:
[Exposed=*]
interface ByteLengthQueuingStrategy {
  constructor(QueuingStrategyInit init);

  readonly attribute unrestricted double highWaterMark;
  readonly attribute Function size;
};
7.2.2. Internal slots
Instances of ByteLengthQueuingStrategy have a [[highWaterMark]] internal slot, storing the value given in the constructor.
Additionally, every global object globalObject has an associated byte length queuing strategy size function, which is a Function whose value must be initialized as follows:
-
Let steps be the following steps, given chunk :
-
Return ? GetV ( chunk , "
byteLength
").
-
-
Let F be ! CreateBuiltinFunction ( steps , 1, "
size
", « », globalObject ’s relevant Realm ). -
Set globalObject ’s byte length queuing strategy size function to a
Function
that represents a reference to F , with callback context equal to globalObject ’s relevant settings object .
This design is somewhat historical. It is motivated by the desire to ensure that size is a function, not a method, i.e. it does not check its this value. See whatwg/streams#1005 and heycam/webidl#819 for more background.
7.2.3. Constructor and properties
- strategy = new ByteLengthQueuingStrategy({ highWaterMark })
- Creates a new ByteLengthQueuingStrategy with the provided high water mark. Note that the provided high water mark will not be validated ahead of time. Instead, if it is negative, NaN, or not a number, the resulting ByteLengthQueuingStrategy will cause the corresponding stream constructor to throw.
- highWaterMark = strategy.highWaterMark
- Returns the high water mark provided to the constructor.
- strategy.size(chunk)
- Measures the size of chunk by returning the value of its byteLength property.
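A brief usage sketch of these members; the high water mark and chunk size are illustrative values:

```javascript
// ByteLengthQueuingStrategy measures chunks by their byteLength property.
const strategy = new ByteLengthQueuingStrategy({ highWaterMark: 16 * 1024 });

strategy.highWaterMark; // the value passed to the constructor
strategy.size(new Uint8Array(128)); // the chunk's byteLength
```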
The new ByteLengthQueuingStrategy(init) constructor steps are:
-
Set this . [[highWaterMark]] to init ["
highWaterMark
"].
The highWaterMark getter steps are:
-
Return this . [[highWaterMark]] .
The size getter steps are:
-
Return this 's relevant global object 's byte length queuing strategy size function .
7.3. The CountQueuingStrategy class
A common queuing strategy when dealing with streams of generic objects is to simply count the number of chunks that have been accumulated so far, waiting until this number reaches a specified high-water mark. As such, this strategy is also provided out of the box.
const stream = new ReadableStream({ ... }, new CountQueuingStrategy({ highWaterMark: 10 }));
In this case, 10 chunks (of any kind) can be enqueued by the readable stream’s underlying source before the readable stream implementation starts sending backpressure signals to the underlying source.
const stream = new WritableStream({ ... }, new CountQueuingStrategy({ highWaterMark: 5 }));
In this case, five chunks (of any kind) can be accumulated in the writable stream’s internal queue, waiting for previous writes to the underlying sink to finish, before the writable stream starts sending backpressure signals to any producers .
7.3.1. Interface definition
The Web IDL definition for the CountQueuingStrategy class is given as follows:
[Exposed=*]
interface CountQueuingStrategy {
  constructor(QueuingStrategyInit init);

  readonly attribute unrestricted double highWaterMark;
  readonly attribute Function size;
};
7.3.2. Internal slots
Instances of CountQueuingStrategy have a [[highWaterMark]] internal slot, storing the value given in the constructor.
Additionally, every global object globalObject has an associated count queuing strategy size function, which is a Function whose value must be initialized as follows:
-
Let steps be the following steps:
-
Return 1.
-
-
Let F be ! CreateBuiltinFunction ( steps , 0, "
size
", « », globalObject ’s relevant Realm ). -
Set globalObject ’s count queuing strategy size function to a
Function
that represents a reference to F , with callback context equal to globalObject ’s relevant settings object .
This design is somewhat historical. It is motivated by the desire to ensure that size is a function, not a method, i.e. it does not check its this value. See whatwg/streams#1005 and heycam/webidl#819 for more background.
7.3.3. Constructor and properties
- strategy = new CountQueuingStrategy({ highWaterMark })
- Creates a new CountQueuingStrategy with the provided high water mark. Note that the provided high water mark will not be validated ahead of time. Instead, if it is negative, NaN, or not a number, the resulting CountQueuingStrategy will cause the corresponding stream constructor to throw.
- highWaterMark = strategy.highWaterMark
- Returns the high water mark provided to the constructor.
- strategy.size(chunk)
- Measures the size of chunk by always returning 1. This ensures that the total queue size is a count of the number of chunks in the queue.
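A brief usage sketch; the high water mark and chunk value are illustrative:

```javascript
// CountQueuingStrategy counts chunks: every chunk has size 1.
const strategy = new CountQueuingStrategy({ highWaterMark: 5 });

strategy.size("any chunk at all"); // always 1, regardless of the chunk
```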
The new CountQueuingStrategy(init) constructor steps are:
-
Set this . [[highWaterMark]] to init ["
highWaterMark
"].
The highWaterMark getter steps are:
-
Return this . [[highWaterMark]] .
The size getter steps are:
-
Return this 's relevant global object 's count queuing strategy size function .
7.4. Abstract operations
The following algorithms are used by the stream constructors to extract the relevant pieces from a QueuingStrategy dictionary.
-
If strategy ["
highWaterMark
"] does not exist , return defaultHWM . -
Let highWaterMark be strategy ["
highWaterMark
"]. -
If highWaterMark is NaN or highWaterMark < 0, throw a
RangeError
exception. -
Return highWaterMark .
+∞ is explicitly allowed as a valid high water mark . It causes backpressure to never be applied.
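The steps above can be sketched in JavaScript; the function name is hypothetical (this operation is internal and not exposed anywhere), and Number() stands in for the Web IDL unrestricted double conversion:

```javascript
// Hypothetical sketch of the ExtractHighWaterMark steps.
function extractHighWaterMark(strategy, defaultHWM) {
  if (!("highWaterMark" in strategy)) return defaultHWM;
  const highWaterMark = Number(strategy.highWaterMark); // IDL conversion stand-in
  if (Number.isNaN(highWaterMark) || highWaterMark < 0) {
    throw new RangeError("Invalid highWaterMark");
  }
  return highWaterMark; // +Infinity is allowed: backpressure never applies
}
```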
8. Supporting abstract operations
The following abstract operations each support the implementation of more than one type of stream, and as such are not grouped under the major sections above.
8.1. Queue-with-sizes
The streams in this specification use a "queue-with-sizes" data structure to store queued up values, along with their determined sizes. Various specification objects contain a queue-with-sizes, represented by the object having two paired internal slots, always named [[queue]] and [[queueTotalSize]]. [[queue]] is a list of value-with-sizes, and [[queueTotalSize]] is a JavaScript Number, i.e. a double-precision floating point number.
The following abstract operations are used when operating on objects that contain queues-with-sizes, in order to ensure that the two internal slots stay synchronized.
Due to the limited precision of floating-point arithmetic, the framework specified here, of keeping a running total in the [[queueTotalSize]] slot, is not equivalent to adding up the size of all chunks in [[queue]]. (However, this only makes a difference when there is a huge (~10^15) variance in size between chunks, or when trillions of chunks are enqueued.)
In what follows, a value-with-size is a struct with the two items value and size .
-
Assert: container has [[queue]] and [[queueTotalSize]] internal slots.
-
Assert: container .[[queue]] is not empty .
-
Let valueWithSize be container .[[queue]][0].
-
Remove valueWithSize from container .[[queue]].
-
Set container .[[queueTotalSize]] to container .[[queueTotalSize]] − valueWithSize ’s size .
-
If container .[[queueTotalSize]] < 0, set container .[[queueTotalSize]] to 0. (This can occur due to rounding errors.)
-
Return valueWithSize ’s value .
-
Assert: container has [[queue]] and [[queueTotalSize]] internal slots.
-
If ! IsNonNegativeNumber ( size ) is false, throw a
RangeError
exception. -
If size is +∞, throw a
RangeError
exception. -
Append a new value-with-size with value value and size size to container .[[queue]].
-
Set container .[[queueTotalSize]] to container .[[queueTotalSize]] + size .
-
Assert: container has [[queue]] and [[queueTotalSize]] internal slots.
-
Set container .[[queue]] to a new empty list .
-
Set container .[[queueTotalSize]] to 0.
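The operations above can be sketched as plain JavaScript over an object with queue and queueTotalSize fields; the names are ours, and this is an illustration of the bookkeeping, not how engines expose it:

```javascript
// Illustrative queue-with-sizes: keep queue and queueTotalSize in sync.
function isNonNegativeNumber(v) {
  return typeof v === "number" && !Number.isNaN(v) && v >= 0;
}

function enqueueValueWithSize(container, value, size) {
  if (!isNonNegativeNumber(size) || size === Infinity) {
    throw new RangeError("Size must be a finite, non-negative number");
  }
  container.queue.push({ value, size });
  container.queueTotalSize += size;
}

function dequeueValue(container) {
  const { value, size } = container.queue.shift();
  container.queueTotalSize -= size;
  if (container.queueTotalSize < 0) container.queueTotalSize = 0; // rounding errors
  return value;
}

function resetQueue(container) {
  container.queue = [];
  container.queueTotalSize = 0;
}
```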
8.2. Transferable streams
Transferable streams are implemented using a special kind of identity transform which has the writable side in one realm and the readable side in another realm. The following abstract operations are used to implement these "cross-realm transforms".
-
Perform PackAndPostMessage ( port , "
error
", error ), discarding the result.
As we are already in an errored state when this abstract operation is performed, we cannot handle further errors, so we just discard them.
-
Let message be OrdinaryObjectCreate (null).
-
Perform ! CreateDataProperty ( message , "
type
", type ). -
Perform ! CreateDataProperty ( message , "
value
", value ). -
Let targetPort be the port with which port is entangled, if any; otherwise let it be null.
-
Let options be «[ "
transfer
" → « » ]». -
Run the message port post message steps providing targetPort , message , and options .
A JavaScript object is used for transfer to avoid having to duplicate the message port post message steps. The prototype of the object is set to null to avoid interference from %Object.prototype%.
-
Let result be PackAndPostMessage ( port , type , value ).
-
If result is an abrupt completion,
-
Perform ! CrossRealmTransformSendError ( port , result .[[Value]]).
-
-
Return result as a completion record.
-
Perform ! InitializeReadableStream ( stream ).
-
Let controller be a new
ReadableStreamDefaultController
. -
Add a handler for port ’s
message
event with the following steps:-
Let data be the data of the message.
-
Assert: Type ( data ) is Object.
-
Let type be ! Get ( data , "
type
"). -
Let value be ! Get ( data , "
value
"). -
Assert: Type ( type ) is String.
-
If type is "
chunk
",-
Perform ! ReadableStreamDefaultControllerEnqueue ( controller , value ).
-
-
Otherwise, if type is "
close
",-
Perform ! ReadableStreamDefaultControllerClose ( controller ).
-
Disentangle port .
-
-
Otherwise, if type is "
error
",-
Perform ! ReadableStreamDefaultControllerError ( controller , value ).
-
Disentangle port .
-
-
-
Add a handler for port ’s
messageerror
event with the following steps:-
Let error be a new "
DataCloneError
"DOMException
. -
Perform ! CrossRealmTransformSendError ( port , error ).
-
Perform ! ReadableStreamDefaultControllerError ( controller , error ).
-
Disentangle port .
-
-
Enable port ’s port message queue .
-
Let startAlgorithm be an algorithm that returns undefined.
-
Let pullAlgorithm be the following steps:
-
Perform ! PackAndPostMessage ( port , "
pull
", undefined). -
Return a promise resolved with undefined.
-
-
Let cancelAlgorithm be the following steps, taking a reason argument:
-
Let result be PackAndPostMessageHandlingError ( port , "
error
", reason ). -
Disentangle port .
-
If result is an abrupt completion, return a promise rejected with result .[[Value]].
-
Otherwise, return a promise resolved with undefined.
-
-
Let sizeAlgorithm be an algorithm that returns 1.
-
Perform ! SetUpReadableStreamDefaultController ( stream , controller , startAlgorithm , pullAlgorithm , cancelAlgorithm , 0, sizeAlgorithm ).
Implementations are encouraged to explicitly handle failures from the asserts in this algorithm, as the input might come from an untrusted context. Failure to do so could lead to security issues.
-
Perform ! InitializeWritableStream ( stream ).
-
Let controller be a new
WritableStreamDefaultController
. -
Let backpressurePromise be a new promise .
-
Add a handler for port ’s
message
event with the following steps:-
Let data be the data of the message.
-
Assert: Type ( data ) is Object.
-
Let type be ! Get ( data , "
type
"). -
Let value be ! Get ( data , "
value
"). -
Assert: Type ( type ) is String.
-
If type is "
pull
",-
If backpressurePromise is not undefined,
-
Resolve backpressurePromise with undefined.
-
Set backpressurePromise to undefined.
-
-
-
Otherwise, if type is "
error
",-
Perform ! WritableStreamDefaultControllerErrorIfNeeded ( controller , value ).
-
If backpressurePromise is not undefined,
-
Resolve backpressurePromise with undefined.
-
Set backpressurePromise to undefined.
-
-
-
-
Add a handler for port ’s
messageerror
event with the following steps:-
Let error be a new "
DataCloneError
"DOMException
. -
Perform ! CrossRealmTransformSendError ( port , error ).
-
Perform ! WritableStreamDefaultControllerErrorIfNeeded ( controller , error ).
-
Disentangle port .
-
-
Enable port ’s port message queue .
-
Let startAlgorithm be an algorithm that returns undefined.
-
Let writeAlgorithm be the following steps, taking a chunk argument:
-
If backpressurePromise is undefined, set backpressurePromise to a promise resolved with undefined.
-
Return the result of reacting to backpressurePromise with the following fulfillment steps:
-
Set backpressurePromise to a new promise .
-
Let result be PackAndPostMessageHandlingError ( port , "
chunk
", chunk ). -
If result is an abrupt completion,
-
Disentangle port .
-
Return a promise rejected with result .[[Value]].
-
-
Otherwise, return a promise resolved with undefined.
-
-
-
Let closeAlgorithm be the following steps:
-
Perform ! PackAndPostMessage ( port , "
close
", undefined). -
Disentangle port .
-
Return a promise resolved with undefined.
-
-
Let abortAlgorithm be the following steps, taking a reason argument:
-
Let result be PackAndPostMessageHandlingError ( port , "
error
", reason ). -
Disentangle port .
-
If result is an abrupt completion, return a promise rejected with result .[[Value]].
-
Otherwise, return a promise resolved with undefined.
-
-
Let sizeAlgorithm be an algorithm that returns 1.
-
Perform ! SetUpWritableStreamDefaultController ( stream , controller , startAlgorithm , writeAlgorithm , closeAlgorithm , abortAlgorithm , 1, sizeAlgorithm ).
Implementations are encouraged to explicitly handle failures from the asserts in this algorithm, as the input might come from an untrusted context. Failure to do so could lead to security issues.
8.3. Miscellaneous
The following abstract operations are a grab-bag of utilities.
-
Assert: Type ( O ) is Object.
-
Assert: O has an [[ArrayBufferData]] internal slot.
-
If ! IsDetachedBuffer ( O ) is true, return false.
-
If SameValue ( O .[[ArrayBufferDetachKey]], undefined) is false, return false.
-
Return true.
-
If Type ( v ) is not Number, return false.
-
If v is NaN, return false.
-
If v < 0, return false.
-
Return true.
-
Assert: ! IsDetachedBuffer ( O ) is false.
-
Let arrayBufferData be O .[[ArrayBufferData]].
-
Let arrayBufferByteLength be O .[[ArrayBufferByteLength]].
-
Perform ? DetachArrayBuffer ( O ).
This will throw an exception if O has an [[ArrayBufferDetachKey]] that is not undefined, such as a
WebAssembly.Memory
'sbuffer
. [WASM-JS-API-1] -
Return a new
ArrayBuffer
object, created in the current Realm , whose [[ArrayBufferData]] internal slot value is arrayBufferData and whose [[ArrayBufferByteLength]] internal slot value is arrayBufferByteLength .
-
Assert: Type ( O ) is Object.
-
Assert: O has an [[ViewedArrayBuffer]] internal slot.
-
Assert: ! IsDetachedBuffer ( O .[[ViewedArrayBuffer]]) is false.
-
Let buffer be ? CloneArrayBuffer ( O .[[ViewedArrayBuffer]], O .[[ByteOffset]], O .[[ByteLength]],
%ArrayBuffer%
). -
Let array be ! Construct (
%Uint8Array%
, « buffer »). -
Return array .
-
Let serialized be ? StructuredSerialize ( v ).
-
Return ? StructuredDeserialize ( serialized , the current Realm ).
9. Using streams in other specifications
Much of this standard concerns itself with the internal machinery of streams. Other specifications generally do not need to worry about these details. Instead, they should interface with this standard via the various IDL types it defines, along with the following definitions.
Specifications should not directly inspect or manipulate the various internal slots defined in this standard. Similarly, they should not use the abstract operations defined here. Such direct usage can break invariants that this standard otherwise maintains.
If your specification wants to interface with streams in a way not supported here, file an issue . This section is intended to grow organically as needed.
9.1. Readable streams
9.1.1. Creation and manipulation
To set up a ReadableStream object stream, given an optional algorithm pullAlgorithm, an optional algorithm cancelAlgorithm, an optional number highWaterMark (default 1), and an optional algorithm sizeAlgorithm, perform the following steps. If given, pullAlgorithm and cancelAlgorithm may return a promise. If given, sizeAlgorithm must be an algorithm accepting chunk objects and returning a number; and if given, highWaterMark must be a non-negative, non-NaN number.
-
Let startAlgorithm be an algorithm that returns undefined.
-
Let pullAlgorithmWrapper be an algorithm that runs these steps:
-
Let result be the result of running pullAlgorithm , if pullAlgorithm was given, or null otherwise. If this throws an exception e , return a promise rejected with e .
-
If result is a
Promise
, then return result . -
Return a promise resolved with undefined.
-
-
Let cancelAlgorithmWrapper be an algorithm that runs these steps:
-
Let result be the result of running cancelAlgorithm , if cancelAlgorithm was given, or null otherwise. If this throws an exception e , return a promise rejected with e .
-
If result is a
Promise
, then return result . -
Return a promise resolved with undefined.
-
-
If sizeAlgorithm was not given, then set it to an algorithm that returns 1.
-
Perform ! InitializeReadableStream ( stream ).
-
Let controller be a new
ReadableStreamDefaultController
. -
Perform ! SetUpReadableStreamDefaultController ( stream , controller , startAlgorithm , pullAlgorithmWrapper , cancelAlgorithmWrapper , highWaterMark , sizeAlgorithm ).
To set up with byte reading support a ReadableStream object stream, given an optional algorithm pullAlgorithm, an optional algorithm cancelAlgorithm, and an optional number highWaterMark (default 0), perform the following steps. If given, pullAlgorithm and cancelAlgorithm may return a promise. If given, highWaterMark must be a non-negative, non-NaN number.
-
Let startAlgorithm be an algorithm that returns undefined.
-
Let pullAlgorithmWrapper be an algorithm that runs these steps:
-
Let result be the result of running pullAlgorithm , if pullAlgorithm was given, or null otherwise. If this throws an exception e , return a promise rejected with e .
-
If result is a
Promise
, then return result . -
Return a promise resolved with undefined.
-
-
Let cancelAlgorithmWrapper be an algorithm that runs these steps:
-
Let result be the result of running cancelAlgorithm , if cancelAlgorithm was given, or null otherwise. If this throws an exception e , return a promise rejected with e .
-
If result is a
Promise
, then return result . -
Return a promise resolved with undefined.
-
-
Perform ! InitializeReadableStream ( stream ).
-
Let controller be a new
ReadableByteStreamController
. -
Perform ! SetUpReadableByteStreamController ( stream , controller , startAlgorithm , pullAlgorithmWrapper , cancelAlgorithmWrapper , highWaterMark , undefined).
Creating a ReadableStream from other specifications is thus a two-step process, like so:
-
Let readableStream be a new
ReadableStream
. -
Set up readableStream given….
Subclasses of ReadableStream will use the set up or set up with byte reading support operations directly on the this value inside their constructor steps.
The following algorithms must only be used on ReadableStream instances initialized via the above set up or set up with byte reading support algorithms (not, e.g., on web-developer-created instances):
A ReadableStream stream's desired size to fill up to the high water mark is the result of running the following steps:
-
If stream is not readable , then return 0.
-
If stream . [[controller]] implements
ReadableByteStreamController
, then return ! ReadableByteStreamControllerGetDesiredSize ( stream . [[controller]] ). -
Return ! ReadableStreamDefaultControllerGetDesiredSize ( stream . [[controller]] ).
A ReadableStream needs more data if its desired size to fill up to the high water mark is greater than zero.
To close a ReadableStream stream:
-
If stream . [[controller]] implements
ReadableByteStreamController
,-
Perform ! ReadableByteStreamControllerClose ( stream . [[controller]] ).
-
If stream . [[controller]] . [[pendingPullIntos]] is not empty , perform ! ReadableByteStreamControllerRespond ( stream . [[controller]] , 0).
-
-
Otherwise, perform ! ReadableStreamDefaultControllerClose ( stream . [[controller]] ).
To error a ReadableStream stream given a JavaScript value e:
-
If stream . [[controller]] implements
ReadableByteStreamController
, then perform ! ReadableByteStreamControllerError ( stream . [[controller]] , e ). -
Otherwise, perform ! ReadableStreamDefaultControllerError ( stream . [[controller]] , e ).
To enqueue the JavaScript value chunk into a ReadableStream stream:
-
If stream . [[controller]] implements
ReadableStreamDefaultController
,-
Perform ! ReadableStreamDefaultControllerEnqueue ( stream . [[controller]] , chunk ).
-
-
Otherwise,
-
Assert: stream . [[controller]] implements
ReadableByteStreamController
. -
Assert: chunk is an
ArrayBufferView
. -
Let byobView be the current BYOB request view for stream .
-
If byobView is non-null, and chunk .[[ViewedArrayBuffer]] is byobView .[[ViewedArrayBuffer]], then:
-
Assert: chunk .[[ByteOffset]] is byobView .[[ByteOffset]].
-
Assert: chunk .[[ByteLength]] ≤ byobView .[[ByteLength]].
These asserts ensure that the caller does not write outside the requested range in the current BYOB request view .
-
Perform ? ReadableByteStreamControllerRespond ( stream . [[controller]] , chunk .[[ByteLength]]).
-
-
Otherwise, perform ? ReadableByteStreamControllerEnqueue ( stream . [[controller]] , chunk ).
-
The following algorithms must only be used on ReadableStream instances initialized via the above set up with byte reading support algorithm:
The current BYOB request view for a ReadableStream stream is either an ArrayBufferView or null, determined by the following steps:
-
Assert: stream . [[controller]] implements
ReadableByteStreamController
. -
Let byobRequest be ! ReadableByteStreamControllerGetBYOBRequest ( stream . [[controller]] ).
-
If byobRequest is null, then return null.
-
Return byobRequest . [[view]] .
Specifications must not transfer or detach the underlying buffer of the current BYOB request view .
Implementations could do something equivalent to transferring, e.g. if they want to write into the memory from another thread. But they would need to make a few adjustments to how they implement the enqueue and close algorithms to keep the same observable consequences. In specification-land, transferring and detaching is just disallowed.
Specifications should, when possible, write into the current BYOB request view when it is non-null, and then call enqueue with that view. They should only create a new ArrayBufferView to pass to enqueue when the current BYOB request view is null, or when they have more bytes on hand than the current BYOB request view's byte length. This avoids unnecessary copies and better respects the wishes of the stream's consumer.
The following pull from bytes algorithm implements these requirements, for the common case where bytes are derived from a byte sequence that serves as the specification-level representation of an underlying byte source . Note that it is conservative and leaves bytes in the byte sequence , instead of aggressively enqueueing them, so callers of this algorithm might want to use the number of remaining bytes as a backpressure signal.
To pull from bytes a byte sequence bytes into a ReadableStream stream:
-
Assert: stream.[[controller]] implements ReadableByteStreamController.
-
Let available be bytes ’s length .
-
Let desiredSize be available .
-
If stream ’s current BYOB request view is non-null, then set desiredSize to stream ’s current BYOB request view 's byte length .
-
Let pullSize be the smaller value of available and desiredSize .
-
Let pulled be the first pullSize bytes of bytes .
-
Remove the first pullSize bytes from bytes .
-
If stream ’s current BYOB request view is non-null, then:
-
Write pulled into stream ’s current BYOB request view .
-
Perform ? ReadableByteStreamControllerRespond ( stream . [[controller]] , pullSize ).
-
-
Otherwise,
-
Let view be the result of creating a Uint8Array from pulled in stream’s relevant Realm.
-
Perform ? ReadableByteStreamControllerEnqueue ( stream . [[controller]] , view ).
-
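As a non-normative sketch, the pull from bytes steps above map onto a JavaScript underlying byte source like the following. The spec-level byte sequence is modeled as a Uint8Array, and streamFromBytes and its buffered argument are hypothetical names used only for illustration.

```javascript
// Non-normative sketch of the "pull from bytes" steps as a JavaScript
// underlying byte source. The spec-level byte sequence is modeled as a
// Uint8Array; streamFromBytes and buffered are hypothetical names.
function streamFromBytes(buffered) {
  return new ReadableStream({
    type: "bytes",
    pull(controller) {
      const byobView = controller.byobRequest ? controller.byobRequest.view : null;
      const available = buffered.length;
      // desiredSize is the BYOB view's byte length when a view is present.
      const desiredSize = byobView ? byobView.byteLength : available;
      const pullSize = Math.min(available, desiredSize);
      const pulled = buffered.subarray(0, pullSize);
      buffered = buffered.subarray(pullSize);

      if (byobView) {
        // Write pulled into the current BYOB request view, then respond.
        new Uint8Array(byobView.buffer, byobView.byteOffset, pullSize).set(pulled);
        controller.byobRequest.respond(pullSize);
      } else {
        // No BYOB request view: enqueue a fresh copy instead.
        controller.enqueue(pulled.slice());
      }

      if (buffered.length === 0) {
        controller.close();
      }
    }
  });
}
```

As in the spec algorithm, this is conservative: it pulls only min(available, desiredSize) bytes per call, leaving the remainder buffered.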
Specifications must not write into the current BYOB request view or pull from bytes after closing the corresponding ReadableStream.
9.1.2. Reading
The following algorithms can be used on arbitrary ReadableStream instances, including ones that are created by web developers. They can all fail in various operation-specific ways, and these failures should be handled by the calling specification.
To get a reader for a ReadableStream stream, return ? AcquireReadableStreamDefaultReader(stream). The result will be a ReadableStreamDefaultReader.
This will throw an exception if stream is already locked .
To read a chunk from a ReadableStreamDefaultReader reader, given a read request readRequest, perform ! ReadableStreamDefaultReaderRead(reader, readRequest).
To read all bytes from a ReadableStreamDefaultReader reader, given successSteps, which is an algorithm accepting a byte sequence, and failureSteps, which is an algorithm accepting a JavaScript value: read-loop given reader, a new byte sequence, successSteps, and failureSteps.
For the purposes of the above algorithm, to read-loop given reader, bytes, successSteps, and failureSteps:
-
Let readRequest be a new read request with the following items :
- chunk steps , given chunk
-
-
If chunk is not a Uint8Array object, call failureSteps with a TypeError and abort these steps.
-
Append the bytes represented by chunk to bytes .
-
Read-loop given reader , bytes , successSteps , and failureSteps .
This recursion could potentially cause a stack overflow if implemented directly. Implementations will need to mitigate this, e.g. by using a non-recursive variant of this algorithm, or queuing a microtask , or using a more direct method of byte-reading as noted below.
-
- close steps
-
-
Call successSteps with bytes .
-
- error steps , given e
-
-
Call failureSteps with e .
-
-
Perform ! ReadableStreamDefaultReaderRead ( reader , readRequest ).
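As a non-normative sketch, the read all bytes algorithm corresponds to the following JavaScript helper, written as an iterative loop (one of the mitigations for the recursion concern noted above); readAllBytes is a hypothetical name.

```javascript
// Non-normative sketch of the "read all bytes" algorithm, using the
// public reader API. Mirrors the TypeError from the chunk steps when a
// chunk is not a Uint8Array. readAllBytes is a hypothetical helper name.
async function readAllBytes(reader) {
  const chunks = [];
  let total = 0;
  for (;;) {
    const { value, done } = await reader.read();
    if (done) break;
    if (!(value instanceof Uint8Array)) {
      throw new TypeError("Expected all chunks to be Uint8Array objects");
    }
    chunks.push(value);
    total += value.byteLength;
  }

  // Concatenate into the single byte sequence handed to successSteps.
  const bytes = new Uint8Array(total);
  let offset = 0;
  for (const chunk of chunks) {
    bytes.set(chunk, offset);
    offset += chunk.byteLength;
  }
  return bytes;
}
```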
Because reader grants exclusive access to its corresponding ReadableStream, the actual mechanism of how to read cannot be observed. Implementations could use a more direct mechanism if convenient, such as acquiring and using a ReadableStreamBYOBReader instead of a ReadableStreamDefaultReader, or accessing the chunks directly.
To release a ReadableStreamDefaultReader reader, perform ! ReadableStreamDefaultReaderRelease(reader).
To cancel a ReadableStreamDefaultReader reader with reason, perform ! ReadableStreamReaderGenericCancel(reader, reason). The return value will be a promise that either fulfills with undefined, or rejects with a failure reason.
To cancel a ReadableStream stream with reason, return ! ReadableStreamCancel(stream, reason). The return value will be a promise that either fulfills with undefined, or rejects with a failure reason.
To tee a ReadableStream stream, return ? ReadableStreamTee(stream, true).
Because we pass true as the second argument to ReadableStreamTee , the second branch returned will have its chunks cloned (using HTML’s serializable objects framework) from those of the first branch. This prevents consumption of one of the branches from interfering with the other.
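For comparison, the developer-facing tee() method corresponds to passing false as the second argument to ReadableStreamTee, so its two branches share chunks rather than cloning them. This non-normative sketch shows basic tee usage from JavaScript:

```javascript
// Teeing a stream from JavaScript. Unlike the specification-level "tee"
// operation above, the developer-facing tee() does not clone chunks for
// the second branch; both branches see the same chunk objects.
const rs = new ReadableStream({
  start(controller) {
    controller.enqueue("a");
    controller.enqueue("b");
    controller.close();
  }
});

const [branch1, branch2] = rs.tee();
// branch1 and branch2 can now be consumed independently; rs is locked.
```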
9.1.3. Introspection
The following predicates can be used on arbitrary ReadableStream objects. However, note that apart from checking whether or not the stream is locked, this direct introspection is not possible via the public JavaScript API, and so specifications should instead use the algorithms in § 9.1.2 Reading. (For example, instead of testing if the stream is readable, attempt to get a reader and handle any exception.)
A ReadableStream stream is readable if stream.[[state]] is "readable".
A ReadableStream stream is closed if stream.[[state]] is "closed".
A ReadableStream stream is errored if stream.[[state]] is "errored".
A ReadableStream stream is locked if ! IsReadableStreamLocked(stream) returns true.
A ReadableStream stream is disturbed if stream.[[disturbed]] is true.
This indicates whether the stream has ever been read from or canceled. Even more so than other predicates in this section, it is best consulted sparingly, since this is not information web developers have access to even indirectly. As such, branching platform behavior on it is undesirable.
9.2. Writable streams
9.2.1. Creation and manipulation
To set up a WritableStream object stream, given an algorithm writeAlgorithm, an optional algorithm closeAlgorithm, an optional algorithm abortAlgorithm, an optional number highWaterMark (default 1), and an optional algorithm sizeAlgorithm, perform the following steps. writeAlgorithm must be an algorithm that accepts a chunk object and returns a promise. If given, closeAlgorithm and abortAlgorithm may return a promise. If given, sizeAlgorithm must be an algorithm accepting chunk objects and returning a number; and if given, highWaterMark must be a non-negative, non-NaN number.
-
Let startAlgorithm be an algorithm that returns undefined.
-
Let closeAlgorithmWrapper be an algorithm that runs these steps:
-
Let result be the result of running closeAlgorithm , if closeAlgorithm was given, or null otherwise. If this throws an exception e , return a promise rejected with e .
-
If result is a Promise, then return result.
-
Return a promise resolved with undefined.
-
-
Let abortAlgorithmWrapper be an algorithm that runs these steps:
-
Let result be the result of running abortAlgorithm , if abortAlgorithm was given, or null otherwise. If this throws an exception e , return a promise rejected with e .
-
If result is a Promise, then return result.
-
Return a promise resolved with undefined.
-
-
If sizeAlgorithm was not given, then set it to an algorithm that returns 1.
-
Perform ! InitializeWritableStream ( stream ).
-
Let controller be a new WritableStreamDefaultController.
-
Perform ! SetUpWritableStreamDefaultController ( stream , controller , startAlgorithm , writeAlgorithm , closeAlgorithmWrapper , abortAlgorithmWrapper , highWaterMark , sizeAlgorithm ).
Other specifications should be careful when constructing their writeAlgorithm to avoid in parallel reads from the given chunk, as such reads can violate the run-to-completion semantics of JavaScript. To avoid this, they can make a synchronous copy or transfer of the given value, using operations such as StructuredSerializeWithTransfer, getting a copy of the bytes held by the buffer source, or transferring an ArrayBuffer. An exception is when the chunk is a SharedArrayBuffer, for which it is understood that parallel mutations are a fact of life.
Creating a WritableStream from other specifications is thus a two-step process, like so:
-
Let writableStream be a new WritableStream.
-
Set up writableStream given….
Subclasses of WritableStream will use the set up operation directly on the this value inside their constructor steps.
The following definitions must only be used on WritableStream instances initialized via the above set up algorithm:
To error a WritableStream stream given a JavaScript value e, perform ! WritableStreamDefaultControllerErrorIfNeeded(stream.[[controller]], e).
The signal of a WritableStream stream is stream.[[controller]].[[abortController]]'s signal.
Specifications can add or remove algorithms to this AbortSignal, or consult whether it is aborted and its abort reason. The usual usage is, after setting up the WritableStream, add an algorithm to its signal, which aborts any ongoing write operation to the underlying sink. Then, inside the writeAlgorithm, once the underlying sink has responded, check if the signal is aborted, and reject the returned promise with the signal’s abort reason if so.
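The pattern just described can be sketched, non-normatively, with the developer-facing API: the sink's write() consults the controller's signal after the underlying sink has responded, and rejects with the abort reason if aborted. Here sendToSink is a hypothetical function standing in for the underlying sink.

```javascript
// Non-normative sketch of the abort-signal usage pattern described above.
// sendToSink is a hypothetical stand-in for the underlying sink operation.
function makeAbortAwareWritable(sendToSink) {
  return new WritableStream({
    async write(chunk, controller) {
      await sendToSink(chunk);
      // Once the underlying sink has responded, check whether the stream
      // was aborted in the meantime, and reject with the abort reason if so.
      // (Guarded with optional chaining in case controller.signal is absent.)
      if (controller.signal?.aborted) {
        throw controller.signal.reason;
      }
    }
  });
}
```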
9.2.2. Writing
The following algorithms can be used on arbitrary WritableStream instances, including ones that are created by web developers. They can all fail in various operation-specific ways, and these failures should be handled by the calling specification.
To get a writer for a WritableStream stream, return ? AcquireWritableStreamDefaultWriter(stream). The result will be a WritableStreamDefaultWriter.
This will throw an exception if stream is already locked.
To write a chunk to a WritableStreamDefaultWriter writer, given a value chunk, return ! WritableStreamDefaultWriterWrite(writer, chunk).
To release a WritableStreamDefaultWriter writer, perform ! WritableStreamDefaultWriterRelease(writer).
To close a WritableStream stream, return ! WritableStreamClose(stream). The return value will be a promise that either fulfills with undefined, or rejects with a failure reason.
To abort a WritableStream stream with reason, return ! WritableStreamAbort(stream, reason). The return value will be a promise that either fulfills with undefined, or rejects with a failure reason.
9.3. Transform streams
9.3.1. Creation and manipulation
To set up a TransformStream stream given an algorithm transformAlgorithm, an optional algorithm flushAlgorithm, and an optional algorithm cancelAlgorithm, perform the following steps. transformAlgorithm and, if given, flushAlgorithm and cancelAlgorithm, may return a promise.
-
Let writableHighWaterMark be 1.
-
Let writableSizeAlgorithm be an algorithm that returns 1.
-
Let readableHighWaterMark be 0.
-
Let readableSizeAlgorithm be an algorithm that returns 1.
-
Let transformAlgorithmWrapper be an algorithm that runs these steps given a value chunk :
-
Let result be the result of running transformAlgorithm given chunk . If this throws an exception e , return a promise rejected with e .
-
If result is a Promise, then return result.
-
Return a promise resolved with undefined.
-
-
Let flushAlgorithmWrapper be an algorithm that runs these steps:
-
Let result be the result of running flushAlgorithm , if flushAlgorithm was given, or null otherwise. If this throws an exception e , return a promise rejected with e .
-
If result is a Promise, then return result.
-
Return a promise resolved with undefined.
-
-
Let cancelAlgorithmWrapper be an algorithm that runs these steps given a value reason :
-
Let result be the result of running cancelAlgorithm given reason , if cancelAlgorithm was given, or null otherwise. If this throws an exception e , return a promise rejected with e .
-
If result is a Promise, then return result.
-
Return a promise resolved with undefined.
-
-
Let startPromise be a promise resolved with undefined.
-
Perform ! InitializeTransformStream ( stream , startPromise , writableHighWaterMark , writableSizeAlgorithm , readableHighWaterMark , readableSizeAlgorithm ).
-
Let controller be a new TransformStreamDefaultController.
-
Perform ! SetUpTransformStreamDefaultController ( stream , controller , transformAlgorithmWrapper , flushAlgorithmWrapper , cancelAlgorithmWrapper ).
Other specifications should be careful when constructing their transformAlgorithm to avoid in parallel reads from the given chunk, as such reads can violate the run-to-completion semantics of JavaScript. To avoid this, they can make a synchronous copy or transfer of the given value, using operations such as StructuredSerializeWithTransfer, getting a copy of the bytes held by the buffer source, or transferring an ArrayBuffer. An exception is when the chunk is a SharedArrayBuffer, for which it is understood that parallel mutations are a fact of life.
Creating a TransformStream from other specifications is thus a two-step process, like so:
-
Let transformStream be a new TransformStream.
-
Set up transformStream given….
Subclasses of TransformStream will use the set up operation directly on the this value inside their constructor steps.
To create an identity TransformStream:
-
Let transformStream be a new TransformStream.
-
Set up transformStream with transformAlgorithm set to an algorithm which, given chunk , enqueues chunk in transformStream .
-
Return transformStream .
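Non-normatively, an identity transform stream behaves like the following JavaScript: every chunk written to the writable side appears unchanged on the readable side. (Constructing new TransformStream() with no transformer argument behaves the same way.)

```javascript
// An identity transform stream: chunks pass through unchanged. This is
// a non-normative JavaScript analogue of the spec operation above.
const identity = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunk);
  }
});
```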
The following algorithms must only be used on TransformStream instances initialized via the above set up algorithm. Usually they are called as part of transformAlgorithm or flushAlgorithm.
To enqueue the JavaScript value chunk into a TransformStream stream, perform ! TransformStreamDefaultControllerEnqueue(stream.[[controller]], chunk).
To terminate a TransformStream stream, perform ! TransformStreamDefaultControllerTerminate(stream.[[controller]]).
To error a TransformStream stream given a JavaScript value e, perform ! TransformStreamDefaultControllerError(stream.[[controller]], e).
9.3.2. Wrapping into a custom class
Other specifications which mean to define custom transform streams might not want to subclass from the TransformStream interface directly. Instead, if they need a new class, they can create their own independent Web IDL interfaces, and use the following mixin:
interface mixin GenericTransformStream {
  readonly attribute ReadableStream readable;
  readonly attribute WritableStream writable;
};
Any platform object that includes the GenericTransformStream mixin has an associated transform, which is an actual TransformStream.
The readable getter steps are to return this's transform.[[readable]].
The writable getter steps are to return this's transform.[[writable]].
Including the GenericTransformStream mixin will give an IDL interface the appropriate readable and writable properties. To customize the behavior of the resulting interface, its constructor (or other initialization code) must set each instance’s transform to a new TransformStream, and then set it up with appropriate customizations via the transformAlgorithm and optionally flushAlgorithm arguments.
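The shape of this pattern can be sketched, non-normatively, in plain JavaScript: a class whose readable and writable getters forward to an internal TransformStream. The UpperCaseStream class and its behavior here are hypothetical, chosen only to illustrate the wrapping.

```javascript
// Non-normative sketch of the GenericTransformStream pattern: a
// hypothetical UpperCaseStream class whose readable/writable getters
// forward to an internal TransformStream, mirroring how classes like
// CompressionStream and TextDecoderStream are structured.
class UpperCaseStream {
  #transform = new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(String(chunk).toUpperCase());
    }
  });

  get readable() { return this.#transform.readable; }
  get writable() { return this.#transform.writable; }
}
```

Because instances obey the readable/writable property contract, they work with pipeThrough() like any other transform stream.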
Note: Existing examples of this pattern on the web platform include CompressionStream and TextDecoderStream. [COMPRESSION] [ENCODING]
There’s no need to create a wrapper class if you don’t need any API beyond what the base TransformStream class provides. The most common driver for such a wrapper is needing custom constructor steps, but if your conceptual transform stream isn’t meant to be constructed, then using TransformStream directly is fine.
9.4. Other stream pairs
Apart from transform streams , discussed above, specifications often create pairs of readable and writable streams. This section gives some guidance for such situations.
In all such cases, specifications should use the names readable and writable for the two properties exposing the streams in question. They should not use other names (such as input/output or readableStream/writableStream), and they should not use methods or other non-property means of access to the streams.
9.4.1. Duplex streams
The most common readable/writable pair is a duplex stream , where the readable and writable streams represent two sides of a single shared resource, such as a socket, connection, or device.
The trickiest thing to consider when specifying duplex streams is how to handle operations like canceling the readable side, or closing or aborting the writable side. It might make sense to leave duplex streams "half open", with such operations on one side not impacting the other side. Or it might be best to carry over their effects to the other side, e.g. by specifying that your readable side’s cancelAlgorithm will close the writable side.
A basic example of a duplex stream, created through JavaScript instead of through specification prose, is found in § 10.8 A { readable, writable } stream pair wrapping the same underlying resource . It illustrates this carry-over behavior.
Another consideration is how to handle the creation of duplex streams which need to be acquired asynchronously, e.g. via establishing a connection. The preferred pattern here is to have a constructible class with a promise-returning property that fulfills with the actual duplex stream object. That duplex stream object can also then expose any information that is only available asynchronously, e.g. connection data. The container class can then provide convenience APIs, such as a function to close the entire connection instead of only closing individual sides.
An example of this more complex type of duplex stream is the still-being-specified WebSocketStream. See its explainer and design notes.
Because duplex streams obey the readable/writable property contract, they can be used with pipeThrough(). This doesn’t always make sense, but it could in cases where the underlying resource is in fact performing some sort of transformation.
For an arbitrary WebSocket, piping through a WebSocket-derived duplex stream doesn’t make sense. However, if the WebSocket server is specifically written so that it responds to incoming messages by sending the same data back in some transformed form, then this could be useful and convenient.
9.4.2. Endpoint pairs
Another type of readable/writable pair is an endpoint pair . In these cases the readable and writable streams represent the two ends of a longer pipeline, with the intention that web developer code insert transform streams into the middle of them.
Given a function createEndpointPair(), web developers would write code like so:
const { readable, writable } = createEndpointPair();
readable.pipeThrough(new TransformStream(...)).pipeTo(writable);
WebRTC Insertable Media using Streams is an example of this technique, with its sender.createEncodedStreams() and receiver.createEncodedStreams() methods.
Despite such endpoint pairs obeying the readable/writable property contract, it never makes sense to pass them to pipeThrough().
9.5. Piping
The result of a ReadableStream readable piped to a WritableStream writable, given an optional boolean preventClose (default false), an optional boolean preventAbort (default false), an optional boolean preventCancel (default false), and an optional AbortSignal signal, is given by performing the following steps. They will return a Promise that fulfills when the pipe completes, or rejects with an exception if it fails.
-
Assert: ! IsReadableStreamLocked ( readable ) is false.
-
Assert: ! IsWritableStreamLocked ( writable ) is false.
-
Let signalArg be signal if signal was given, or undefined otherwise.
-
Return ! ReadableStreamPipeTo ( readable , writable , preventClose , preventAbort , preventCancel , signalArg ).
If one doesn’t care about the promise returned, referencing this concept can be a bit awkward. The best we can suggest is " pipe readable to writable ".
The result of a ReadableStream readable piped through a TransformStream transform, given an optional boolean preventClose (default false), an optional boolean preventAbort (default false), an optional boolean preventCancel (default false), and an optional AbortSignal signal, is given by performing the following steps. The result will be the readable side of transform.
-
Assert: ! IsReadableStreamLocked ( readable ) is false.
-
Assert: ! IsWritableStreamLocked ( transform . [[writable]] ) is false.
-
Let signalArg be signal if signal was given, or undefined otherwise.
-
Let promise be ! ReadableStreamPipeTo ( readable , transform . [[writable]] , preventClose , preventAbort , preventCancel , signalArg ).
-
Set promise .[[PromiseIsHandled]] to true.
-
Return transform . [[readable]] .
To create a proxy for a ReadableStream stream, perform the following steps. The result will be a new ReadableStream object which pulls its data from stream, while stream itself becomes immediately locked and disturbed.
-
Let identityTransform be the result of creating an identity TransformStream.
-
Return the result of stream piped through identityTransform .
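The steps above have a direct, non-normative analogue in the developer-facing API: piping through an identity TransformStream yields a new stream that pulls from the original, leaving the original locked and disturbed. createProxy is a hypothetical helper name.

```javascript
// Non-normative analogue of the "create a proxy" steps: pipe through an
// identity TransformStream. The original stream becomes locked (and
// disturbed) immediately; the returned stream pulls its data from it.
function createProxy(stream) {
  return stream.pipeThrough(new TransformStream());
}
```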
10. Examples of creating streams
This section, and all its subsections, are non-normative.
The previous examples throughout the standard have focused on how to use streams. Here we show how to create a stream, using the ReadableStream, WritableStream, and TransformStream constructors.
10.1. A readable stream with an underlying push source (no backpressure support)
The following function creates readable streams that wrap WebSocket instances [WEBSOCKETS], which are push sources that do not support backpressure signals. It illustrates how, when adapting a push source, usually most of the work happens in the start() method.
function makeReadableWebSocketStream(url, protocols) {
  const ws = new WebSocket(url, protocols);
  ws.binaryType = "arraybuffer";

  return new ReadableStream({
    start(controller) {
      ws.onmessage = event => controller.enqueue(event.data);
      ws.onclose = () => controller.close();
      ws.onerror = () => controller.error(new Error("The WebSocket errored!"));
    },
    cancel() {
      ws.close();
    }
  });
}
We can then use this function to create readable streams for a web socket, and pipe that stream to an arbitrary writable stream:
const webSocketStream = makeReadableWebSocketStream("wss://example.com:443/", "protocol");

webSocketStream.pipeTo(writableStream)
  .then(() => console.log("All data successfully written!"))
  .catch(e => console.error("Something went wrong!", e));
However, often when people talk about "adding streams support to web sockets", they are hoping instead for a new capability to send an individual web socket message in a streaming fashion, so that e.g. a file could be transferred in a single message without holding all of its contents in memory on the client side. To accomplish this goal, we’d instead want to allow individual web socket messages to themselves be ReadableStream instances. That isn’t what we show in the above example.
For more background, see this discussion .
10.2. A readable stream with an underlying push source and backpressure support
The following function returns readable streams that wrap "backpressure sockets," which are hypothetical objects that have the same API as web sockets, but also provide the ability to pause and resume the flow of data with their readStop and readStart methods. In doing so, this example shows how to apply backpressure to underlying sources that support it.
function makeReadableBackpressureSocketStream(host, port) {
  const socket = createBackpressureSocket(host, port);

  return new ReadableStream({
    start(controller) {
      socket.ondata = event => {
        controller.enqueue(event.data);

        if (controller.desiredSize <= 0) {
          // The internal queue is full, so propagate
          // the backpressure signal to the underlying source.
          socket.readStop();
        }
      };

      socket.onend = () => controller.close();
      socket.onerror = () => controller.error(new Error("The socket errored!"));
    },
    pull() {
      // This is called if the internal queue has been emptied, but the
      // stream's consumer still wants more data. In that case, restart
      // the flow of data if we have previously paused it.
      socket.readStart();
    },
    cancel() {
      socket.close();
    }
  });
}
We can then use this function to create readable streams for such "backpressure sockets" in the same way we do for web sockets. This time, however, when we pipe to a destination that cannot accept data as fast as the socket is producing it, or if we leave the stream alone without reading from it for some time, a backpressure signal will be sent to the socket.
10.3. A readable byte stream with an underlying push source (no backpressure support)
The following function returns readable byte streams that wrap a hypothetical UDP socket API, including a promise-returning select2() method that is meant to be evocative of the POSIX select(2) system call. Since the UDP protocol does not have any built-in backpressure support, the backpressure signal given by desiredSize is ignored, and the stream ensures that when data is available from the socket but not yet requested by the developer, it is enqueued in the stream’s internal queue, to avoid overflow of the kernel-space queue and a consequent loss of data.
This has some interesting consequences for how consumers interact with the stream. If the consumer does not read data as fast as the socket produces it, the chunks will remain in the stream’s internal queue indefinitely. In this case, using a BYOB reader will cause an extra copy, to move the data from the stream’s internal queue to the developer-supplied buffer. However, if the consumer consumes the data quickly enough, a BYOB reader will allow zero-copy reading directly into developer-supplied buffers.
(You can imagine a more complex version of this example which uses desiredSize to inform an out-of-band backpressure signaling mechanism, for example by sending a message down the socket to adjust the rate of data being sent. That is left as an exercise for the reader.)
const DEFAULT_CHUNK_SIZE = 65536;

function makeUDPSocketStream(host, port) {
  const socket = createUDPSocket(host, port);

  return new ReadableStream({
    type: "bytes",
    start(controller) {
      readRepeatedly().catch(e => controller.error(e));

      function readRepeatedly() {
        return socket.select2().then(() => {
          // Since the socket can become readable even when there’s
          // no pending BYOB requests, we need to handle both cases.
          let bytesRead;
          if (controller.byobRequest) {
            const v = controller.byobRequest.view;
            bytesRead = socket.readInto(v.buffer, v.byteOffset, v.byteLength);
            if (bytesRead === 0) {
              controller.close();
            }
            controller.byobRequest.respond(bytesRead);
          } else {
            const buffer = new ArrayBuffer(DEFAULT_CHUNK_SIZE);
            bytesRead = socket.readInto(buffer, 0, DEFAULT_CHUNK_SIZE);
            if (bytesRead === 0) {
              controller.close();
            } else {
              controller.enqueue(new Uint8Array(buffer, 0, bytesRead));
            }
          }

          if (bytesRead === 0) {
            return;
          }

          return readRepeatedly();
        });
      }
    },
    cancel() {
      socket.close();
    }
  });
}
ReadableStream instances returned from this function can now vend BYOB readers, with all of the aforementioned benefits and caveats.
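On the consumer side, a BYOB reader is acquired with getReader({ mode: "byob" }) and handed a developer-supplied buffer to fill. The following non-normative sketch uses a simple in-memory byte stream as a stand-in for the socket-backed streams above; byobReadOnce is a hypothetical helper name.

```javascript
// Consuming a readable byte stream with a BYOB reader: the chunk is
// written directly into a buffer we supply. The byteStream here is a
// stand-in for the byte streams created by the examples above.
const byteStream = new ReadableStream({
  type: "bytes",
  start(controller) {
    controller.enqueue(new Uint8Array([10, 20, 30]));
    controller.close();
  }
});

async function byobReadOnce(stream) {
  const reader = stream.getReader({ mode: "byob" });
  // Hand the stream our own 3-byte buffer; the read fulfills with a view
  // onto (a transferred version of) that buffer, filled with data.
  const { value } = await reader.read(new Uint8Array(3));
  reader.releaseLock();
  return value;
}
```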
10.4. A readable stream with an underlying pull source
The following function returns readable streams that wrap portions of the Node.js file system API (which themselves map fairly directly to C’s fopen, fread, and fclose trio). Files are a typical example of pull sources. Note how in contrast to the examples with push sources, most of the work here happens on-demand in the pull() function, and not at startup time in the start() function.
const fs = require("fs").promises;
const CHUNK_SIZE = 1024;

function makeReadableFileStream(filename) {
  let fileHandle;
  let position = 0;

  return new ReadableStream({
    async start() {
      fileHandle = await fs.open(filename, "r");
    },
    async pull(controller) {
      const buffer = new Uint8Array(CHUNK_SIZE);

      const { bytesRead } = await fileHandle.read(buffer, 0, CHUNK_SIZE, position);
      if (bytesRead === 0) {
        await fileHandle.close();
        controller.close();
      } else {
        position += bytesRead;
        controller.enqueue(buffer.subarray(0, bytesRead));
      }
    },
    cancel() {
      return fileHandle.close();
    }
  });
}
We can then create and use readable streams for files just as we could before for sockets.
10.5. A readable byte stream with an underlying pull source
The following function returns readable byte streams that allow efficient zero-copy reading of files, again using the Node.js file system API . Instead of using a predetermined chunk size of 1024, it attempts to fill the developer-supplied buffer, allowing full control.
const fs = require("fs").promises;
const DEFAULT_CHUNK_SIZE = 1024;

function makeReadableByteFileStream(filename) {
  let fileHandle;
  let position = 0;

  return new ReadableStream({
    type: "bytes",
    async start() {
      fileHandle = await fs.open(filename, "r");
    },
    async pull(controller) {
      // Even when the consumer is using the default reader, the auto-allocation
      // feature allocates a buffer and passes it to us via byobRequest.
      const v = controller.byobRequest.view;

      const { bytesRead } = await fileHandle.read(v, 0, v.byteLength, position);
      if (bytesRead === 0) {
        await fileHandle.close();
        controller.close();
        controller.byobRequest.respond(0);
      } else {
        position += bytesRead;
        controller.byobRequest.respond(bytesRead);
      }
    },
    cancel() {
      return fileHandle.close();
    },
    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE
  });
}
With this in hand, we can create and use BYOB readers for the returned ReadableStream. But we can also create default readers, using them in the same simple and generic manner as usual. The adaptation between the low-level byte tracking of the underlying byte source shown here, and the higher-level chunk-based consumption of a default reader, is all taken care of automatically by the streams implementation. The auto-allocation feature, via the autoAllocateChunkSize option, even allows us to write less code, compared to the manual branching in § 10.3 A readable byte stream with an underlying push source (no backpressure support).
10.6. A writable stream with no backpressure or success signals
The following function returns a writable stream that wraps a WebSocket [WEBSOCKETS]. Web sockets do not provide any way to tell when a given chunk of data has been successfully sent (without awkward polling of bufferedAmount, which we leave as an exercise to the reader). As such, this writable stream has no ability to communicate accurate backpressure signals or write success/failure to its producers. That is, the promises returned by its writer's write() method and ready getter will always fulfill immediately.
function makeWritableWebSocketStream(url, protocols) {
  const ws = new WebSocket(url, protocols);

  return new WritableStream({
    start(controller) {
      ws.onerror = () => {
        controller.error(new Error("The WebSocket errored!"));
        ws.onclose = null;
      };
      ws.onclose = () => controller.error(
        new Error("The server closed the connection unexpectedly!"));
      return new Promise(resolve => ws.onopen = resolve);
    },
    write(chunk) {
      ws.send(chunk);
      // Return immediately, since the web socket gives us no easy way to tell
      // when the write completes.
    },
    close() {
      return closeWS(1000);
    },
    abort(reason) {
      return closeWS(4000, reason && reason.message);
    },
  });

  function closeWS(code, reasonString) {
    return new Promise((resolve, reject) => {
      ws.onclose = e => {
        if (e.wasClean) {
          resolve();
        } else {
          reject(new Error("The connection was not closed cleanly"));
        }
      };
      ws.close(code, reasonString);
    });
  }
}
We can then use this function to create writable streams for a web socket, and pipe an arbitrary readable stream to it:
const webSocketStream = makeWritableWebSocketStream("wss://example.com:443/", "protocol");

readableStream.pipeTo(webSocketStream)
  .then(() => console.log("All data successfully written!"))
  .catch(e => console.error("Something went wrong!", e));
See the earlier note about this style of wrapping web sockets into streams.
10.7. A writable stream with backpressure and success signals
The following function returns writable streams that wrap portions of the Node.js file system API (which themselves map fairly directly to C’s fopen, fwrite, and fclose trio). Since the API we are wrapping provides a way to tell when a given write succeeds, this stream will be able to communicate backpressure signals as well as whether an individual write succeeded or failed.
const fs = require("fs").promises;

function makeWritableFileStream(filename) {
  let fileHandle;

  return new WritableStream({
    async start() {
      fileHandle = await fs.open(filename, "w");
    },

    write(chunk) {
      return fileHandle.write(chunk, 0, chunk.length);
    },

    close() {
      return fileHandle.close();
    },

    abort() {
      return fileHandle.close();
    }
  });
}
We can then use this function to create a writable stream for a file, and write individual chunks of data to it:
const fileStream = makeWritableFileStream("/example/path/on/fs.txt");
const writer = fileStream.getWriter();

writer.write("To stream, or not to stream\n");
writer.write("That is the question\n");

writer.close()
  .then(() => console.log("chunks written and stream closed successfully!"))
  .catch(e => console.error(e));
Note that if a particular call to fileHandle.write takes a longer time, the returned promise will fulfill later. In the meantime, additional writes can be queued up and stored in the stream’s internal queue. The accumulation of chunks in this queue can cause the stream to return a pending promise from the ready getter, which is a signal to producers that they would benefit from backing off and stopping writing, if possible.

The way in which the writable stream queues up writes is especially important in this case, since as stated in the documentation for fileHandle.write, "it is unsafe to use filehandle.write multiple times on the same file without waiting for the promise." But we don’t have to worry about that when writing the makeWritableFileStream function, since the stream implementation guarantees that the underlying sink’s write() method will not be called until any promises returned by previous calls have fulfilled!
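The producer side of this picture can be sketched as well. The writeAll helper below is hypothetical and not part of this standard: it waits for the writer’s ready promise before each write, so it backs off automatically whenever chunks accumulate in the internal queue.

```javascript
// Hypothetical helper (not part of this standard): write a sequence of
// chunks while respecting backpressure. writer.ready stays pending while
// the stream's internal queue is over its high water mark, so this loop
// slows down to match the speed of the underlying sink.
async function writeAll(writableStream, chunks) {
  const writer = writableStream.getWriter();
  for (const chunk of chunks) {
    await writer.ready;
    // No await here: the stream queues the chunk, and guarantees the
    // underlying sink's write() is not called until previous writes finish.
    writer.write(chunk).catch(() => {});
  }
  // close() fulfills only after all queued writes have succeeded.
  await writer.close();
}
```

A write failure errors the stream, so it still surfaces through the rejection of writer.close() even though each individual write() rejection is swallowed.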
10.8. A { readable, writable } stream pair wrapping the same underlying resource
The following function returns an object of the form { readable, writable }, with the readable property containing a readable stream and the writable property containing a writable stream, where both streams wrap the same underlying web socket resource. In essence, this combines § 10.1 A readable stream with an underlying push source (no backpressure support) and § 10.6 A writable stream with no backpressure or success signals.
While doing so, it illustrates how you can use JavaScript classes to create reusable underlying sink and underlying source abstractions.
function streamifyWebSocket(url, protocol) {
  const ws = new WebSocket(url, protocol);
  ws.binaryType = "arraybuffer";

  return {
    readable: new ReadableStream(new WebSocketSource(ws)),
    writable: new WritableStream(new WebSocketSink(ws))
  };
}

class WebSocketSource {
  constructor(ws) {
    this._ws = ws;
  }

  start(controller) {
    this._ws.onmessage = event => controller.enqueue(event.data);
    this._ws.onclose = () => controller.close();

    this._ws.addEventListener("error", () => {
      controller.error(new Error("The WebSocket errored!"));
    });
  }

  cancel() {
    this._ws.close();
  }
}

class WebSocketSink {
  constructor(ws) {
    this._ws = ws;
  }

  start(controller) {
    this._ws.onclose = () => controller.error(new Error("The server closed the connection unexpectedly!"));

    this._ws.addEventListener("error", () => {
      controller.error(new Error("The WebSocket errored!"));
      this._ws.onclose = null;
    });

    return new Promise(resolve => this._ws.onopen = resolve);
  }

  write(chunk) {
    this._ws.send(chunk);
  }

  close() {
    return this._closeWS(1000);
  }

  abort(reason) {
    return this._closeWS(4000, reason && reason.message);
  }

  _closeWS(code, reasonString) {
    return new Promise((resolve, reject) => {
      this._ws.onclose = e => {
        if (e.wasClean) {
          resolve();
        } else {
          reject(new Error("The connection was not closed cleanly"));
        }
      };
      this._ws.close(code, reasonString);
    });
  }
}
We can then use the objects created by this function to communicate with a remote web socket, using the standard stream APIs:
const streamyWS = streamifyWebSocket("wss://example.com:443/", "protocol");
const writer = streamyWS.writable.getWriter();
const reader = streamyWS.readable.getReader();

writer.write("Hello");
writer.write("web socket!");

reader.read().then(({ value, done }) => {
  console.log("The web socket says: ", value);
});
Note how in this setup canceling the readable side will implicitly close the writable side, and similarly, closing or aborting the writable side will implicitly close the readable side.
See the earlier note about this style of wrapping web sockets into streams.
10.9. A transform stream that replaces template tags
It’s often useful to substitute tags with variables on a stream of data, where the parts that need to be replaced are small compared to the overall data size. This example presents a simple way to do that. It maps strings to strings, transforming a template like "Time: {{time}} Message: {{message}}" to "Time: 15:36 Message: hello" assuming that { time: "15:36", message: "hello" } was passed in the substitutions parameter to LipFuzzTransformer.

This example also demonstrates one way to deal with a situation where a chunk contains partial data that cannot be transformed until more data is received. In this case, a partial template tag will be accumulated in the partialChunk property until either the end of the tag is found or the end of the stream is reached.
class LipFuzzTransformer {
  constructor(substitutions) {
    this.substitutions = substitutions;
    this.partialChunk = "";
    this.lastIndex = undefined;
  }

  transform(chunk, controller) {
    chunk = this.partialChunk + chunk;
    this.partialChunk = "";
    // lastIndex is the index of the first character after the last substitution.
    this.lastIndex = 0;
    chunk = chunk.replace(/\{\{([a-zA-Z0-9_-]+)\}\}/g, this.replaceTag.bind(this));
    // Regular expression for an incomplete template at the end of a string.
    const partialAtEndRegexp = /\{(\{([a-zA-Z0-9_-]+(\})?)?)?$/g;
    // Avoid looking at any characters that have already been substituted.
    partialAtEndRegexp.lastIndex = this.lastIndex;
    this.lastIndex = undefined;
    const match = partialAtEndRegexp.exec(chunk);
    if (match) {
      this.partialChunk = chunk.substring(match.index);
      chunk = chunk.substring(0, match.index);
    }
    controller.enqueue(chunk);
  }

  flush(controller) {
    if (this.partialChunk.length > 0) {
      controller.enqueue(this.partialChunk);
    }
  }

  replaceTag(match, p1, offset) {
    let replacement = this.substitutions[p1];
    if (replacement === undefined) {
      replacement = "";
    }
    this.lastIndex = offset + replacement.length;
    return replacement;
  }
}
In this case we define the transformer to be passed to the TransformStream constructor as a class. This is useful when there is instance data to track.
The class would be used in code like:
const data = { userName, displayName, icon, date };
const ts = new TransformStream(new LipFuzzTransformer(data));

fetchEvent.respondWith(
  fetch(fetchEvent.request.url).then(response => {
    const transformedBody = response.body
      // Decode the binary-encoded response to string
      .pipeThrough(new TextDecoderStream())
      // Apply the LipFuzzTransformer
      .pipeThrough(ts)
      // Encode the transformed string
      .pipeThrough(new TextEncoderStream());
    return new Response(transformedBody);
  })
);
For simplicity, LipFuzzTransformer performs unescaped text substitutions. In real applications, a template system that performs context-aware escaping is good practice for security and robustness.
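As a rough illustration of the difference, and not a substitute for a real template system, a substitution value could be HTML-escaped before insertion when the output is known to be HTML text content. The helper names here are our own:

```javascript
// Hypothetical escaping helper: safe only for HTML text content. Attribute
// values, URLs, and script contexts each need different escaping rules,
// which is why production template systems are context-aware.
function escapeHTML(value) {
  return value.replace(/&/g, "&amp;")
              .replace(/</g, "&lt;")
              .replace(/>/g, "&gt;")
              .replace(/"/g, "&quot;");
}

// A replaceTag-style callback that escapes each substitution value.
function replaceTagEscaped(substitutions, match, p1) {
  const replacement = substitutions[p1];
  return replacement === undefined ? "" : escapeHTML(replacement);
}
```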
10.10. A transform stream created from a sync mapper function
The following function allows creating new TransformStream instances from synchronous "mapper" functions, of the type you would normally pass to Array.prototype.map. It demonstrates that the API is concise even for trivial transforms.
function mapperTransformStream(mapperFunction) {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(mapperFunction(chunk));
    }
  });
}
This function can then be used to create a TransformStream that uppercases all its inputs:
const ts = mapperTransformStream(chunk => chunk.toUpperCase());
const writer = ts.writable.getWriter();
const reader = ts.readable.getReader();

writer.write("No need to shout");

// Logs "NO NEED TO SHOUT":
reader.read().then(({ value }) => console.log(value));
Although a synchronous transform never causes backpressure itself, it will only transform chunks as long as there is no backpressure, so resources will not be wasted.
Exceptions error the stream in a natural way:
const ts = mapperTransformStream(chunk => JSON.parse(chunk));
const writer = ts.writable.getWriter();
const reader = ts.readable.getReader();

writer.write("[1, ");

// Logs a SyntaxError, twice:
reader.read().catch(e => console.error(e));
writer.write("{}").catch(e => console.error(e));
10.11. Using an identity transform stream as a primitive to create new readable streams
Combining an identity transform stream with pipeTo() is a powerful way to manipulate streams. This section contains a couple of examples of this general technique.
It’s sometimes natural to treat a promise for a readable stream as if it were a readable stream. A simple adapter function is all that’s needed:
function promiseToReadable(promiseForReadable) {
  const ts = new TransformStream();

  promiseForReadable
    .then(readable => readable.pipeTo(ts.writable))
    .catch(reason => ts.writable.abort(reason))
    .catch(() => {});

  return ts.readable;
}
Here, we pipe the data to the writable side and return the readable side. If the pipe errors, we abort the writable side, which automatically propagates the error to the returned readable side. If the writable side had already been errored by pipeTo(), then the abort() call will return a rejection, which we can safely ignore.
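A usage sketch may help. Here promiseToReadable is repeated so the snippet stands alone, and the fetch example in the comment is illustrative only:

```javascript
// promiseToReadable, as defined above: adapts a promise for a readable
// stream into a readable stream that is available immediately.
function promiseToReadable(promiseForReadable) {
  const ts = new TransformStream();

  promiseForReadable
    .then(readable => readable.pipeTo(ts.writable))
    .catch(reason => ts.writable.abort(reason))
    .catch(() => {});

  return ts.readable;
}

// For example, a promise for an HTTP response body could be consumed as a
// readable stream right away, before the response headers have arrived:
//
//   const bodyStream = promiseToReadable(fetch(url).then(res => res.body));
```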
A more complex extension of this is concatenating multiple readable streams into one:
function concatenateReadables(readables) {
  const ts = new TransformStream();
  let promise = Promise.resolve();

  for (const readable of readables) {
    promise = promise.then(
      () => readable.pipeTo(ts.writable, { preventClose: true }),
      reason => {
        return Promise.all([
          ts.writable.abort(reason),
          readable.cancel(reason)
        ]);
      }
    );
  }

  promise.then(() => ts.writable.close(), reason => ts.writable.abort(reason))
         .catch(() => {});

  return ts.readable;
}
The error handling here is subtle because canceling the concatenated stream has to cancel all the input streams. However, the success case is simple enough. We just pipe each stream in the readables iterable one at a time to the identity transform stream’s writable side, and then close it when we are done. The readable side is then a concatenation of all the chunks from all of the streams. We return it from the function. Backpressure is applied as usual.
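As a usage sketch, concatenating two in-memory readable streams yields a single stream delivering all of their chunks in order. concatenateReadables is repeated here so the snippet stands alone; the readableFromArray and readAll helpers are our own, for illustration only:

```javascript
// concatenateReadables, as defined above.
function concatenateReadables(readables) {
  const ts = new TransformStream();
  let promise = Promise.resolve();

  for (const readable of readables) {
    promise = promise.then(
      () => readable.pipeTo(ts.writable, { preventClose: true }),
      reason => Promise.all([
        ts.writable.abort(reason),
        readable.cancel(reason)
      ])
    );
  }

  promise.then(() => ts.writable.close(), reason => ts.writable.abort(reason))
         .catch(() => {});

  return ts.readable;
}

// Illustrative helper: a readable stream that emits the given chunks.
function readableFromArray(array) {
  return new ReadableStream({
    start(controller) {
      for (const chunk of array) controller.enqueue(chunk);
      controller.close();
    }
  });
}

// Illustrative helper: collect every chunk of a readable stream.
async function readAll(readable) {
  const reader = readable.getReader();
  const chunks = [];
  while (true) {
    const { value, done } = await reader.read();
    if (done) return chunks;
    chunks.push(value);
  }
}
```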
Acknowledgments
The editors would like to thank Anne van Kesteren, AnthumChris, Arthur Langereis, Ben Kelly, Bert Belder, Brian di Palma, Calvin Metcalf, Dominic Tarr, Ed Hager, Eric Skoglund, Forbes Lindesay, Forrest Norvell, Gary Blackwood, Gorgi Kosev, Gus Caplan, 贺师俊 (hax), Isaac Schlueter, isonmad, Jake Archibald, Jake Verbaten, James Pryor, Janessa Det, Jason Orendorff, Jeffrey Yasskin, Jens Nockert, Lennart Grahl, Luca Casonato, Mangala Sadhu Sangeet Singh Khalsa, Marcos Caceres, Marvin Hagemeister, Mattias Buelens, Michael Mior, Mihai Potra, Nidhi Jaju, Romain Bellessort, Simon Menke, Stephen Sugden, Surma, Tab Atkins, Tanguy Krotoff, Thorsten Lorenz, Till Schneidereit, Tim Caswell, Trevor Norris, tzik, Will Chan, Youenn Fablet, 平野裕 (Yutaka Hirano), and Xabier Rodríguez for their contributions to this specification. Community involvement in this specification has been above and beyond; we couldn’t have done it without you.
This standard is written by Adam Rice ( Google , ricea@chromium.org ), Domenic Denicola ( Google , d@domenic.me ), Mattias Buelens, and 吉野剛史 (Takeshi Yoshino, tyoshino@chromium.org ).
Intellectual property rights
Copyright © WHATWG (Apple, Google, Mozilla, Microsoft). This work is licensed under a Creative Commons Attribution 4.0 International License . To the extent portions of it are incorporated into source code, such portions in the source code are licensed under the BSD 3-Clause License instead.
This is the Living Standard. Those interested in the patent-review version should view the Living Standard Review Draft .