Web Streams
Веб-потоки (web streams) - это стандарт для потоков (streams), который поддерживается всеми основными веб-платформами: веб-браузерами, Node.js и Deno. Потоки - это абстракция для чтения и записи данных последовательно, небольшими частями из любого вида источника - файлов, данных, находящихся на сервере, и т.д.
Например, глобальная функция fetch (которая используется для загрузки онлайн-ресурсов) асинхронно возвращает ответ (Response
), содержащий свойство body
с веб-потоком.
1. Что такое веб-поток?
Поток - это структура данных (data structure) для доступа к таким данным, как:
- файлы;
- данные, находящиеся на сервере;
- и т.д.
Двумя основными преимуществами потоков являются:
- возможность работы с большим количеством данных благодаря тому, что потоки разделяют их на небольшие части (называ емые "чанками"/chunks), которые могут обрабатываться по одному за раз;
- возможность использования одной структуры данных для обработки разных данных, что облегчает повторное использование кода.
Веб-потоки ("веб" часто опускается) - это относительно новый стандарт, изначально поддерживаемый браузерами, но теперь поддерживаемый также Node.js
и Deno
, как показано в этой таблице.
Чанки бывают 2 видов:
- текстовые потоки (text streams): строки;
- бинарные потоки (потоки байтов) (binary streams): Uint8Array (разновидность TypedArray (типизированных массивов)).
1.1. Виды потоков
Существует 3 основных вида потоков:
RS
(поток для чтения) (далее -RS
) используется для чтения данных из источника (source). Код, который это делает, называется потребителем (consumer);WS
(поток для записи) (далее -WS
) используется для записи данных в приемник (sink). Код, который это делает, называется производителем (producer);TS
(поток для преобразования) (далее -TS
) состоит из 2 потоков:- он получает данные от записывающей стороны (стороны для записи) (writable side),
WS
; - он отправляет данные читающей стороне (стороне для чтения) (readable side),
RS
.
- он получает данные от записывающей стороны (стороны для записи) (writable side),
Основная идея TS
состоит в преобразовании данных, проходящих через туннель (конвейер) (pipe). Другими словами, данные записываются на стороне для записи и после преобразования читаются на стороне для чтения. Следующие TS
встроены в большинство платформ, поддерживающих JavaScript
:
- поскольку строки в
JS
имеют кодировкуUTF-16
, данные в кодировкеUTF-8
обрабатываются как двоичные.TextDecoderStream
конвертирует такие данные в строки; TextEncoderStream
конвертирует строки в данные в кодировкеUTF-8
;CompressionStream
сжимает двоичные данные вGZIP
и другие форматы сжатия;DecompressionStream
извлек ает данные изGZIP
и других форматов.
RS
, WS
и TS
могут применяться для передачи текстовых или бинарных данных. В статье мы будем в основном говорить о текстовых данных. Байтовые потоки для бинарных данных кратко упоминаются в конце.
1.2. Конвейер
"Туннелирование" (piping) - это операция, позволяющая объединять (pipe) RS
и WS
: до тех пор, пока RS
производит данные, данная операция читает их и записывает в WS
. При объединении 2 потоков мы получаем надежный способ передачи данных из одной локации в другую (например, для копирования файла). Однако, можно объединить больше 2 потоков и получить конвейер для обработки данных разными способами. Пример конвейера:
- начинается с
RS
; - затем следует
TS
; - цепочка (chain) заканчивается
WS
.
1.3. Противодавление
Одной из проблем конвейера может стать ситуация, когда один из звеньев цепочки получает больше данных, чем можем обработать в данный момент. Противодавление (обратное давление) (backpressure) позволяет решить эту задачу: получатель сообщает отправителю о необходимости временно прекратить передачу данных во избежание перегрузки (переполнения).
Другими словами, противодавление - это сигнал, передающийся от перегруженного звена к началу цепочки. Представим, что у нас имеется такая цепочка:
RS -> TS -> WS
Путь противодавления будет следующим:
WS
сигнализирует, что не справляется с обработкой данных;- конвейер прекращает читать данные из
TS
; - данные аккумулируются (накапливаются) внутри
TS
(это называется буферизацией/buffering); TS
сигнализирует о заполненности;- конвейер перестает читать данные из
RS
.
Мы достигли начала цепочки. Пока данные накапливаются внутри RS
, у WS
есть время на восстановление. После восстановления WS
сигнализирует о готовности к получению данных. Этот сигнал также передается в начало цепочки, и обработка данных возобновляется.
1.4. Поддержка потоков в Node.js
В Node.js
потоки доступны из 2 источников:
- из модуля
node:stream/web
; - через глобальные переменные (как в браузере).
На данный момент только один API
напрямую поддерживает потоки в Node.js
- Fetch API
:
const response = await fetch("https://exmple.com");
const readableStream = response.body;
Для всего остального следует использовать один из следующих статических методов модуля node:stream
для преобразования Node.js-потока
в веб-поток, и наоборот:
Readable
:Readable.toWeb(nodeReadable)
;Readable.fromWeb(webReadableStream, options?)
;
Writable
:Writable.toWeb(nodeWritable)
;Writable.fromWeb(webWritableStream, options)
;
Duplex
:Duplex.toWeb(nodeDuplex)
;Duplex.fromWeb(webTransformStream, options?)
.
FileHandle
- еще один API
, частично поддерживающий потоки через метод readableWebStream
.
2. Чтение из RS
RS
позволяют читать чанки данных из разных источников. Они имеют следующую сигнатуру:
interface RS<TChunk> {
getReader(): ReadableStreamDefaultReader<TChunk>;
readonly locked: boolean;
[Symbol.asyncIterator](): AsyncIterator<TChunk>;
cancel(reason?: any): Promise<void>;
pipeTo(
destination: WS<TChunk>,
options?: StreamPipeOptions
): Promise<void>;
pipeThrough<TChunk2>(
transform: ReadableWritablePair<TChunk2, TChunk>,
options?: StreamPipeOptions
): RS<TChunk2>;
// Не рассматривается в статье
tee(): [RS<TChunk>, RS<TChunk>];
}
interface StreamPipeOptions {
signal?: AbortSignal;
preventClose?: boolean;
preventAbort?: boolean;
preventCancel?: boolean;
}
Свойства:
getReader()
: возвращаетReader
- объект, позволяющий читать изRS
.Readers
похожи на итераторы, возвращаемые перебираемыми сущностями;locked
: одновременно может использоваться только одинReader
для одногоRS
.RS
блокируется на время использованияReader
,getReader()
в этот период вызываться не может;[Symbol.asyncIterator]()
: данный метод делаетRS
асинхронно перебираемыми. В настоящее время он реализован только для некоторых платформ;cancel()
: отменяет поток, поскольку потребитель больше в нем не заинтересован.reason
(причина) передается в базовый источник (underlying source)RS
(об этом позже). Возвращаемый промис разрешается после выполнения этой операции;pipeTo()
: передает содержимоеRS
вWS
. Возвращаемый промис разрешается после выполнения этой операции.pipeTo()
обеспечивает корректную передачу противодавления, сигналов закрытия, ошибок и т.п. по цепочке. В качестве второго параметра он принимает следующие настройки:signal
: позволяет передаватьAbortSignal
для прерывания цепочки с помощьюAbortController
;preventClose
: если имеет значениеtrue
, предотвращает закрытиеWS
при закрытииRS
. Может быть полезным при подключении несколькихRS
к одномуWS
;- остальные настройки в статье не рассматриваются. Почитать о них можно здесь;
pipeThrough()
: подключаетRS
кReadableWritablePair
(по сути,TS
, об этом позже). Возвращает результирующийRS
(сторону для чтенияReadableWritablePair
).
Существует 2 способа потребления RS
:
Readers
;- асинхронный перебор.
2.1. Потребление RS
через Readers
Для чтения данных из RS
могут использоваться Readers
. Они имеют следующую сигнатуру:
interface ReadableStreamGenericReader {
readonly closed: Promise<undefined>;
cancel(reason?: any): Promise<void>;
}
interface ReadableStreamDefaultReader<TChunk>
extends ReadableStreamGenericReader
{
releaseLock(): void;
read(): Promise<ReadableStreamReadResult<TChunk>>;
}
interface ReadableStreamReadResult<TChunk> {
done: boolean;
value: TChunk | undefined;
}
Свойства:
closed
: данный промис разрешается после закрытия потока. Он отклоняется при возникновении ошибки или в случае, когда блокировкаReader
снимается до закрытия потока;cancel()
: в активномReader
данный метод отменяет соответствующийRS
;releaseLock()
: деактивируетReader
и разблокирует поток;read()
: возвращает промис дляReadableStreamReadResult
(обертка для чанка) с 2 свойствами:done
: логическое значение -false
, если чанки могут читаться,true
после последнего чанка;value
: чанк илиundefined
после последнего чанка.
RS
похожи на итерируемые сущности, Readers
- на итераторы, а ReadableStreamReadResult
- на объекты, возвращаемые методом next
итераторов.
Код, демонстрирующий прот окол использования Readers
:
const reader = readableStream.getReader(); // 1
assert.equal(readableStream.locked, true); // 2
try {
while (true) {
const { done, value: chunk } = await reader.read(); // 3
if (done) break;
// Используем `chunk`
}
} finally {
reader.releaseLock(); // 4
}
Получение Reader
. Мы можем читать прямо из RS
. Сначала получаем Reader
(1). Каждый RS
может иметь только один Reader
. После получения Reader
RS
блокируется (2). Перед следующим вызовом getReader()
необходимо вызвать releaseLock()
(4).
Чтение чанков. read()
возвращает промис для объекта со свойствами done
и value
(3). После чтения последнего чанка done
принимает значение true
. Это похоже на то, как в JS
работает асинхронный перебор.
2.1.1. Пример: чтение файла через RS
В следующем примере читаются чанки (строки) из текстового файла data.txt
:
import * as fs from "node:fs";
import { Readable } from "node:stream";
const nodeReadable = fs.createReadStream(
"data.txt",
{ encoding: "utf-8" }
);
const webReadableStream = Readable.toWeb(nodeReadable); // 1
const reader = webReadableStream.getReader();
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
console.log(value);
}
} finally {
reader.releaseLock();
}
Поток Node.js
конвертируется в веб-поток. Затем приведенный выше протокол используется для чтения чанков.
2.1.2. Пример: формирование строки из содержимого RS
В следующем примере чанки из RS
объединяются в возвращаемую строку:
async function readableStreamToString(readableStream) {
const reader = readableStream.getReader();
try {
let result = "";
while (true) {
const { done, value } = await reader.read();
if (done) {
return result; // 1
}
result += value;
}
} finally {
reader.releaseLock(); // 2
}
}
Блок finally
выполняется всегда, независимо от результата блока try
. Поэтому блокировка корректно снимается (2) после возвращения результата (1).