Skip to main content

Tokio

Tokio - это асинхронная среда выполнения (runtime) кода Rust. Она предоставляет строительные блоки, необходимые для разработки сетевых приложений любого размера.

Обзор

На высоком уровне Tokio предоставляет несколько основных компонентов:

  • многопоточную среду выполнения асинхронного кода
  • асинхронную версию стандартной библиотеки (std)
  • большую экосистему библиотек

Назначение Tokio

Асинхронный код позволяет выполнять несколько задач одновременно. Однако Rust не предоставляет встроенных инструментов для выполнения такого кода, для этого требуется сторонняя среда выполнения. Tokio является самой популярной из таких сред.

Кроме того, Tokio предоставляет множество полезных утилит. При написании асинхронного кода мы не можем использовать обычные блокирующие API, предоставляемые стандартной библиотекой Rust, и должны использовать их асинхронные версии. Tokio предоставляет такие версии, где это необходимо.

Преимущества Tokio

Скорость

Tokio является быстрым, поскольку таковым является сам Rust. Производительность кода, написанного с помощью Tokio, сопоставима с производительностью кода, написанного самостоятельно.

Tokio является масштабируемым благодаря масштабируемости async/await. При работе с сетью существует естественный предел скорости обработки запроса из-за задержки (latency), поэтому единственным способом масштабирования является обработка нескольких запросов одновременно. async/await позволяет легко и дешево увеличивать количество одновременно выполняемых задач.

Надежность

Tokio основан на Rust - языке, позволяющем разрабатывать надежное и эффективное ПО. Разные исследования (например, это и это) показывают, что около 70% серьезных ошибок безопасности являются результатом небезопасной работы с памятью. Использование Rust избавляет от всего этого класса багов в приложении.

Tokio сосредоточен на предоставлении предсказуемого поведения без сюрпризов. Основная цель Tokio - предоставить пользователям возможность развертывать предсказуемое ПО, которое будет работать одинаково изо дня в день с одинаковым временем отклика и без непредсказуемых скачков задержки.

Простота

Благодаря появлению async/await в Rust, сложность создания асинхронных приложений существенно снизилась. В сочетании с утилитами Tokio и динамичной экосистемой, разработка асинхронных приложений становится проще простого.

Tokio следует соглашению об именовании стандартной библиотеки, когда это имеет смысл. Это позволяет легко конвертировать код, написанный с использованием стандартной библиотеки, в код, написанный с помощью Tokio. Благодаря строгой системе типов Rust, обеспечение правильности написанного кода не знает себе равных.

Случаи, для которых Tokio не подходит

Хотя Tokio полезен для многих проектов, в которых необходимо выполнять много задач одновременно, есть случаи, для которых он не подходит:

  • ускорение вычислений, выполняемых ЦП, за счет их параллельного выполнения в нескольких потоках. Tokio предназначен для приложений, связанных с вводом-выводом, где каждая отдельная задача проводит большую часть времени в ожидании ввода-вывода. Если все, что делает наше приложение, - это параллельные вычисления, нам следует использовать rayon. Впрочем, ничто не мешает нам использовать Tokio и rayon вместе (пример)
  • чтение большого количества файлов. Может показаться, что Tokio будет полезен для проектов, которым нужно читать много файлов, однако Tokio здесь не дает никаких преимуществ по сравнению с обычным пулом потоков (thread pool). Это связано с тем, что ОС обычно не предоставляют асинхронные файловые API
  • отправка единственного веб-запроса. Tokio полезен, когда нужно выполнять много задач одновременно. Если нам нужно использовать библиотеку, предназначенную для выполнения асинхронного кода, такую ​​как reqwest, но не нужно выполнять много задач одновременно, следует предпочесть блокирующую версию этой библиотеки, поскольку она упростит код проекта. Конечно, в этом случае Tokio будет работать, но не даст никаких преимуществ перед блокирующим API

Настройка

Этот туториал шаг за шагом проведет вас через процесс создания клиента и сервера Redis. Мы начнем с основ асинхронного программирования на Rust и будем двигаться дальше. Мы реализуем несколько команд Redis и получим полный обзор Токио.

Проект, который мы создадим в этом туториале, доступен как Mini-Redis на GitHub. Mini-Redis разработан с основной целью изучения Tokio и поэтому очень хорошо прокомментирован, но это также означает, что в Mini-Redis отсутствуют некоторые функции, которые нужны в настоящей библиотеке Redis. Готовые к использованию библиотеки Redis можно найти на crates.io.

Предварительные условия

Для прохождения этого туториала вы должны быть знакомы с Rust.

Хотя это и не обязательно, но некоторый опыт написания сетевого кода с использованием стандартной библиотеки Rust или другого языка программирования может оказаться полезным.

Рекомендуется использовать самую последнюю стабильную версию Rust.

Далее нужно установить сервер Mini-Redis. Он будет использоваться для тестирования разрабатываемого нами клиента.

cargo install mini-redis

Команда для запуска сервера:

mini-redis-server

Команда для получения значения по ключу foo (выполняем в отдельном терминале):

mini-redis-cli get foo

Выполнение этой команды должно привести к отображению (nil) в терминале.

Привет, Tokio

Начнем с создания очень простого приложения Tokio. Оно будет подключаться к серверу Mini-Redis и устанавливать ключ hello в значение world. Затем оно будет читать значение этого ключа. Это будет делаться с помощью клиента Mini-Redis.

Код

Создание крейта

Начнем с генерации нового приложения Rust:

cargo new my-redis
cd my-redis

Добавление зависимостей

Открываем файл Cargo.toml и добавляем следующие записи в раздел [dependencies]:

tokio = { version = "1", features = ["full"] }
mini-redis = "0.4"

Написание кода

Редактируем файл main.rs следующим образом:

use mini_redis::{client, Result};

#[tokio::main]
async fn main() -> Result<()> {
// Подключаемся к серверу mini-redis
let mut client = client::connect("127.0.0.1:6379").await?;

// Устанавливаем ключ "hello" в значение "world"
client.set("hello", "world".into()).await?;

// Получаем значение ключа "hello"
let result = client.get("hello").await?;

println!("От сервера получено: {:?}", result);

Ok(())
}

Убедитесь, что сервер Mini-Redis запущен.

Запускаем приложение my-redis:

cargo run

Получаем:

От сервера получено: Some(b"world")

Полный код примера можно найти здесь.

Разбор

Разберем код, который мы написали, построчно. Его мало, но делает он много всего.

let mut client = client::connect("127.0.0.1:6379").await?;

Функция client::connect предоставляется крейтом mini-redis. Она асинхронно устанавливает TCP-соединение с указанным сервером (адресом). После установки соединения возвращается дескриптор клиента (client). Несмотря на то, что операция выполняется асинхронно, код выглядит синхронным. Единственный признак того, что операция является асинхронной, - оператор .await.

Что такое асинхронное программирование?

Большинство компьютерных программ выполняются в том порядке, в котором они написаны. Выполняется первая строка, затем следующая и т.д. В синхронном программировании, когда программа встречает операцию, которая не может быть завершена немедленно, она блокируется до тех пор, пока операция не завершится. Например, для установки TCP-соединения требуется обмен данными с одноранговым узлом (peer) по сети, что занимает некоторое время. В это время поток блокируется.

В асинхронном программировании операции, которые не могут завершиться немедленно, приостанавливаются в фоновом режиме. Поток не блокируется и может выполнять другие задачи. После завершения операции, задача возобновляется и продолжает выполняться с того места, где она была остановлена. В нашем примере имеется только одна задача, поэтому пока она приостановлена, ничего не происходит, но асинхронные программы обычно имеют много таких задач.

Хотя асинхронное программирование может привести к созданию более быстрых приложений, оно часто приводит к созданию гораздо более сложных программ. Программисту необходимо отслеживать все состояния, необходимые для возобновления работы после завершения асинхронной операции. Это утомительная и подверженная ошибкам задача.

Зеленые потоки

Rust реализует асинхронное программирование с помощью паттерна async/await. Функции, выполняющие асинхронные операции, помечаются ключевым словом async. В нашем примере функция connect определяется следующим образом:

use mini_redis::Result;
use mini_redis::client::Client;
use tokio::net::ToSocketAddrs;

pub async fn connect<T: ToSocketAddrs>(addr: T) -> Result<Client> {
// ...
}

async fn выглядит как обычная синхронная функция, но работает асинхронно. Rust преобразует асинхронную функцию во время компиляции в подпрограмму (routine), работающую асинхронно. Вызов .await внутри async fn возвращает управление потоку. Поток может выполнять другую работу, пока асинхронная операция выполняется в фоновом режиме.

Хотя другие языки тоже реализуют async/await, Rust использует уникальный подход. Прежде всего, асинхронные операции в Rust являются ленивыми (lazy). Это приводит к отличиям в семантике времени выполнения.

Использование async/await

Асинхронная функция вызываются также, как обычная. Однако вызов такой функции не приводит к выполнению ее тела. Вместо этого вызов async fn возвращает значение, представляющее операцию. Это концептуально аналогично замыканию с нулевым аргументом (zero-argument closure). Для фактического запуска операции используется оператор .await на возвращаемом значении.

Например, результатом выполнения программы

async fn say_world() {
println!("world");
}

#[tokio::main]
async fn main() {
// Вызов функции `say_world` не выполняет ее тело
let op = say_world();

// Сначала выполняется этот код
println!("hello");

// Вызов `.await` на `op` выполняет тело `say_world()`
op.await;
}

является

hello
world

Возвращаемое значение async fn - это анонимный тип, реализующий типаж (трейт) Future (фьючер).

Асинхронная функция main

Функция main, используемая для запуска приложения, отличается от обычной, встречающейся в большинстве крейтов Rust:

  1. Это async fn.
  2. Она аннотирована с помощью #[tokio::main].

async fn используется для входа в асинхронный контекст. Однако асинхронные функции должны выполняться runtime. runtime содержит асинхронный планировщик задач, обеспечивает событийный ввод-вывод, таймеры и др. Среда выполнения не запускается автоматически, поэтому ее должна запустить функция main.

Функция #[tokio::main] - это макрос. Она преобразует async fn main() в синхронную fn main(), которая инициализирует экземпляр среды выполнения и выполняет тело асинхронной функции.

Например, код

#[tokio::main]
async fn main() {
println!("hello");
}

трансформируется в

fn main() {
let mut rt = tokio::runtime::Runtime::new().unwrap();

rt.block_on(async {
println!("hello");
})
}

Возможности Cargo

В этом туториале используются все возможности Tokio (флаг full):

tokio = { version = "1", features = ["full"] }

Tokio предоставляет богатый функционал (TCP, UDP, сокеты Unix, таймеры, утилиты синхронизации, несколько типов планировщиков и др.). Не всем приложениям нужен весь этот функционал. При оптимизации времени компиляции или размера конечного файла приложения можно указать только те функции, которые используются приложением.

Создание потоков

Приступим к разработке сервера Redis.

Сначала переместим код клиента из предыдущего раздела в отдельный файл:

mkdir -p examples
mv src/main.rs examples/hello-redis.rs

Затем создадим новый пустой файл src/main.rs.

Прием сокетов

Первое, что должен делать наш сервер, - принимать входящие TCP-сокеты. Это делается путем привязки tokio::net::TcpListener к порту 6379.

Многие типы Tokio называются также, как их синхронные эквиваленты в стандартной библиотеке Rust. Когда это имеет смысл, Tokio предоставляет те же API, что и std, но с использованием async fn.

Сокеты принимаются в цикле. Каждый сокет обрабатывается и закрывается. Прочитаем команду, выведем ее на стандартный вывод и ответим ошибкой:

// src/main.rs
use tokio::net::{TcpListener, TcpStream};
use mini_redis::{Connection, Frame};

#[tokio::main]
async fn main() {
// Привязываем обработчик к адресу
let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();

loop {
// Второй элемент содержит IP и порт нового подключения
let (socket, _) = listener.accept().await.unwrap();
process(socket).await;
}
}

async fn process(socket: TcpStream) {
// `Connection` позволяет читать/писать кадры (frames) redis вместо
// потоков байтов. Тип `Connection` определяется mini-redis
let mut connection = Connection::new(socket);

if let Some(frame) = connection.read_frame().await.unwrap() {
println!("GOT: {:?}", frame);

// Отвечаем ошибкой
let response = Frame::Error("Unimplemented".to_string());
connection.write_frame(&response).await.unwrap();
}
}

Запускаем программу:

cargo run

Запускаем пример hello-redis в отдельном терминале:

cargo run --example hello-redis

Вывод в терминале примера:

Error: "Unimplemented"

Вывод в терминале сервера:

GOT: Array([Bulk(b"set"), Bulk(b"hello"), Bulk(b"world")])

Одновременность

У нашего сервера есть небольшая проблема (помимо того, что он отвечает только ошибками). Он обрабатывает входящие запросы по одному. После установки соединения, сервер остается внутри цикла приема подключений до тех пор, пока ответ не будет полностью записан в сокет.

Мы хотим, чтобы наш сервер обрабатывал много запросов одновременно. Для этого нам нужно добавить немного конкурентности.

Одновременность (concurrency) и параллелизм (parallelism) - это не одно и тоже. Если мы переключаемся между двумя задачами, то мы работаем над ними одновременно, а не параллельно. Чтобы эту работу можно было считать параллельной, нам потребуются два человека, по одному на каждую задачу.

Одним из преимуществ использования Tokio является то, что асинхронный код позволяет работать над многими задачами одновременно, без необходимости работать над ними параллельно с использованием обычных потоков. Фактически, Tokio может выполнять множество задач одновременно в одном потоке!

Для одновременной обработки соединений для каждого входящего соединения создается новая задача. Соединение обрабатывается этой задачей.

Цикл принятия соединений становится таким:

use tokio::net::TcpListener;

#[tokio::main]
async fn main() {
let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();

loop {
let (socket, _) = listener.accept().await.unwrap();
// Для каждого входящего сокета создается новая задача. Сокет
// перемещается в новую задачу и обрабатывается там
tokio::spawn(async move {
process(socket).await;
});
}
}

Задачи

Задача Tokio - это асинхронный зеленый поток (green thread). Они создаются путем передачи async блока в tokio::spawn(). Функция tokio::spawn возвращает JoinHandle, который вызывающая сторона может использовать для взаимодействия с созданной задачей. async блок может иметь возвращаемое значение. Вызывающая сторона может получить его с помощью .await на JoinHandle.

Например:

#[tokio::main]
async fn main() {
let handle = tokio::spawn(async {
// Выполняем асинхронную работу
"return value"
});

// Выполняем другую работу

let out = handle.await.unwrap();
println!("GOT: {}", out);
}

Ожидание JoinHandle возвращает Result. Если во время выполнения задачи возникает ошибка, JoinHandle возвращает Err. Это происходит, когда задача либо вызывает панику, либо принудительно отменяется из-за закрытия среды выполнения.

Задача - это единица выполнения (unit of execution), управляемая планировщиком. При создании задачи она передается планировщику Tokio, который гарантирует ее выполнение при появлении у нее работы. Порожденная задача может выполняться в том же потоке, в котором она была создана, или в другом потоке времени выполнения. Задачу также можно перемещать между потоками после создания.

Задачи в Токио очень легкие. По сути, им требуется только одно выделение и 64 байта памяти. Приложения должны иметь возможность свободно создавать тысячи, если не миллионы задач.

Привязка 'static

При создании задачи в среде выполнения Tokio, время жизни ее типа должно быть 'static. Это означает, что порожденная задача не должна содержать никаких ссылок на данные, не принадлежащие ей.

Распространено заблуждение, что 'static означает "жить вечно", но это не так. Тот факт, что значение является 'static, не означает, что у нас есть утечка памяти. Больше об этом можно прочитать здесь.

Например, следующий код не скомпилируется:

use tokio::task;

#[tokio::main]
async fn main() {
let v = vec![1, 2, 3];

task::spawn(async {
println!("Это вектор: {:?}", v);
});
}
error[E0373]: async block may outlive the current function, but
it borrows `v`, which is owned by the current function
--> src/main.rs:7:23
|
7 | task::spawn(async {
| _______________________^
8 | | println!("Это вектор: {:?}", v);
| | - `v` is borrowed here
9 | | });
| |_____^ may outlive borrowed value `v`
|
note: function requires argument type to outlive `'static`
--> src/main.rs:7:17
|
7 | task::spawn(async {
| _________________^
8 | | println!("Это вектор: {:?}", v);
9 | | });
| |_____^
help: to force the async block to take ownership of `v` (and any other
referenced variables), use the `move` keyword
|
7 | task::spawn(async move {
8 | println!("Это вектор: {:?}", v);
9 | });
|

Это происходит потому, что по умолчанию переменные не перемещаются в асинхронные блоки. Вектор v остается во владении функции main. println! заимствует v. Компилятор Rust любезно объясняет нам это и даже предлагает исправление! Изменение строки 7 на task::spawn(async move { даст указание компилятору переместить v в порожденную задачу. Теперь задача владеет всеми своими данными, что делает их 'static.

Если часть данных должна быть доступна одновременно в нескольких задачах, ее необходимо распределять (сделать общей) с помощью примитивов синхронизации, таких как Arc.

Обратите внимание, что в сообщении об ошибке говорится о том, что тип аргумента переживает время жизни 'static. Эта терминология может сбивать с толку, поскольку время жизни 'static длится до конца программы, поэтому, если тип переживает его, не возникает ли у нас утечки памяти? Объяснение состоит в том, что именно тип, а не значение, должен переживать время жизни 'static, и значение может быть уничтожено до того, как его тип перестанет быть действительным.

Когда мы говорим, что значение является "статическим", это означает лишь то, что было бы правильно хранить его вечно. Это важно, поскольку компилятор не может определить, как долго будет выполняться вновь созданная задача. Мы должны убедиться, что задаче разрешено жить вечно, чтобы Tokio мог выполнять ее столько, сколько необходимо.

"Привязка 'static", "тип, переживающий 'static" и "'static значение" обозначают одно и тоже - T: 'static, в отличие от "аннотации с помощью 'static", как в &'static T.

Привязка bound

Задачи, порожденные tokio::spawn(), должны реализовывать типаж Send. Это позволяет среде выполнения Tokio перемещать задачи между потоками, пока они приостановлены в .await.

Задачи являются Send, когда все данные, хранящиеся в вызовах .await, являются таковыми. При вызове .await задача возвращается (yields back) планировщику. При следующем выполнении задачи, она возобновляется с той точки, на которой была приостановлена (yielded) в последний раз. Чтобы это работало, все состояние, используемое после .await, должно сохраняться задачей. Если это состояние являются Send, т.е. его можно перемещать между потоками, то и саму задачу можно перемещать между потоками. И наоборот, если состояние не являются Send, то и задача тоже.

Например, это работает:

use tokio::task::yield_now;
use std::rc::Rc;

#[tokio::main]
async fn main() {
tokio::spawn(async {
// Область видимости уничтожает `rc` перед `.await`
{
let rc = Rc::new("hello");
println!("{}", rc);
}

// `rc` больше не используется. Он не сохраняется, когда
// задача возвращается планировщику
yield_now().await;
});
}

А это не работает:

use tokio::task::yield_now;
use std::rc::Rc;

#[tokio::main]
async fn main() {
tokio::spawn(async {
let rc = Rc::new("hello");

// `rc` используется после `.await`. Он должен быть сохранен в
// состоянии задачи
yield_now().await;

println!("{}", rc);
});
}

Попытка компиляции этого фрагмента завершается такой ошибкой:

error: future cannot be sent between threads safely
--> src/main.rs:6:5
|
6 | tokio::spawn(async {
| ^^^^^^^^^^^^ future created by async block is not `Send`
|
::: [..]spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ---- required by this bound in
| `tokio::task::spawn::spawn`
|
= help: within `impl std::future::Future`, the trait
| `std::marker::Send` is not implemented for
| `std::rc::Rc<&str>`
note: future is not `Send` as this value is used across an await
--> src/main.rs:10:9
|
7 | let rc = Rc::new("hello");
| -- has type `std::rc::Rc<&str>` which is not `Send`
...
10 | yield_now().await;
| ^^^^^^^^^^^^^^^^^ await occurs here, with `rc` maybe
| used later
11 | println!("{}", rc);
12 | });
| - `rc` is later dropped here

Хранение значений

Теперь мы реализуем функцию process для обработки входящих команд. Мы будем использовать HashMap для хранения значений. Команды SET будут добавлять значения в HashMap, а команды GET будут извлекать значения из HashMap. Кроме того, мы будем использовать цикл для приема нескольких команд в одном соединении.

use tokio::net::TcpStream;
use mini_redis::{Connection, Frame};

async fn process(socket: TcpStream) {
use mini_redis::Command::{self, Get, Set};
use std::collections::HashMap;

// Хранилище данных
let mut db = HashMap::new();

// `Connection`, предоставляемое `mini-redis`, обрабатывает разбор кадров из сокета
let mut connection = Connection::new(socket);

// Используем `read_frame` для получения команды из соединения
while let Some(frame) = connection.read_frame().await.unwrap() {
let response = match Command::from_frame(frame).unwrap() {
Set(cmd) => {
// Значение хранится в виде `Vec<u8>`
db.insert(cmd.key().to_string(), cmd.value().to_vec());
Frame::Simple("OK".to_string())
}
Get(cmd) => {
if let Some(value) = db.get(cmd.key()) {
// `Frame::Bulk` ожидает, что данные будут иметь тип `Bytes`.
// Мы рассмотрим этот тип позже.
// `&Vec<u8>` преобразуется в `Bytes` с помощью метода `into`
Frame::Bulk(value.clone().into())
} else {
Frame::Null
}
}
cmd => panic!("Не реализовано {:?}", cmd),
};

// Отправляем (пишем) ответ клиенту
connection.write_frame(&response).await.unwrap();
}
}

Запускаем сервер:

cargo run

В отдельном терминале запускаем пример hello-redis:

cargo run --example hello-redis

Полный код примера можно найти здесь.

Теперь мы можем устанавливать и получать значения, но есть одна проблема: значения не распределяются между соединениями. Если другой подключенный сокет попытается получить значение по ключу hello, он получит (nil).

Общее состояние

На данный момент у нас имеется работающий сервер "ключ-значение". Однако у него есть серьезный недостаток: состояние не распределяется между соединениями. Давайте это исправим.

Стратегии

В Tokio существует несколько разных способов распределять состояние:

  1. Защита общего состояния с помощью Mutex.
  2. Создание задачи для управления состоянием и использование передачи сообщений для работы с ней.

Обычно первый подход используется для простых данных, а второй - для вещей, требующих асинхронной работы, таких как примитивы ввода-вывода. В нашем случае общим состоянием является HashMap, а операциями - insert() и get(). Ни одна из этих операций не является асинхронной, поэтому мы будем использовать Mutex.

Второй подход будет рассмотрен в следующей главе.

Добавление зависимости bytes

Вместо Vec<u8> Mini-Redis использует Bytes из крейта bytes. Цель Bytes - предоставить надежную структуру массива байтов для сетевого программирования. Основная особенность, которую он добавляет к Vec<u8>, - это поверхностное клонирование (shallow cloning). Другими словами, вызов метода clone для экземпляра Bytes не копирует данные. Тип Bytes примерно соответствует Arc<Vec<u8>>, но с некоторыми дополнительными возможностями.

Добавляем bytes в раздел [dependencies] файла Cargo.toml:

bytes = "1"

Инициализация HashMap

HashMap будет использоваться многими задачами и, возможно, многими потоками. Для поддержки этого его нужно обернуть в Arc<Mutex<_>>.

Во-первых, для удобства добавим следующий псевдоним типа после операторов use:

use bytes::Bytes;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

type Db = Arc<Mutex<HashMap<String, Bytes>>>;

Затем обновляем функцию main, чтобы инициализировать HashMap и передать дескриптор (handle) Arc в функцию process. Использование Arc позволяет одновременно ссылаться на HashMap из многих задач, потенциально работающих во многих потоках. В Tokio термин "дескриптор" используется для обозначения значения, которое обеспечивает доступ к некоторому общему состоянию.

use tokio::net::TcpListener;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

#[tokio::main]
async fn main() {
let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();

println!("Listening");

let db = Arc::new(Mutex::new(HashMap::new()));

loop {
let (socket, _) = listener.accept().await.unwrap();
// Клонируем дескриптор в `HashMap`
let db = db.clone();

println!("Accepted");

tokio::spawn(async move {
process(socket, db).await;
});
}
}

Об использовании std::sync::Mutex

Обратите внимание, что для защиты HashMap используется std::sync::Mutex, а не tokio::sync::Mutex. Распространенной ошибкой является безусловное использование tokio::sync::Mutex в асинхронном коде. Асинхронный мьютекс - это мьютекс, который блокируется при вызовах .await.

Синхронный мьютекс блокирует текущий поток в ожидании получения блокировки (acquire the lock). Это, в свою очередь, блокирует обработку других задач. Однако переключение на tokio::sync::Mutex обычно не помогает, поскольку асинхронный мьютекс использует синхронный мьютекс под капотом. Как правило, использование синхронного мьютекса в асинхронном коде вполне допустимо, пока конкуренция остается низкой и блокировка не удерживается при вызовах .await.

Обновление функции process

Функция process больше не инициализирует HashMap. Вместо этого, она принимает общий дескриптор HashMap в качестве параметра. Перед использованием HashMap его необходимо заблокировать. Помните, что типом значения HashMap теперь является Bytes (который можно легко клонировать), поэтому его также необходимо изменить.

use tokio::net::TcpStream;
use mini_redis::{Connection, Frame};

async fn process(socket: TcpStream, db: Db) {
use mini_redis::Command::{self, Get, Set};

let mut connection = Connection::new(socket);

while let Some(frame) = connection.read_frame().await.unwrap() {
let response = match Command::from_frame(frame).unwrap() {
Set(cmd) => {
// Блокируем `HashMap`
let mut db = db.lock().unwrap();
db.insert(cmd.key().to_string(), cmd.value().clone());
Frame::Simple("OK".to_string())
}
Get(cmd) => {
// Блокируем `HashMap`
let db = db.lock().unwrap();
if let Some(value) = db.get(cmd.key()) {
Frame::Bulk(value.clone())
} else {
Frame::Null
}
}
cmd => panic!("Не реализовано {:?}", cmd),
};

connection.write_frame(&response).await.unwrap();
}
}

Задачи, потоки и конкуренция

Использование блокирующего мьютекса для защиты небольших критических блоков кода является приемлемой стратегией, когда конкуренция минимальна. Когда возникает борьба за блокировку, поток, выполняющий задачу, должен заблокироваться и дождаться мьютекса. Это заблокирует не только текущую задачу, но также другие задачи, запланированные в текущем потоке.

По умолчанию Tokio использует многопоточный планировщик. Задачи планируются для любого количества потоков, управляемых средой выполнения. Если запланировано выполнение большого количества задач и все они требуют доступа к мьютексу, возникает конфликт. С другой стороны, если используется вариант среды выполнения (runtime flavor) current_thread, то мьютекс никогда не будет конкурировать.

Вариант среды выполнения current_thread представляет собой облегченную однопоточную среду выполнения. Это хороший выбор, когда создается всего несколько задач и открывается несколько сокетов. Например, этот вариант хорошо работает при предоставлении моста синхронного API поверх асинхронной клиентской библиотеки.

Если конкуренция за синхронный мьютекс становится проблемой, переключение на мьютекс Tokio редко будет лучшим решением. Вместо этого можно рассмотреть следующие варианты:

  • переключение на специальную задачу для управления состоянием и использование передачи сообщений для работы с ней
  • сегментирование (shard) мьютекса
  • реструктуризация кода для удаления мьютекса

В нашем случае, поскольку каждый ключ независим, отлично подойдет сегментирование мьютекса. Для этого вместо одного экземпляра Mutex<HashMap<_, _>> мы создадим N отдельных экземпляров.

type ShardedDb = Arc<Vec<Mutex<HashMap<String, Vec<u8>>>>>;

fn new_sharded_db(num_shards: usize) -> ShardedDb {
let mut db = Vec::with_capacity(num_shards);
for _ in 0..num_shards {
db.push(Mutex::new(HashMap::new()));
}
Arc::new(db)
}

После этого поиск ячейки для любого заданного ключа становится двухэтапным процессом. Сначала ключ используется для определения того, частью какого сегмента он является. Затем ключ просматривается в HashMap.

let shard = db[hash(key) % db.len()].lock().unwrap();
shard.insert(key, value);

Простая реализация, описанная выше, требует использования фиксированного количества сегментов, и количество сегментов не может быть изменено после создания сегментированной карты. Крейт dashmap предоставляет реализацию более сложной сегментированной хэш-карты.

Удержание MutexGuard через .await

Мы можем написать код, который выглядит так:

use std::sync::{Mutex, MutexGuard};

async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
*lock += 1;

do_something_async().await;
} // здесь `lock` выходит из области видимости

При попытке создать что-то, вызывающее эту функцию, мы получим следующее сообщение об ошибке:

error: future cannot be sent between threads safely
--> src/lib.rs:13:5
|
13 | tokio::spawn(async move {
| ^^^^^^^^^^^^ future created by async block is not `Send`
|
::: /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.21/src/task/spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ---- required by this bound in `tokio::task::spawn::spawn`
|
= help: within `impl std::future::Future`, the trait `std::marker::Send` is not implemented for `std::sync::MutexGuard<'_, i32>`
note: future is not `Send` as this value is used across an await
--> src/lib.rs:7:5
|
4 | let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
| -------- has type `std::sync::MutexGuard<'_, i32>` which is not `Send`
...
7 | do_something_async().await;
| ^^^^^^^^^^^^^^^^^^^^^^^^^^ await occurs here, with `mut lock` maybe used later
8 | }
| - `mut lock` is later dropped here

Это происходит потому, что тип std::sync::MutexGuard не является Send. Это означает, что мы не можем отправить блокировку мьютекса в другой поток, и ошибка возникает, потому что Tokio может перемещать задачу между потоками при каждом .await. Следовательно, нам нужно реструктурировать код таким образом, чтобы деструктор блокировки мьютекса запускался до .await.

// Это работает
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
{
let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
*lock += 1;
} // здесь `lock` выходит из области видимости

do_something_async().await;
}

Обратите внимание, что это не работает:

use std::sync::{Mutex, MutexGuard};

async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
*lock += 1;
drop(lock);

do_something_async().await;
}

Это связано с тем, что компилятор вычисляет, является ли фьючер (future) Send, только на основе информации об области видимости. Возможно, в будущем компилятор будет поддерживать явное удаление блокировки, но сейчас для этого необходимо использовать область видимости.

Не пытайтесь решить эту проблему путем создания задачи, которая не должна быть Send, потому что если Tokio приостановит задачу в .await, пока она удерживает блокировку, в том же потоке может быть запланировано выполнение другой задачи, которая также может попытаться заблокировать этот мьютекс, что приведет к взаимоблокировке, поскольку задача, ожидающая блокировки мьютекса, не позволит задаче, удерживающей мьютекс, освободить его.

Ниже мы обсудим некоторые подходы к решению этой проблемы.

Реструктуризация кода, чтобы не удерживать блокировку через .await

Мы уже видели один пример этого в приведенном выше фрагменте кода, но есть несколько более надежных способов. Например, можно обернуть мьютекс в структуру и блокировать его только внутри синхронных методов этой структуры:

use std::sync::Mutex;

struct CanIncrement {
mutex: Mutex<i32>,
}
impl CanIncrement {
// Эта функция является синхронной
fn increment(&self) {
let mut lock = self.mutex.lock().unwrap();
*lock += 1;
}
}

async fn increment_and_do_stuff(can_incr: &CanIncrement) {
can_incr.increment();
do_something_async().await;
}

Этот шаблон гарантирует, что мы не столкнемся с ошибкой Send, поскольку защита мьютекса отсутствует в асинхронной функции.

Создание задачи для управления состоянием и использование передачи сообщений для работы с ней

Это второй подход, упомянутый в начале этого раздела, и он часто используется, когда общий ресурс является ресурсом ввода-вывода. Мы подробно поговорим о нем в следующем разделе.

Использование асинхронного мьютекса Токио

Также можно использовать тип tokio::sync::Mutex, предоставляемый Tokio. Основная особенность мьютекса Tokio заключается в том, что его можно без проблем удерживать в .await. Тем не менее, асинхронный мьютекс обходится дороже, чем обычный, и лучше использовать один из двух других подходов.

// Мьютекс Tokio
use tokio::sync::Mutex;

// Это компилируется
// (но в данном случае реструктуризация кода будет лучшим решением)
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
let mut lock = mutex.lock().await;
*lock += 1;

do_something_async().await;
} // здесь `lock` выходит из области видимости

Каналы

Возвращаемся к клиенту. Поместим код сервера, который мы написали, в отдельный двоичный файл:

mkdir src/bin
mv src/main.rs src/bin/server.rs

Создаем новый двоичный файл, который будет содержать код клиента:

touch src/bin/client.rs

Команда для запуска сервера:

cargo run --bin server

Команда для запуска клиента:

cargo run --bin client

Допустим, мы хотим запустить две команды Redis одновременно. Мы можем создать одну задачу для каждой команды. Тогда обе команды будут выполняться одновременно.

Мы могли бы написать что-то вроде этого:

use mini_redis::client;

#[tokio::main]
async fn main() {
// Подключаемся к серверу
let mut client = client::connect("127.0.0.1:6379").await.unwrap();

// Создаем две задачи: одна извлекает значение, другая устанавливает значение по ключу
let t1 = tokio::spawn(async {
let res = client.get("foo").await;
});

let t2 = tokio::spawn(async {
client.set("foo", "bar".into()).await;
});

t1.await.unwrap();
t2.await.unwrap();
}

Но этот код не скомпилируется, поскольку обеим задачам требуется доступ к клиенту. Поскольку Client не реализует Copy, он не будет компилироваться без кода, обеспечивающего возможность его совместного использования. Кроме того, Client::set принимает &mut self, что означает, что для его вызова требуется монопольный доступ к клиенту. Мы могли бы открывать соединение для каждой задачи, но это не оптимально. Мы не можем использовать std::sync::Mutex, так как .await нужно вызывать с удерживаемой блокировкой. Мы могли бы использовать tokio::sync::Mutex, но это позволит выполнить только один текущий запрос. Если клиент реализует конвейерную обработку, асинхронный мьютекс приведет к недостаточному использованию (underutilizing) соединения.

Передача сообщений

Ответ заключается в использовании передачи сообщений. Шаблон предполагает создание специальной задачи для управления ресурсом client. Любая задача, желающая обработать (issue) запрос, отправляет сообщение задаче client. Задача client выдает (issue) запрос от имени отправителя, и ответ отправляется отправителю (простите за тавтологию).

В этой стратегии устанавливается одно соединение. Задача, управляющая client, может получить к нему монопольный доступ для вызова get и set. Кроме того, канал работает как буфер. Операции могут отправляться задаче client, пока она занята. Как только задача client освобождается (становится доступной для обработки новых запросов), она извлекает следующий запрос из канала. Это может привести к повышению пропускной способности и может быть расширено для поддержки пула соединений.

Примитивы каналов Tokio

Tokio предоставляет несколько каналов, которые служат разным целям:

  • mpsc - канал с несколькими производителями (producer) и одним потребителем (consumer). Можно отправлять много значений
  • oneshot - канал с одним производителем и одним потребителем. Можно отправлять одно значение
  • broadcast - канал с несколькими производителями и потребителями. Можно отправлять несколько значений. Каждый получатель видит каждое значение
  • watch - канал с одним производителем и несколькими потребителями. Можно отправлять много значений, но история не сохраняется. Каждый получатель видит только последнее значение

Если нам нужен многопользовательский канал с несколькими производителями, где каждое сообщение видит только один потребитель, можно использовать крейт async-channel. Существуют также синхронные каналы, например, std::sync::mpsc и crossbeam::channel. Эти каналы ждут сообщений, блокируя поток, что не разрешено в асинхронном коде.

В этом разделе мы будем использовать mpsc и oneshot. Другие типы каналов передачи сообщений рассматриваются в следующих разделах. Полный код из этого раздела можно найти здесь.

Определение типа сообщения

В большинстве случаев при использовании передачи сообщений задача, получающая сообщения, отвечает более чем на одну команду. В нашем случае задача будет реагировать на команды GET и SET. Чтобы смоделировать это, мы определим перечисление Command и включим в него вариант для каждого типа команды:

use bytes::Bytes;

#[derive(Debug)]
enum Command {
Get {
key: String,
},
Set {
key: String,
val: Bytes,
}
}

Создание канала

Создаем канал mpsc в функции main:

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
// Создаем новый канал емкостью 32
let (tx, mut rx) = mpsc::channel(32);

// ...
}

Канал mpsc используется для отправки команд задаче, управляющей соединением Redis. Возможность работы с несколькими производителями позволяет отправлять сообщения из многих задач. Создание канала возвращает два значения: отправителя и получателя. Они используются отдельно. Их можно перемещать в другие задачи.

Канал создан с емкостью (пропускной способностью) 32. Если сообщения отправляются быстрее, чем принимаются, канал сохраняет их. Как только 32 сообщения будут сохранены в канале, вызов send(...).await перейдет в режим сна до тех пор, пока сообщение не будет удалено получателем.

Отправка из нескольких задач осуществляется путем клонирования Sender. Например:

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel(32);
let tx2 = tx.clone();

tokio::spawn(async move {
tx.send("Sending from first handle").await.unwrap();
});

tokio::spawn(async move {
tx2.send("Sending from second handle").await.unwrap();
});

while let Some(message) = rx.recv().await {
println!("GOT = {}", message);
}
}

Оба сообщения отправляются одному дескриптору Receiver. Невозможно клонировать получателя канала mpsc.

Когда все Sender вышли за пределы области видимости или были удалены другим способом, отправлять в канал сообщения больше нельзя. На этом этапе вызов recv на Receiver возвращает None, что означает, что все отправители уничтожены, и канал закрыт.

В нашем случае задача, которая управляет соединением Redis, знает, что может закрыть соединение после закрытия канала, поскольку оно больше не будет использоваться.

Создание управляющей задачи

Создаем задачу, которая обрабатывает сообщения из канала. Сначала клиент устанавливает соединение с Redis. Затем через него передаются полученные команды.

use mini_redis::client;

// Ключевое слово `move` используется для перемещения владения `rx` в задачу
let manager = tokio::spawn(async move {
// Подключаемся к серверу
let mut client = client::connect("127.0.0.1:6379").await.unwrap();

// Начинаем получать сообщения
while let Some(cmd) = rx.recv().await {
use Command::*;

match cmd {
Get { key } => {
client.get(&key).await;
}
Set { key, val } => {
client.set(&key, val).await;
}
}
}
});

Обновляем обе задачи, чтобы они отправляли команды через канал, а не обрабатывали их непосредственно:

// Дескрипторы `Sender` перемещаются в задачи. Поскольку у нас две задачи,
// нам нужен второй `Sender`
let tx2 = tx.clone();

// Создаем две задачи
let t1 = tokio::spawn(async move {
let cmd = Command::Get {
key: "foo".to_string(),
};

tx.send(cmd).await.unwrap();
});

let t2 = tokio::spawn(async move {
let cmd = Command::Set {
key: "foo".to_string(),
val: "bar".into(),
};

tx2.send(cmd).await.unwrap();
});

В конце функции main мы ожидаем дескрипторы соединения, чтобы гарантировать полное выполнение команд перед завершением процесса:

t1.await.unwrap();
t2.await.unwrap();
manager.await.unwrap();

Получение ответов

Последний шаг - получить ответ от управляющей задачи. Команде GET необходимо получить значение, а команде SET необходимо знать, успешно ли завершилась операция установки значения.

Для передачи ответа используется канал oneshot. oneshot - это канал с одним производителем и одним потребителем, оптимизированный для отправки одного значения. В нашем случае единственным значением является ответ.

Подобно mpsc, метод oneshot::channel возвращает дескрипторы отправителя и получателя:

use tokio::sync::oneshot;

let (tx, rx) = oneshot::channel();

В отличие от mpsc, емкость (capacity) канала не указывается, поскольку она всегда равна единице. Кроме того, никакой дескриптор не может быть клонирован.

Для получения ответов от управляющей задачи перед отправкой команды создается канал oneshot. Половина канала Sender включается в команду управляющей задачи. Другая часть канала используется для получения ответа.

Обновляем Command, включая в нее Sender. Для ссылки на Sender используется псевдоним типа.

use tokio::sync::oneshot;
use bytes::Bytes;

/// Несколько разных команд мультиплексируются в одном канале
#[derive(Debug)]
enum Command {
Get {
key: String,
resp: Responder<Option<Bytes>>,
},
Set {
key: String,
val: Bytes,
resp: Responder<()>,
},
}

/// Предоставляется вызывающей стороной и используется управляющей задачей для отправки
/// ответа на команду вызывающей стороне
type Responder<T> = oneshot::Sender<mini_redis::Result<T>>;

Обновляем задачи, отправляющие команды, включая в них oneshot::Sender:

let t1 = tokio::spawn(async move {
let (resp_tx, resp_rx) = oneshot::channel();
let cmd = Command::Get {
key: "foo".to_string(),
resp: resp_tx,
};

// Отправляем команду `GET`
tx.send(cmd).await.unwrap();

// Ждем ответ
let res = resp_rx.await;
println!("GOT = {:?}", res);
});

let t2 = tokio::spawn(async move {
let (resp_tx, resp_rx) = oneshot::channel();
let cmd = Command::Set {
key: "foo".to_string(),
val: "bar".into(),
resp: resp_tx,
};

// Отправляем команду `SET`
tx2.send(cmd).await.unwrap();

// Ждем ответ
let res = resp_rx.await;
println!("GOT = {:?}", res);
});

Наконец, обновляем управляющую задачу для отправки ответа по каналу oneshot:

while let Some(cmd) = rx.recv().await {
match cmd {
Command::Get { key, resp } => {
let res = client.get(&key).await;
// Игнорируем ошибки
let _ = resp.send(res);
}
Command::Set { key, val, resp } => {
let res = client.set(&key, val).await;
// Игнорируем ошибки
let _ = resp.send(res);
}
}
}

Вызов send на oneshot::Sender завершается немедленно и не требует .await. Это связано с тем, что отправка по каналу oneshot всегда будет неудачной или успешной немедленно, без какого-либо ожидания.

Отправка значения по каналу oneshot возвращает Err, когда получатель уничтожен (dropped). Это указывает на то, что получатель больше не заинтересован в ответе. В нашем сценарии это является приемлемым событием. Err, возвращаемый методом resp.send, не требует обработки.

Полный код примера можно найти здесь.

Обратное давление и ограниченные каналы

Всякий раз, когда вводится одновременное выполнение или очереди задач, важно убедиться, что очередь ограничена и система справляется с нагрузкой. Неограниченные очереди могут потребить всю доступную память и привести к сбою системы.

Tokio старается избегать неявных очередей. Во многом это связано с тем, что асинхронные операции являются ленивыми (lazy). Например:

loop {
async_op();
}

Если асинхронная операция будет выполняться незамедлительно (eagerly), цикл будет помещать в очередь новую операцию async_op, не дожидаясь завершения предыдущей операции. Это может привести к неявной неограниченной очереди. Системы, основанные на колбеках, и системы, основанные на незамедлительных фьючерах, особенно восприимчивы к этому.

Однако в Tokio и асинхронном Rust async_op вообще не запустится. Это связано с тем, что .await никогда не вызывается. Если добавить в сниппет .await, цикл будет ждать завершения текущей операции перед запуском новой.

loop {
// Следующая итерация наступит только после завершения текущей `async_op`
async_op().await;
}

Одновременное выполнение и очереди должны вводиться явно, например, с помощью:

  • tokio::spawn
  • select!
  • join!
  • mpsc::channel

При этом, необходимо позаботиться о том, чтобы общее количество одновременно выполняемых задач было ограничено. Например, при создании цикла принятия TCP-соединений нужно убедиться, что общее количество открытых сокетов ограничено. При использовании mpsc::channel, важно выбрать управляемую пропускную способность канала. Конкретные ограничения зависят от потребностей приложения.

Ввод-вывод

Ввод-вывод в Tokio работает почти так же, как и в std, но асинхронно. Существует типаж для чтения (AsyncRead) и типаж для записи (AsyncWrite). Определенные типы реализуют эти типажи соответствующим образом (TcpStream, File, Stdout). AsyncRead и AsyncWrite также реализуются рядом структур данных, таких как Vec<u8> и &[u8]. Это позволяет использовать массивы байтов там, где ожидается читатель или писатель.

В этом разделе будут рассмотрены базовые операции чтения и записи с помощью Tokio, а также приведено несколько примеров. В следующем разделе будет рассмотрен более продвинутый пример обработки ввода-вывода.

AsyncRead и AsyncWrite

Эти типажи предоставляют возможности асинхронного чтения и записи в потоки байтов. Методы этих типажей обычно не вызываются напрямую, подобно тому, как мы не вызываем вручную метод call типажа Future. Вместо этого, они используются через вспомогательные методы, предоставляемые AsyncReadExt и AsyncWriteExt.

Кратко рассмотрим некоторые из этих методов. Все эти функции являются асинхронными и должны использоваться с .await.

async fn read()

AsyncReadExt::read предоставляет асинхронный метод для чтения данных в буфер, возвращающий количество прочитанных байтов.

Если read() возвращает Ok(0), это означает одно из двух:

  1. Читатель достиг EOF и, вероятно, больше не сможет производить байты. Обратите внимание: это не означает, что читатель никогда больше не сможет производить байты.
  2. Длина указанного буфера составляет 0 байт.

Дальнейшие вызовы read() будут немедленно возвращать Ok(0). Для экземпляров TcpStream это означает, что половина сокета для чтения закрыта.

use tokio::fs::File;
use tokio::io::{self, AsyncReadExt};

#[tokio::main]
async fn main() -> io::Result<()> {
let mut f = File::open("test.txt").await?;
let mut buffer = [0; 10];

// Читаем от 0 до 10 байтов
f.read(&mut buffer[..]).await?;

// Выводим в терминал первые 10 байтов
println!("The bytes: {:?}", buffer);
Ok(())
}

async fn read_to_end()

AsyncReadExt::read_to_end считывает все байты из потока до EOF:

use tokio::io::{self, AsyncReadExt};
use tokio::fs::File;

#[tokio::main]
async fn main() -> io::Result<()> {
let mut f = File::open("test.txt").await?;
let mut buffer = Vec::new();

// Читаем весь файл
f.read_to_end(&mut buffer).await?;

// Выводим в терминал содержимое файла
println!("{}", String::from_utf8_lossy(&buffer));
Ok(())
}

async fn write()

AsyncWriteExt::write записывает буфер в файл, возвращая количество записанных байтов:

use tokio::fs::File;
use tokio::io::{self, AsyncWriteExt};

#[tokio::main]
async fn main() -> io::Result<()> {
let mut file = File::create("test.txt").await?;

// Записываем часть байтовой строки в файл
let n = file.write(b"some bytes").await?;

// Выводим в терминал количество записанных байтов
println!("Wrote the first {} bytes of 'some bytes'.", n);
Ok(())
}

async fn write_all()

AsyncWriteExt::write_all записывает весь буфер в файл:

use tokio::fs::File;
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() -> io::Result<()> {
// Создаем или открываем файл для записи
let mut file = File::create("test.txt").await?;

// Записываем байтовую строку в файл
file.write_all(b"some bytes").await?;

// Открываем файл для чтения
file = File::open("test.txt").await?;
let mut buffer = Vec::new();

// Читаем байты из файла
file.read_to_end(&mut buffer).await?;

// Выводим в терминал содержимое файла
println!("{}", String::from_utf8_lossy(&buffer));
Ok(())
}

Вспомогательные функции

Как и std, модуль tokio::io содержит ряд полезных утилит, а также API для работы со стандартным вводом, стандартным выводом и стандартными ошибками. Например, tokio::io::copy асинхронно копирует все содержимое устройства чтения в устройство записи:

use tokio::fs::File;
use tokio::io;

#[tokio::main]
async fn main() -> io::Result<()> {
let mut reader: &[u8] = b"hello";
let mut file = File::create("test.txt").await?;

io::copy(&mut reader, &mut file).await?;
Ok(())
}

Обратите внимание, здесь используется то, что байтовые массивы также реализуют AsyncRead.

Эхо-сервер

Поупражняемся в работе с асинхронным вводом-выводом. Напишем эхо-сервер.

Эхо-сервер "привязывает" TcpListener и принимает входящие соединения в цикле. Для каждого входящего соединения данные считываются из сокета и немедленно записываются обратно в него. Клиент отправляет данные на сервер и получает их обратно.

Мы реализуем эхо-сервер дважды с помощью разных стратегий.

io::copy()

Начнем с реализации сервера с помощью утилиты io::copy.

Создаем новый двоичный файл:

touch src/bin/echo-server-copy.rs

Команда для запуска примера:

cargo run --bin echo-server-copy

Тестировать сервер можно, используя стандартный инструмент командной строки, такой как telnet, или написав простой клиент, подобный тому, который можно найти в документации tokio::net::TcpStream.

Это TCP-сервер, и ему нужен цикл принятия. Для обработки каждого входящего сокета создается новая задача.

use tokio::io;
use tokio::net::TcpListener;

#[tokio::main]
async fn main() -> io::Result<()> {
let listener = TcpListener::bind("127.0.0.1:6142").await?;

loop {
let (mut socket, _) = listener.accept().await?;

tokio::spawn(async move {
// Копируем данные
});
}
}

Эта утилита берет устройства чтения и записи и копирует данные из одного в другой. Однако у нас есть только один TcpStream. Он реализует как AsyncRead, так и AsyncWrite. Поскольку io::copy требует &mut как для чтения, так и для записи, сокет нельзя использовать для обоих аргументов.

// Это не будет компилироваться
io::copy(&mut socket, &mut socket).await

Разделение читателя и писателя

Для решения этой проблемы, мы должны разделить сокет на дескриптор чтения и дескриптор записи. Лучший способ это сделать зависит от конкретного типа.

Любой тип читатель + писатель можно разделить с помощью утилиты io::split. Эта функция принимает одно значение и возвращает отдельные дескрипторы чтения и записи. Эти два дескриптора можно использовать независимо, в том числе, в отдельных задачах.

Например, эхо-клиент может обрабатывать одновременные операции чтения и записи следующим образом:

use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;

#[tokio::main]
async fn main() -> io::Result<()> {
let socket = TcpStream::connect("127.0.0.1:6142").await?;
let (mut rd, mut wr) = io::split(socket);

// Записываем данные в фоновом режиме
tokio::spawn(async move {
wr.write_all(b"hello\r\n").await?;
wr.write_all(b"world\r\n").await?;

// Иногда компилятору Rust нужна небольшая помощь
// для вывода правильного типа
Ok::<_, io::Error>(())
});

let mut buf = vec![0; 128];

loop {
let n = rd.read(&mut buf).await?;

if n == 0 {
break;
}

println!("GOT {:?}", &buf[..n]);
}

Ok(())
}

Поскольку io::split() поддерживает любое значение, реализующее AsyncRead + AsyncWrite, и возвращает независимые дескрипторы, внутри io::split() используются Arc и Mutex. Этих накладных расходов можно избежать с помощью TcpStream, который предоставляет две функции разделения.

TcpStream::split принимает ссылку на поток и возвращает дескрипторы чтения и записи. Поскольку используется ссылка, оба дескриптора должны оставаться в той же задаче, из которой вызывается split(). Эта функция является бесплатной. Arc или Mutex ей не нужны. TcpStream также предоставляет функцию into_split, возвращающую дескрипторы, которые могут перемещаться между задачами за счет только Arc.

Поскольку io::copy() вызывается для задачи, которая владеет TcpStream, мы можем использовать TcpStream::split(). Задача, отвечающая за логику на сервере, будет выглядеть так:

tokio::spawn(async move {
let (mut rd, mut wr) = socket.split();

if io::copy(&mut rd, &mut wr).await.is_err() {
eprintln!("Failed to copy");
}
});

Полный код примера можно найти здесь.

Ручное копирование

Теперь посмотрим на эхо-сервер, копирующий данные вручную. Для этого мы будем использовать AsyncReadExt::read и AsyncWriteExt::write_all.

Полный код сервера:

use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;

#[tokio::main]
async fn main() -> io::Result<()> {
let listener = TcpListener::bind("127.0.0.1:6142").await?;

loop {
let (mut socket, _) = listener.accept().await?;

tokio::spawn(async move {
let mut buf = vec![0; 1024];

loop {
match socket.read(&mut buf).await {
// `Ok(0)` свидетельствует о закрытии сокета
Ok(0) => return,
Ok(n) => {
// Копируем данные обратно в сокет
if socket.write_all(&buf[..n]).await.is_err() {
// Неожиданная ошибка сокета. Мы ничего не можем с ней сделать,
// так что просто прекращаем обработку
return;
}
}
Err(_) => {
// Неожиданная ошибка сокета
return;
}
}
}
});
}
}

Этот код можно поместить в src/bin/echo-server.rs и запустить с помощью cargo run --bin echo-server.

Разберем код построчно. Во-первых, поскольку используются утилиты AsyncRead и AsyncWrite, в область видимости должны быть включены расширяющие их типажи:

use tokio::io::{self, AsyncReadExt, AsyncWriteExt};

Выделение буфера

Стратегия состоит в том, чтобы прочитать данные из сокета в буфер, а затем записать содержимое буфера обратно в сокет:

let mut buf = vec![0; 1024];

Мы специально не используем стековый буфер. Ранее мы отмечали, что все данные задачи, которые сохраняются при вызовах .await, должны храниться в задаче. В этом случае buf используется при вызовах .await. Все данные задачи хранятся в одном месте. Об этом можно думать как о enum, где каждый вариант - это данные, которые необходимо сохранить для конкретного вызова .await.

Если буфер представлен массивом стека, внутренняя структура задач, создаваемых для каждого принятого сокета, может выглядеть примерно так:

struct Task {
// Внутренние поля задачи
task: enum {
AwaitingRead {
socket: TcpStream,
buf: [BufferType],
},
AwaitingWriteAll {
socket: TcpStream,
buf: [BufferType],
}

}
}

Если в качестве типа буфера используется стековый массив, он будет храниться внутри структуры задачи. Это сделает структуру задачи очень большой. Кроме того, размеры буфера часто соответствуют размеру страницы. Это, в свою очередь, приведет к неуклюжему размеру Task: $page-size + несколько-байт.

На самом деле компилятор использует более оптимальное представление состояния асинхронного блока, чем простой enum. На практике переменные не перемещаются между вариантами, как это требуется при перечислении. Однако размер структуры задачи по крайней мере равен размеру самой большой переменной.

По этой причине обычно более эффективно использовать отдельное пространство для буфера.

Обработка EOF

Когда читатель TCPStream закрывается, вызов read() возвращает Ok(0). На этом этапе важно выйти из цикла чтения. Забывание об этом является распространенным источником ошибок.

loop {
match socket.read(&mut buf).await {
Ok(0) => return,
// ...
}
}

Полный код примера можно найти здесь.



Кадрирование

Применим то, что мы узнали о вводе-выводе, и реализуем уровень кадрирования (framing layer) Mini-Redis. Кадрирование (framing) - это процесс получения потока байтов и преобразования его в поток кадров. Кадр - это единица данных, передаваемая между двумя узлами (peers). Кадр протокола Redis определяется следующим образом:

use bytes::Bytes;

enum Frame {
Simple(String),
Error(String),
Integer(u64),
Bulk(Bytes),
Null,
Array(Vec<Frame>),
}

Обратите внимание, что кадр состоит только из данных без какой-либо семантики. Анализ и реализация команд происходят на более высоком уровне.

Для HTTP кадр может выглядеть так:

enum HttpFrame {
RequestHead {
method: Method,
uri: Uri,
version: Version,
headers: HeaderMap,
},
ResponseHead {
status: StatusCode,
version: Version,
headers: HeaderMap,
},
BodyChunk {
chunk: Bytes,
},
}

Реализуем структуру Connection, которая оборачивает TcpStream и читает/записывает значения mini_redis::Frame:

use tokio::net::TcpStream;
use mini_redis::{Frame, Result};

struct Connection {
stream: TcpStream,
// ...
}

impl Connection {
/// Читает кадр из соединения.
///
/// Возвращает `None` при достижении EOF
pub async fn read_frame(&mut self)
-> Result<Option<Frame>>
{
// TODO
}

/// Записывает кадр в соединение
pub async fn write_frame(&mut self, frame: &Frame)
-> Result<()>
{
// TODO
}
}

Подробную информацию о протоколе Redis можно найти здесь. Полный код Connection можно найти здесь.

Буферизованное чтение

Метод read_frame ожидает получения всего кадра перед возвратом. Один вызов TcpStream::read() может вернуть произвольный объем данных. Он может содержать целый кадр, часть кадра или несколько кадров. Если получен частичный кадр, данные буферизуются, и из сокета считываются дополнительные данные. Если получено несколько кадров, возвращается первый, а остальные данные помещаются в буфер до следующего вызова read_frame().

Создаем новый файл:

touch src/connection.rs

Далее в Connection нужно добавить поле для буфера чтения (read buffer). Данные считываются из сокета в буфер чтения. При разборе кадра соответствующие данные удаляются из буфера.

Мы будем использовать BytesMut в качестве типа буфера. Это изменяемая версия Bytes.

use bytes::BytesMut;
use tokio::net::TcpStream;

pub struct Connection {
stream: TcpStream,
buffer: BytesMut,
}

impl Connection {
pub fn new(stream: TcpStream) -> Connection {
Connection {
stream,
// Выделяем буфер размером 4 КБ
buffer: BytesMut::with_capacity(4096),
}
}
}

Реализуем метод read_frame:

use tokio::io::AsyncReadExt;
use bytes::Buf;
use mini_redis::Result;

pub async fn read_frame(&mut self)
-> Result<Option<Frame>>
{
loop {
// Пытаемся разобрать кадр из буферизованных данных.
// Если данных в буфере достаточно, возвращается кадр
if let Some(frame) = self.parse_frame()? {
return Ok(Some(frame));
}

// В буфере недостаточно данных для чтения кадра.
// Пытаемся получить больше данных из сокета.
//
// При успехе возвращается количество байтов.
// `0` - индикатор "конца потока"
if 0 == self.stream.read_buf(&mut self.buffer).await? {
// Другая сторона закрыла соединение. Для чистого закрытия
// в буфере чтения не должно оставаться данных.
// Если такие данные имеются, значит другая сторона
// закрыла соединение во время передачи кадра
if self.buffer.is_empty() {
return Ok(None);
} else {
return Err("Connection reset by peer".into());
}
}
}
}

Разберем этот код. Метод read_frame работает в цикле. Сначала вызывается self.parse_frame(). Этот метод пытается разобрать кадр Redis из self.buffer. Если данных достаточно, кадр возвращается вызывающей стороне. В противном случае, мы пытаемся прочитать больше данных из сокета. После считывания дополнительных данных снова вызывается parse_frame().

При чтении из потока возвращаемое значение 0 указывает, что данных от узла больше не будет. Если в буфере чтения все еще есть данные, это означает, что был получен частичный кадр и соединение прервано внезапно. Это состояние ошибки, поэтому возвращается Err.

Типаж Buf

При чтении из потока вызывается read_buf(). Эта версия функции чтения принимает значение, реализующее BufMut из крейта bytes.

Во-первых, подумайте, как мы могли бы реализовать тот же цикл чтения, используя read(). Вместо BytesMut можно использовать Vec<u8>:

use tokio::net::TcpStream;

pub struct Connection {
stream: TcpStream,
buffer: Vec<u8>,
cursor: usize,
}

impl Connection {
pub fn new(stream: TcpStream) -> Connection {
Connection {
stream,
buffer: vec![0; 4096],
cursor: 0,
}
}
}

Функция read_frame в Connection:

use mini_redis::{Frame, Result};

pub async fn read_frame(&mut self)
-> Result<Option<Frame>>
{
loop {
if let Some(frame) = self.parse_frame()? {
return Ok(Some(frame));
}

// Проверяем наличие свободного места в буфере
if self.buffer.len() == self.cursor {
// Увеличиваем размер буфера
self.buffer.resize(self.cursor * 2, 0);
}

// Читаем в буфер, отслеживая количество прочитанных байт
let n = self.stream.read(&mut self.buffer[self.cursor..]).await?;

if 0 == n {
if self.cursor == 0 {
return Ok(None);
} else {
return Err("Connection reset by peer".into());
}
} else {
// Обновляем курсор
self.cursor += n;
}
}
}

При работе с байтовыми массивами и read(), мы должны поддерживать курсор, отслеживающий, какой объем данных был помещен в буфер. Мы должны обязательно передать в функцию read пустую часть буфера. В противном случае, мы перезапишем буферизованные данные. Если буфер заполняется, мы должны увеличить его, чтобы продолжить чтение. В parse_frame() (не входит в пример) нам нужно будет проанализировать данные, содержащиеся в self.buffer[..self.cursor].

Поскольку соединение массива байтов с курсором является очень распространенным, крейт bytes предоставляет абстракцию, представляющую массив байтов и курсор. Типаж Buf реализуется типами, из которых можно читать данные. Типаж BufMut реализуется типами, в которые можно записывать данные. При передаче T: BufMut в read_buf() внутренний курсор буфера автоматически обновляется. Благодаря этому в нашей версии read_frame() нам не нужно управлять собственным курсором.

Кроме того, при использовании Vec<u8> буфер необходимо инициализировать. vec![0; 4096] выделяет массив размером 4096 байт и записывает ноль в каждую ячейку. При изменении размера буфера новая емкость также должна быть инициализирована нулями. Процесс инициализации не является бесплатным. При работе с BytesMut и BufMut емкость не инициализируется. Абстракция BytesMut не позволяет нам читать неинициализированную память. Это позволяет нам избежать этапа инициализации.

Разбор

Теперь рассмотрим функцию parse_frame. Разбор выполняется в два этапа:

  1. Убеждаемся, что в буфере находится полный кадр, и находим конечный индекс кадра.
  2. Разбираем кадр.

Крейт mini-redis предоставляет нам функции для обоих этих шагов:

  1. Frame::check.
  2. Frame::parse.

Мы также будем повторно использовать абстракцию Buf. Buf передается в Frame::check(). Поскольку функция check перебирает переданный буфер, внутренний курсор перемещается вперед. Когда check() возвращается, внутренний курсор буфера указывает на конец кадра.

Для типа Buf мы будем использовать std::io::Cursor<&[u8]>:

use mini_redis::{Frame, Result};
use mini_redis::frame::Error::Incomplete;
use bytes::Buf;
use std::io::Cursor;

fn parse_frame(&mut self)
-> Result<Option<Frame>>
{
// Создаем тип `T: Buf`
let mut buf = Cursor::new(&self.buffer[..]);

// Проверяем, доступен ли целый кадр
match Frame::check(&mut buf) {
Ok(_) => {
// Получаем длину кадра в байтах
let len = buf.position() as usize;

// Сбрасываем внутренний курсор для вызова `parse()`
buf.set_position(0);

// Разбираем кадр
let frame = Frame::parse(&mut buf)?;

// Удаляем кадр из буфера
self.buffer.advance(len);

// Возвращаем кадр
Ok(Some(frame))
}
// В буфере содержится мало данных
Err(Incomplete) => Ok(None),
// Возникла ошибка
Err(e) => Err(e.into()),
}
}

Полный код функции Frame::check можно найти здесь.

Важно отметить, что используются API Buf в стиле "байтового итератора" (byte iterator). Речь идет об извлечении данных и перемещении внутреннего курсора. Например, чтобы определить тип кадра при его анализе, проверяется первый байт. Используемая функция - Buf::get_u8. Извлекается байт в текущей позиции курсора и курсор перемещается на единицу.

Типаж Buf предоставляет много полезных методов.

Буферизованная запись

Другая половина API кадрирования - это функция write_frame(frame). Эта функция записывает в сокет весь кадр. Чтобы свести к минимуму системные вызовы write(), запись буферизуется. Кадры кодируются в буфер записи (write buffer) перед записью в сокет. Однако, в отличие от read_frame(), весь кадр не всегда буферизуется в массив байтов перед записью в сокет.

Рассмотрим кадр массового (группового) потока (bulk stream frame). Записываемое значение - Frame::Bulk(Bytes). Формат передачи группового кадра - это заголовок кадра, который состоит из символа $, за которым следует длина данных в байтах. Большую часть кадра составляет содержимое значения Bytes. Если данные большие, копирование их в промежуточный буфер будет дорогостоящим.

Для реализации буферизованной записи мы будем использовать структуру BufWriter. Эта структура инициализируется с помощью T: AsyncWrite и сама реализует AsyncWrite. При вызове write() в BufWriter, запись идет не непосредственно в файл для записи, а в буфер. Когда буфер заполняется, его содержимое сбрасывается во внутренний файл для записи, и буфер очищается. Также существуют оптимизации, позволяющие обходить (bypass) буфер в определенных случаях.

Мы реализуем только часть функции write_frame. Полную реализацию смотрите здесь.

Сначала обновляем структуру Connection:

use tokio::io::BufWriter;
use tokio::net::TcpStream;
use bytes::BytesMut;

pub struct Connection {
stream: BufWriter<TcpStream>,
buffer: BytesMut,
}

impl Connection {
pub fn new(stream: TcpStream) -> Connection {
Connection {
stream: BufWriter::new(stream),
buffer: BytesMut::with_capacity(4096),
}
}
}

Затем реализуем write_frame():

use tokio::io::{self, AsyncWriteExt};
use mini_redis::Frame;

async fn write_frame(&mut self, frame: &Frame)
-> io::Result<()>
{
match frame {
Frame::Simple(val) => {
self.stream.write_u8(b'+').await?;
self.stream.write_all(val.as_bytes()).await?;
self.stream.write_all(b"\r\n").await?;
}
Frame::Error(val) => {
self.stream.write_u8(b'-').await?;
self.stream.write_all(val.as_bytes()).await?;
self.stream.write_all(b"\r\n").await?;
}
Frame::Integer(val) => {
self.stream.write_u8(b':').await?;
self.write_decimal(*val).await?;
}
Frame::Null => {
self.stream.write_all(b"$-1\r\n").await?;
}
Frame::Bulk(val) => {
let len = val.len();

self.stream.write_u8(b'$').await?;
self.write_decimal(len as u64).await?;
self.stream.write_all(val).await?;
self.stream.write_all(b"\r\n").await?;
}
Frame::Array(_val) => unimplemented!(),
}

self.stream.flush().await;

Ok(())
}

Используемые здесь функции предоставляются AsyncWriteExt. Они также доступны в TcpStream, но однобайтовую запись без промежуточного буфера выполнять не рекомендуется.

  • write_u8 записывает в файл один байт
  • write_all записывает весь фрагмент
  • write_decimal реализуется mini-redis

Функция заканчивается вызовом self.stream.flush().await. Поскольку BufWriter сохраняет записи в промежуточном буфере, вызовы write() не гарантируют, что данные будут записаны в сокет. Перед возвратом мы хотим, чтобы кадр был записан в сокет. Вызов flush() записывает в сокет любые данные, ожидающие обработки в буфере.

Другой альтернативой было бы не вызывать flush() в write_frame(). Вместо этого можно реализовать flush() у Connection. Это позволит вызывающей стороне записать в очередь несколько небольших кадров в буфер записи, а затем записать их все в сокет с помощью одного системного вызова write(). Это усложняет API Connection. Простота - одна из целей Mini-Redis, поэтому мы включили вызов flush().await в write_frame().

Подробно об асинхронности

Углубимся в модель асинхронной среды выполнения Rust.

Фьючеры (futures)

В качестве краткого обзора возьмем очень простую асинхронную функцию. В ней нет ничего нового.

use tokio::net::TcpStream;

async fn my_async_fn() {
println!("hello from async");
let _socket = TcpStream::connect("127.0.0.1:3000").await.unwrap();
println!("async TCP operation complete");
}

Мы вызываем функцию, и она возвращает некоторое значение. Затем мы вызываем .await на этом значении.

#[tokio::main]
async fn main() {
// В терминал пока ничего не выводится
let what_is_this = my_async_fn();
// Текст печатается в терминале, соединение
// устанавливается и закрывается
what_is_this.await;
}

Значение, возвращаемое my_async_fn(), является фьючером. Фьючер - это значение, которое реализует типаж std::future::Future, предоставляемый стандартной библиотекой.

Определение std::future::Future:

use std::pin::Pin;
use std::task::{Context, Poll};

pub trait Future {
type Output;

fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>;
}

Связанный тип (associated type) Output - это тип, который будет создан во фьючере после его завершения. Тип Pin - это то, с помощью чего Rust поддерживает заимствования в асинхронных функциях.

В отличие от того, как фьючеры реализуются в других языках, фьючер Rust не представляет вычисления, происходящие в фоновом режиме, а является самими вычислениями. Владелец фьючера отвечает за выполнение (advance) вычислений путем его опроса (poll).

Реализация Future

Реализуем очень простой фьючер. Он будет делать следующее:

  1. Ждать какое-то временя.
  2. Выводить некоторый текст в STDOUT.
  3. Возвращать строку.
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

struct Delay {
when: Instant,
}

impl Future for Delay {
type Output = &'static str;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<&'static str>
{
if Instant::now() >= self.when {
println!("hello world");
Poll::Ready("done")
} else {
// Пока не обращайте внимания на эту строку
cx.waker().wake_by_ref();
Poll::Pending
}
}
}

#[tokio::main]
async fn main() {
let when = Instant::now() + Duration::from_millis(10);
let future = Delay { when };

let out = future.await;
assert_eq!(out, "done");
}

Асинхронная функция как фьючер

В функции main мы создаем экземпляр фьючера и вызываем на нем .await. В асинхронных функциях мы можем вызывать .await для любого значения, реализующего Future. В свою очередь, вызов асинхронной функции возвращает анонимный тип, реализующий Future. В случае async fn main() сгенерированный фьючер выглядит примерно так:

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

enum MainFuture {
// Инициализирован, не опрашивался
State0,
// Ждет `Delay` - строка `future.await`
State1(Delay),
// Фьючер завершен
Terminated,
}

impl Future for MainFuture {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<()>
{
use MainFuture::*;

loop {
match *self {
State0 => {
let when = Instant::now() + Duration::from_millis(10);
let future = Delay { when };
*self = State1(future);
}
State1(ref mut my_future) => {
match Pin::new(my_future).poll(cx) {
Poll::Ready(out) => {
assert_eq!(out, "done");
*self = Terminated;
return Poll::Ready(());
}
Poll::Pending => {
return Poll::Pending;
}
}
}
Terminated => {
panic!("future polled after completion")
}
}
}
}
}

Фьючеры Rust - это машины состояний (state machines). Здесь MainFuture представлено как перечисление возможных состояний фьючера. Фьючер начинается в состоянии State0. Когда вызывается call(), фьючер пытается обновить свое внутреннее состояние. Если фьючер может завершиться, возвращается Poll::Ready, содержащий результат асинхронных вычислений.

Если фьючер не может завершиться, обычно из-за нехватки ресурсов, возвращается Poll::Pending. Получение Poll::Pending указывает вызывающей стороне, что фьючер завершится позже, и вызывающая сторона должна снова вызвать call() через какое-то время.

Мы также видим, что фьючеры состоят из других фьючеров. Вызов call() внешнего фьючера приводит к вызову call() внутреннего фьючера.

Исполнители (executors)

Асинхронные функции Rust возвращают фьючеры. Для обновления состояния фьючера должен вызываться call(). Фьючеры состоят из других фьючеров. Вопрос в том, что вызывает call() самого внешнего фьючера?

Напомним, что для запуска асинхронных функций их необходимо либо передать в tokio::spawn(), либо сделать их основной функцией, помеченной с помощью #[tokio::main]. В результате сгенерированный внешний фьючер передается исполнителю Tokio. Исполнитель отвечает за вызов Future::poll() на внешнем фьючере, доводя асинхронные вычисления до завершения.

Мини Tokio

Чтобы лучше понять, как все это сочетается друг с другом, реализуем собственную минимальную версию Tokio! Полный код можно найти здесь.

use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use futures::task;

fn main() {
let mut mini_tokio = MiniTokio::new();

mini_tokio.spawn(async {
let when = Instant::now() + Duration::from_millis(10);
let future = Delay { when };

let out = future.await;
assert_eq!(out, "done");
});

mini_tokio.run();
}

struct MiniTokio {
tasks: VecDeque<Task>,
}

type Task = Pin<Box<dyn Future<Output = ()> + Send>>;

impl MiniTokio {
fn new() -> MiniTokio {
MiniTokio {
tasks: VecDeque::new(),
}
}

/// Создает фьючер на экземпляре mini-tokio
fn spawn<F>(&mut self, future: F)
where
F: Future<Output = ()> + Send + 'static,
{
self.tasks.push_back(Box::pin(future));
}

fn run(&mut self) {
let waker = task::noop_waker();
let mut cx = Context::from_waker(&waker);

while let Some(mut task) = self.tasks.pop_front() {
if task.as_mut().poll(&mut cx).is_pending() {
self.tasks.push_back(task);
}
}
}
}

Это запускает асинхронный блок. Экземпляр Delay создается с указанной задержкой и "ожидается". Однако наша реализация на данный момент имеет серьезный недостаток. Наш исполнитель никогда не спит. Исполнитель непрерывно просматривает все порожденные фьючеры и опрашивает их. Большую часть времени фьючер не будет готов выполнять новую работу и будет возвращать Poll::Pending. Этот процесс будет сжигать циклы ЦП и, как правило, будет не очень эффективным.

В идеале мы хотим, чтобы mini-tokio опрашивал фьючеры только тогда, когда они готовы к выполнению новой задачи. Это происходит, когда ресурс, на котором заблокирована задача, готов выполнить запрошенную операцию. Если задача хочет прочитать данные из сокета TCP, мы должны опрашивать задачу только тогда, когда сокет TCP получил данные. В нашем случае задача блокируется при достижении указанного момента времени (Instant). В идеале, mini-tokio должен опрашивать задачу только по прошествии этого времени.

Для этого опрошенный, но не готовый ресурс должен отправить уведомление исполнителю при переходе в состояние готовности.

Будильники (wakers)

Будильники - недостающая часть. Это система, с помощью которой ресурс может уведомить ожидающую задачу о том, что он готов продолжить выполнение операции.

Еще раз взглянем на определение Future::poll():

fn poll(self: Pin<&mut Self>, cx: &mut Context)
-> Poll<Self::Output>;

Аргумент Context метода poll имеет метод waker. Этот метод возвращает Waker, привязанный к текущей задаче. У Waker есть метод wake. Вызов этого метода сигнализирует исполнителю, что связанную задачу следует запланировать для выполнения. Ресурсы вызывают wake(), когда переходят в состояние готовности, чтобы уведомить исполнителя о том, что опрос задачи может продолжиться.

Обновление Delay

Мы можем обновить Delay, чтобы использовать будильники:

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use std::thread;

struct Delay {
when: Instant,
}

impl Future for Delay {
type Output = &'static str;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<&'static str>
{
if Instant::now() >= self.when {
println!("hello world");
Poll::Ready("done")
} else {
// Получаем дескриптор будильника для текущей задачи
let waker = cx.waker().clone();
let when = self.when;

// Создаем поток таймера
thread::spawn(move || {
let now = Instant::now();

if now < when {
thread::sleep(when - now);
}

waker.wake();
});

Poll::Pending
}
}
}

Теперь по истечении указанного времени вызывающая задача получит уведомление, и исполнитель запланирует ее повторный опрос. Следующим шагом будет обновление mini-tokio для регистрации уведомлений о пробуждении.

С нашей реализацией Delay все еще есть несколько проблем. Мы исправим их позже.

Когда фьючер возвращает Poll::Pending, он должен гарантировать, что в какой-то момент будет подан сигнал о пробуждении. Если этого не сделать, задача будет "висеть" бесконечно.

Вспомним код Delay:

impl Future for Delay {
type Output = &'static str;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<&'static str>
{
if Instant::now() >= self.when {
println!("hello world");
Poll::Ready("done")
} else {
// !
cx.waker().wake_by_ref();
Poll::Pending
}
}
}

Прежде чем вернуть Poll::Pending, мы вызываем cx.waker().wake_by_ref(). Это необходимо для выполнения контракта фьючера. Возвращая Poll::Pending, мы отвечаем за вызов будильника. Поскольку мы еще не реализовали поток таймера (timer thread), то подаем встроенный сигнал. Это приводит к тому, что фьючер будет немедленно повторно запланирован, повторно выполнен и, вероятно, снова не будет готов к завершению.

Обратите внимание, что мы можем подавать сигнал чаще, чем это необходимо. В данном конкретном случае мы подаем сигнал о пробуждении, хотя вообще не готовы продолжать операцию. В этом нет ничего плохого, кроме ненужной траты ресурсов процессора. Однако эта конкретная реализация приведет к образованию цикла занятости (busy loop).

Обновление Mini Tokio

Следующим шагом будет обновление Mini Tokio для получения уведомлений о пробуждении. Мы хотим, чтобы исполнитель запускал задачи только после их пробуждения, и для этого Mini Tokio предоставит собственный будильник. Когда будильник вызывается, связанная с ним задача ставится в очередь на выполнение. Mini Tokio передает этот сигнал во фьючер при его опросе.

Обновленный Mini Tokio будет использовать канал для хранения запланированных задач. Каналы позволяют ставить задачи в очередь для выполнения из любого потока. Будильники должны реализовывать Send и Sync.

Типажи Send и Sync - это маркерные типажи (marker traits), связанные с параллелизмом Rust. Типы, которые можно отправить в другой поток, реализуют Send. Большинство типов являются Send, но некоторые (вроде Rc) не являются. Типы, к которым можно одновременно получить доступ через неизменяемые ссылки, реализуют Sync. Тип может быть Send, но не Sync. Хорошим примером является Cell, который можно изменить с помощью неизменяемой ссылки, и поэтому одновременный доступ к нему небезопасен.

Обновляем структуру MiniTokio:

use std::sync::mpsc;
use std::sync::Arc;

struct MiniTokio {
scheduled: mpsc::Receiver<Arc<Task>>,
sender: mpsc::Sender<Arc<Task>>,
}

struct Task {
// TODO
}

Будильники являются Sync и могут клонироваться. При вызове wake() задача должна планироваться для выполнения. Для реализации этого у нас есть канал. При вызове wake() задача передается в отправляющую половину канала. Наша структура Task будет реализовывать логику пробуждения. Для этого ему необходимо содержать как порожденный фьючер, так и отправителя из канала. Мы поместим фьючер в структуру TaskFuture рядом с перечислением Poll, чтобы отслеживать результат последнего вызова Future::poll(), который необходим для обработки ложных пробуждений. Более подробная информация представлена ​​в реализации метода poll в TaskFuture.

use std::sync::{Arc, Mutex};

/// Структура, содержащая фьючер и результат
/// последнего вызова его метода `poll`
struct TaskFuture {
future: Pin<Box<dyn Future<Output = ()> + Send>>,
poll: Poll<()>,
}

struct Task {
// `Mutex` позволяет `Task` реализовать `Sync`.
// `task_future` доступна одновременно только одному потоку.
// `Mutex` является опциональным. Настоящий Tokio
// не использует здесь мьютекс, но настоящий Tokio содержит
// гораздо больше строк кода, чем может уместиться на одной странице туториала
task_future: Mutex<TaskFuture>,
executor: mpsc::Sender<Arc<Task>>,
}

impl Task {
fn schedule(self: &Arc<Self>) {
self.executor.send(self.clone());
}
}

Для планирования задачи, Arc клонируется и отправляется по каналу. Теперь нам нужно подключить функцию schedule к std::task::Waker. Стандартная библиотека предоставляет для этого низкоуровневый API с использованием ручного построения vtable. Эта стратегия обеспечивает максимальную гибкость для разработчиков, но требует некоторого небезопасного шаблонного кода. Вместо прямого использования RawWakerVTable мы будем использовать утилиту ArcWake, предоставляемую крейтом futures. Это позволит нам реализовать простой типаж, чтобы представить структуру Task как будильник.

Добавляем следующую зависимость в файл Cargo.toml:

futures = "0.3"

Затем реализуем futures::task::ArcWake:

use futures::task::{self, ArcWake};
use std::sync::Arc;

impl ArcWake for Task {
fn wake_by_ref(arc_self: &Arc<Self>) {
arc_self.schedule();
}
}

Когда поток таймера вызывает waker.wake(), задача передается в канал. Реализуем получение и выполнение задач в функции MiniTokio::run:

impl MiniTokio {
fn run(&self) {
while let Ok(task) = self.scheduled.recv() {
task.poll();
}
}

/// Инициализирует новый экземпляр mini-tokio
fn new() -> MiniTokio {
let (sender, scheduled) = mpsc::channel();

MiniTokio { scheduled, sender }
}

/// Создает фьючер на экземпляре mini-tokio.
///
/// Данный фьючер обернут в `Task` и помещен в очередь `scheduled`.
/// Он будет выполнен при вызове `run()`
fn spawn<F>(&self, future: F)
where
F: Future<Output = ()> + Send + 'static,
{
Task::spawn(future, &self.sender);
}
}

impl TaskFuture {
fn new(future: impl Future<Output = ()> + Send + 'static) -> TaskFuture {
TaskFuture {
future: Box::pin(future),
poll: Poll::Pending,
}
}

fn poll(&mut self, cx: &mut Context<'_>) {
// Разрешены ложные пробуждения, даже после того, как фьючер
// вернул `Ready`. Однако опрос фьючера, вернувшего
// `Ready`, не разрешен. Поэтому мы должны проверять,
// что фьючер находится в режиме ожидания перед его вызовом.
// В противном случае, может возникнуть паника
if self.poll.is_pending() {
self.poll = self.future.as_mut().poll(cx);
}
}
}

impl Task {
fn poll(self: Arc<Self>) {
// Создаем будильник из экземпляра `Task`.
// Здесь используется реализация `ArcWake`
let waker = task::waker(self.clone());
let mut cx = Context::from_waker(&waker);

// Никакой другой поток не может заблокировать `task_future`
let mut task_future = self.task_future.try_lock().unwrap();

// Опрашиваем внутренний фьючер
task_future.poll(&mut cx);
}

// Создаем новую задачу с данным фьючером.
//
// Инициализируем новый `Task`, содержащий данный фьючер и помещаем его
// в `sender`. Получатель канала получит задачу и выполнит ее
fn spawn<F>(future: F, sender: &mpsc::Sender<Arc<Task>>)
where
F: Future<Output = ()> + Send + 'static,
{
let task = Arc::new(Task {
task_future: Mutex::new(TaskFuture::new(future)),
executor: sender.clone(),
});

let _ = sender.send(task);
}
}

Здесь происходит много всего. Во-первых, реализован MiniTokio::run(). Функция работает в цикле, получая из канала запланированные задачи.

Функции new и spawn используют канал, а не VecDeque. Когда создаются новые задачи, им предоставляется клон отправителя, который задача может использовать для планирования во время выполнения.

Функция Task::poll создает будильник с помощью утилиты ArcWake из крейта futures. Будильник используется для создания task::Context. Этот task::Context передается в poll().

Резюме

Мы рассмотрели полный пример того, как работает асинхронный Rust. async/await в Rust обеспечивается типажами. Это позволяет сторонним крейтам, таким как Tokio, предоставлять детали реализации.

  • Асинхронные операции Rust ленивы и требуют, чтобы вызывающая сторона опрашивала их
  • будильники передаются фьючерам, чтобы связать фьючер с вызывающей его задачей
  • когда ресурс не готов завершить операцию, возвращается Poll::Pending и записывается будильник задачи
  • когда ресурс становится готовым, об этом уведомляется будильник задачи
  • исполнитель получает уведомление и планирует выполнение задачи
  • задача опрашивается еще раз, на этот раз ресурс готов, и задача выполняется

Ремарки

Помните, при реализации Delay, мы отметили, что нужно исправить еще несколько вещей. Асинхронная модель Rust позволяет одному фьючеру мигрировать между задачами во время их выполнения. Рассмотрим следующий код:

use futures::future::poll_fn;
use std::future::Future;
use std::pin::Pin;

#[tokio::main]
async fn main() {
let when = Instant::now() + Duration::from_millis(10);
let mut delay = Some(Delay { when });

poll_fn(move |cx| {
let mut delay = delay.take().unwrap();
let res = Pin::new(&mut delay).poll(cx);

assert!(res.is_pending());

tokio::spawn(async move {
delay.await;
});

Poll::Ready(())
}).await;
}

Функция poll_fn создает экземпляр Future с помощью замыкания. Приведенный код создает экземпляр Delay, опрашивает его один раз, затем отправляет его в новую задачу, где он ожидается. Delay::poll() вызывается несколько раз с разными экземплярами Waker. В этом случае, мы должны убедиться, что вызов wake() на Waker передан самому последнему вызову poll().

При реализации фьючера очень важно предполагать, что каждый вызов poll() может предоставлять другой экземпляр Waker. Функция poll должна обновлять любой ранее записанный Waker новым.

Наша более ранняя реализация Delay порождала новый поток при каждом его опросе. Это нормально, но может быть очень неэффективно, если он опрашивается слишком часто (например, если мы применяем select! для этого и другого фьючера, оба будут опрашиваться всякий раз, когда в любом из них происходит событие). Один из подходов к решению этой задачи - запоминать факт создания потока и создавать новый поток только в том случае, если он еще не создан. Однако при таком подходе, мы должны убедиться, что Waker потока обновляется при последующих вызовах call(), поскольку, в противном случае, мы активируем не самый последний Waker.

Нашу предыдущую реализацию можно исправить так:

use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use std::thread;
use std::time::{Duration, Instant};

struct Delay {
when: Instant,
// Имеет значение `Some`, если поток создан, и `None`, в противном случае
waker: Option<Arc<Mutex<Waker>>>,
}

impl Future for Delay {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
// Проверяем текущий экземпляр. Если время истекло, значит
// этот фьючер завершился, возвращаем `Poll::Ready`
if Instant::now() >= self.when {
return Poll::Ready(());
}

// Время не истекло. Если фьючер вызывается впервые,
// создаем поток таймера. Если поток уже создан,
// проверяем, что сохраненный `Waker` совпадает с `Waker` текущей задачи
if let Some(waker) = &self.waker {
let mut waker = waker.lock().unwrap();

// Проверяем, что сохраненный `Waker` совпадает с `Waker` текущей задачи.
// Это необходимо, поскольку экземпляр фьючера `Delay` может быть перемещен
// в другую задачу между вызовами `poll()`. Если это произойдет,
// будильник, содержащийся в данном `Context` будет отличаться, и мы должны
// обновить хранящийся будильник для учета этого изменения
if !waker.will_wake(cx.waker()) {
*waker = cx.waker().clone();
}
} else {
let when = self.when;
let waker = Arc::new(Mutex::new(cx.waker().clone()));
self.waker = Some(waker.clone());

// `poll()` вызывается впервые, создаем поток таймера
thread::spawn(move || {
let now = Instant::now();

if now < when {
thread::sleep(when - now);
}

// Время истекло. Уведомляем вызывающего, вызывая будильник
let waker = waker.lock().unwrap();
waker.wake_by_ref();
});
}

// К этому моменту будильник сохранен и таймер запущен.
// Время не истекло, следовательно, фьючер не завершился,
// поэтому мы должны вернуть `Poll::Pending`.
//
// Контракт типажа `Future` требует, чтобы при возврате `Pending`
// фьючер гарантировал уведомление будильника о
// необходимости повторного опроса. В нашем случае,
// возвращая здесь `Pending`, мы обещаем, что вызовем
// будильник, содержащийся в аргументе `Context`,
// по истечении запрошенного времени. Мы обеспечиваем это путем
// создания потока таймера выше.
//
// Если мы забудем вызвать будильник, задача повиснет навсегда
Poll::Pending
}
}

Это немного сложно, но идея состоит в том, что при каждом вызове poll() фьючер проверяет, что переданный будильник совпадает с ранее записанным. Если будильники совпадают, больше делать нечего. Если они не совпадают, тогда записанный будильник обновляется.

Утилиты Notify

Мы продемонстрировали, как фьючер Delay можно реализовать вручную с помощью будильника. Будильники являются основой того, как работает асинхронный Rust. Обычно нет необходимости опускаться до этого уровня. Например, в случае с Delay мы могли бы полностью реализовать его с помощью async/await, используя утилиту tokio::sync::Notify. Эта утилита предоставляет базовый механизм уведомления о задачах. Она обрабатывает подробную информацию о будильниках, включая проверку соответствия записанного будильника текущей задаче.

Используя Notify, мы можем реализовать функцию delay следующим образом:

use tokio::sync::Notify;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::thread;

async fn delay(dur: Duration) {
let when = Instant::now() + dur;
let notify = Arc::new(Notify::new());
let notify_clone = notify.clone();

thread::spawn(move || {
let now = Instant::now();

if now < when {
thread::sleep(when - now);
}

notify_clone.notify_one();
});


notify.notified().await;
}

Выбор

До сих пор, для добавления в систему параллелизма, мы создавали новую задачу. Рассмотрим другие способы одновременного выполнения асинхронного кода с помощью Tokio.

tokio::select!

Макрос tokio::select! позволяет ожидать выполнения нескольких асинхронных вычислений и возвращает результат после завершения любого из них.

Пример:

use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();

tokio::spawn(async {
let _ = tx1.send("one");
});

tokio::spawn(async {
let _ = tx2.send("two");
});

tokio::select! {
val = rx1 => {
println!("rx1 completed first with {:?}", val);
}
val = rx2 => {
println!("rx2 completed first with {:?}", val);
}
}
}

У нас есть два канала oneshot. Любой канал может завершиться первым. Оператор select! ожидает на обоих каналах и привязывает val к ​​значению, возвращаемому задачей. Когда tx1 или tx2 завершаются, соответствующий блок выполняется.

Ветка, которая не завершилась, удаляется. В этом примере вычисление ожидает oneshot::Receiver для каждого канала. oneshot::Receiver для канала, который еще не завершен, удаляется.

Отмена

В асинхронном Rust отмена выполняется путем удаления фьючера. Асинхронные операции в Rust реализуются с помощью фьючеров, которые являются ленивыми. Операция продолжается только при опросе фьючера. Если фьючер уничтожен, операция не может быть продолжена, поскольку все связанное состояние было удалено.

Тем не менее, иногда асинхронная операция порождает фоновые задачи или запускает другую операцию, выполняемую в фоновом режиме. В приведенном выше примере создается задача для отправки сообщения. Обычно задача выполняет некоторые вычисления для генерации значения.

Фьючеры или другие типы могут реализовать Drop для очистки. oneshot::Receiver реализует Drop, отправляя уведомление о закрытии половине Sender. Половина-отправитель получает это уведомление и прерывает текущую операцию, уничтожая ее.

use tokio::sync::oneshot;

async fn some_operation() -> String {
// Вычисляем значение
}

#[tokio::main]
async fn main() {
let (mut tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();

tokio::spawn(async {
// Выбираем операцию и уведомление `closed()`
tokio::select! {
val = some_operation() => {
let _ = tx1.send(val);
}
_ = tx1.closed() => {
// `some_operation()` отменена,
// задача завершена, `tx1` уничтожен
}
}
});

tokio::spawn(async {
let _ = tx2.send("two");
});

tokio::select! {
val = rx1 => {
println!("rx1 completed first with {:?}", val);
}
val = rx2 => {
println!("rx2 completed first with {:?}", val);
}
}
}

Реализация Future

Чтобы лучше понять, как работает select!, рассмотрим, как будет выглядеть гипотетическая реализация Future. Это упрощенная версия. На практике select! включает дополнительный функционал, такой как случайный выбор первой ветки для опроса.

use tokio::sync::oneshot;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

struct MySelect {
rx1: oneshot::Receiver<&'static str>,
rx2: oneshot::Receiver<&'static str>,
}

impl Future for MySelect {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
if let Poll::Ready(val) = Pin::new(&mut self.rx1).poll(cx) {
println!("rx1 completed first with {:?}", val);
return Poll::Ready(());
}

if let Poll::Ready(val) = Pin::new(&mut self.rx2).poll(cx) {
println!("rx2 completed first with {:?}", val);
return Poll::Ready(());
}

Poll::Pending
}
}

#[tokio::main]
async fn main() {
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();

// Используем `tx1` и `tx2`

MySelect {
rx1,
rx2,
}.await;
}

Фьючер MySelect содержит фьючеры из каждой ветки. При опросе MySelect опрашивается первая ветвь. Если она готова, ее значение возвращается и MySelect завершается. После того, как .await получает результат фьючера, он удаляется. Это приводит к тому, что фьючеры обоих ветвей уничтожаются. Поскольку одна из ветвей не завершается, соответствующая операция фактически отменяется.

Из предыдущего раздела:

Когда фьючер возвращает Poll::Pending, он должен гарантировать подачу сигнала о пробуждении в будущем. В противном случае, задача будет висеть вечно.

В реализации MySelect нет явного использования аргумента Context. Вместо этого, требование пробуждения удовлетворяется путем передачи cx во внутренние фьючеры. Поскольку внутренний фьючер также должен соответствовать требованию пробуждения, MySelect также удовлетворяет требованию пробуждения, возвращая Poll::Pending только при получении Poll::Pending из внутреннего фьючера.

Синтаксис

Макрос select! может обрабатывать более двух ветвей. Текущий лимит - 64 ветви. Каждая ветвь структурирована следующим образом:

<шаблон> = <асинхронное выражение> => <обработчик>,

При оценке select! все <асинхронные выражения> объединяются и выполняются одновременно. Когда выражение завершается, его результат сопоставляется с <шаблоном>. Если результат соответствует шаблону, все оставшиеся асинхронные выражения удаляются и выполняется <обработчик>. <обработчик> имеет доступ к любым привязкам, установленным <шаблоном>.

Основной случай использования <шаблона> - это название переменной: результат асинхронного выражения привязан к названию переменной, и <обработчик> имеет доступ к этой переменной. Вот почему в приведенном выше примере val используется для <шаблона>, а <обработчик> имеет доступ к val.

Если <шаблон> не соответствует результату асинхронного вычисления, оставшиеся асинхронные выражения продолжают выполняться одновременно до тех пор, пока не завершится следующее. К этому результату применяется та же логика.

Поскольку select! принимает любое асинхронное выражение, можно определить более сложные вычисления для выбора.

Здесь мы выбираем результат канала oneshot и TCP-соединение:

use tokio::net::TcpStream;
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
let (tx, rx) = oneshot::channel();

// Создаем задачу, которая отправляет сообщение по каналу
tokio::spawn(async move {
tx.send("done").unwrap();
});

tokio::select! {
socket = TcpStream::connect("localhost:3465") => {
println!("socket connected {:?}", socket);
}
msg = rx => {
println!("received message {:?}", msg);
}
}
}

Здесь мы выбираем канал oneshot и принимаем сокеты из TcpListener:

use std::io;
use tokio::net::TcpListener;
use tokio::sync::oneshot;

#[tokio::main]
async fn main() -> io::Result<()> {
let (tx, rx) = oneshot::channel();

tokio::spawn(async move {
tx.send(()).unwrap();
});

let listener = TcpListener::bind("localhost:3465").await?;

tokio::select! {
_ = async {
loop {
let (socket, _) = listener.accept().await?;
tokio::spawn(async move { process(socket) });
}

// Недостижимый, но необходимый код
Ok::<_, io::Error>(())
} => {}
_ = rx => {
println!("terminating accept loop");
}
}

Ok(())
}

fn process(_socket: tokio::net::TcpStream) {}

Цикл выполняется до тех пор, пока не произойдет ошибка или пока rx не получит значение. Шаблон _ указывает, что нас не интересует результат асинхронной операции.

Возвращаемое значение

Макрос select! возвращает результат выражения, вычисленного <обработчиком>:

async fn computation1() -> String {
// ...
}

async fn computation2() -> String {
// ...
}

#[tokio::main]
async fn main() {
let out = tokio::select! {
res1 = computation1() => res1,
res2 = computation2() => res2,
};

println!("{out}");
}

По этой причине требуется, чтобы <обработчики> каждой ветви возвращали одинаковый тип. Если результат select! не требуется, рекомендуется, чтобы выражение оценивалось как ().

Ошибки

Оператор ? распространяет (propagate) ошибку из выражения (ошибка поднимается на уровень выше). Как это работает, зависит от того, где ? используется, в асинхронном выражении или в обработчике. В первом случае ? распространяет ошибку из асинхронного выражения. Это делает результат асинхронного выражения Result. Во втором случае ? немедленно распространяет ошибку за пределы выражения select!. Давайте еще раз посмотрим на пример цикла принятия TCP-соединений:

use tokio::net::TcpListener;
use tokio::sync::oneshot;
use std::io;

#[tokio::main]
async fn main() -> io::Result<()> {
// ...

let listener = TcpListener::bind("localhost:3465").await?;

tokio::select! {
res = async {
loop {
let (socket, _) = listener.accept().await?;
tokio::spawn(async move { process(socket) });
}

Ok::<_, io::Error>(())
} => {
res?;
}
_ = rx => {
println!("terminating accept loop");
}
}

Ok(())
}

Обратите внимание на listener.accept().await?. Оператор ? перемещает ошибку из этого выражения в переменную res. В случае ошибки для параметра res будет установлено значение Err(_). Затем в обработчике снова используется ?. Инструкция res? распространяет ошибку из функции main.

Сопоставление с шаблоном

Напомним синтаксис ветви select!:

<шаблон> = <асинхронное выражение> => <обработчик>,

До сих пор мы использовали привязки переменных только для <шаблона>. Однако можно использовать любой шаблон Rust. Предположим, что мы получаем данные из нескольких каналов MPSC, мы можем сделать следующее:

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
let (mut tx1, mut rx1) = mpsc::channel(128);
let (mut tx2, mut rx2) = mpsc::channel(128);

tokio::spawn(async move {
// Делаем что-нибудь с `tx1` и `tx2`
});

tokio::select! {
Some(v) = rx1.recv() => {
println!("Got {:?} from rx1", v);
}
Some(v) = rx2.recv() => {
println!("Got {:?} from rx2", v);
}
else => {
println!("Both channels closed");
}
}
}

В этом примере выражение select! ожидает получения значения от rx1 и rx2. Если канал закрывается, recv() возвращает None. Это не соответствует шаблону, и ветка отключается. select! продолжает ожидать оставшиеся ветки.

Обратите внимание, что этот select! включает ветку else. select! должен возвращать значение. При использовании сопоставления с шаблоном возможно, что ни одна из ветвей не будет соответствовать шаблону. В этом случае оценивается ветвь else.

Заимствование

При создании задачи, порожденное асинхронное выражение должно владеть всеми своими данными. Макрос select! не имеет этого ограничения. Асинхронное выражение каждой ветки может заимствовать данные и работать с ними одновременно. Следуя правилам заимствования Rust, несколько асинхронных выражений могут заимствовать один неизменный фрагмент данных, или только одно асинхронное выражение может заимствовать изменяемый фрагмент данных.

Рассмотрим несколько примеров. Здесь мы одновременно отправляем одни и те же данные в два разных пункта назначения TCP:

use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
use std::io;
use std::net::SocketAddr;

async fn race(
data: &[u8],
addr1: SocketAddr,
addr2: SocketAddr
) -> io::Result<()> {
tokio::select! {
Ok(_) = async {
let mut socket = TcpStream::connect(addr1).await?;
socket.write_all(data).await?;
Ok::<_, io::Error>(())
} => {}
Ok(_) = async {
let mut socket = TcpStream::connect(addr2).await?;
socket.write_all(data).await?;
Ok::<_, io::Error>(())
} => {}
else => {}
};

Ok(())
}

Переменная data неизменяемо заимствована в обоих асинхронных выражениях. Когда любая операция завершается успешно, другая уничтожается. Поскольку мы сопоставляем шаблон с Ok(_), в случае провала одного выражения, другое продолжит выполняться.

Когда дело доходит до <обработчика> каждой ветки, select! гарантирует, что запускается только один <обработчик>. Из-за этого каждый <обработчик> может мутабельно заимствовать одни и те же данные.

Например, в следующем примере out модифицируется в обоих обработчиках:

use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();

let mut out = String::new();

tokio::spawn(async move {
// Отправляем значения в `tx1` и `tx2`
});

tokio::select! {
_ = rx1 => {
out.push_str("rx1 completed");
}
_ = rx2 => {
out.push_str("rx2 completed");
}
}

println!("{}", out);
}

Циклы

Макрос select! часто используется в циклах. Рассмотрим несколько примеров. Начнем с выбора нескольких каналов:

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
let (tx1, mut rx1) = mpsc::channel(128);
let (tx2, mut rx2) = mpsc::channel(128);
let (tx3, mut rx3) = mpsc::channel(128);

loop {
let msg = tokio::select! {
Some(msg) = rx1.recv() => msg,
Some(msg) = rx2.recv() => msg,
Some(msg) = rx3.recv() => msg,
else => { break }
};

println!("Got {:?}", msg);
}

println!("All channels have been closed.");
}

В этом примере выполняется выбор приемников трех каналов. При получении сообщения по любому каналу, оно записывается в STDOUT. Когда канал закрывается, recv() возвращает None. Благодаря сопоставлению с шаблоном, макрос select! продолжает ждать сообщения на остальных каналах. Когда все каналы закрываются, оценивается ветвь else и цикл завершается.

Макрос select! произвольно выбирает ветку для проверки ее готовности. Если несколько каналов имеют ожидающие значения, для приема будет выбран произвольный канал. Это необходимо для обработки случая, когда цикл приема обрабатывает сообщения медленнее, чем они передаются в каналы, а это означает, что каналы начинают заполняться (fill up). Если select! не будет произвольно выбирать ветвь для проверки, на каждой итерации цикла сначала будет проверяться rx1. Если rx1 всегда будет содержать новое сообщение, остальные каналы никогда не будут проверяться.

Если при оценке select! несколько каналов имеют ожидающие сообщения, извлекается значение только из одного канала. Все остальные каналы остаются нетронутыми, и их сообщения остаются в этих каналах до следующей итерации цикла. Ни одно сообщение не теряется.

Возобновление асинхронной операции

Теперь посмотрим, как запустить асинхронную операцию в нескольких select!. В этом примере у нас есть канал MPSC с типом элемента i32 и асинхронная функция. Мы хотим запускать асинхронную функцию до тех пор, пока она не завершится, или пока в канале не будет получено четное целое число.

async fn action() {
// ...
}

#[tokio::main]
async fn main() {
let (mut tx, mut rx) = tokio::sync::mpsc::channel(128);

let operation = action();
tokio::pin!(operation);

loop {
tokio::select! {
_ = &mut operation => break,
Some(v) = rx.recv() => {
if v % 2 == 0 {
break;
}
}
}
}
}

Обратите внимание, что вместо вызова функции action в макросе select!, она вызывается вне цикла. Результат action() присваивается переменной operation без вызова .await. Затем мы вызываем макрос tokio::pin! на operation.

В цикле select!, вместо передачи operation, мы передаем &mut operation. operation отслеживает текущую асинхронную операцию. Каждая итерация цикла использует одну и ту же operation вместо нового вызова action().

Другой select! получает сообщение из канала. Если сообщение является четным целым числом, цикл завершается. В противном случае, снова запускается select!.

Здесь мы впервые используем tokio::pin!. Мы пока не будем вдаваться в подробности закрепления (pinning). Следует отметить, что для ожидания ссылки, значение, на которое ссылаются, должно быть закреплено или реализовывать Unpin.

Если мы удалим строку tokio::pin! и попытаемся скомпилировать код, то получим следующую ошибку:

error[E0599]: no method named `poll` found for struct
`std::pin::Pin<&mut &mut impl std::future::Future>`
in the current scope
--> src/main.rs:16:9
|
16 | / tokio::select! {
17 | | _ = &mut operation => break,
18 | | Some(v) = rx.recv() => {
19 | | if v % 2 == 0 {
... |
22 | | }
23 | | }
| |_________^ method not found in
| `std::pin::Pin<&mut &mut impl std::future::Future>`
|
= note: the method `poll` exists but the following trait bounds
were not satisfied:
`impl std::future::Future: std::marker::Unpin`
which is required by
`&mut impl std::future::Future: std::future::Future`

Если вы столкнулись с такой ошибкой, вероятно, Future необходимо закрепить. Узнать больше о Pin можно здесь.

Модификация ветки

Рассмотрим немного более сложный цикл. У нас есть:

  1. Канал значений i32.
  2. Асинхронная операция над этими значениями.

Логика, которую мы хотим реализовать:

  1. Ждем получения четного числа из канала.
  2. Запускаем асинхронную операцию с четным числом в качестве аргумента.
  3. Ждем завершения операции и в тоже время регистрируем новые четные числа, поступающие из канала.
  4. Если до завершения операции получено новое четное число, прерываем текущую операцию и запускаем новую.
async fn action(input: Option<i32>) -> Option<String> {
// Если аргументом является `None`, возвращаем `None`.
// Это можно переписать как `let i = input?;`
let i = match input {
Some(input) => input,
None => return None,
};
// Асинхронная логика
}

#[tokio::main]
async fn main() {
let (mut tx, mut rx) = tokio::sync::mpsc::channel(128);

let mut done = false;
let operation = action(None);
tokio::pin!(operation);

tokio::spawn(async move {
let _ = tx.send(1).await;
let _ = tx.send(3).await;
let _ = tx.send(2).await;
});

loop {
tokio::select! {
res = &mut operation, if !done => {
done = true;

if let Some(v) = res {
println!("GOT = {}", v);
return;
}
}
Some(v) = rx.recv() => {
if v % 2 == 0 {
// `.set` это метод, предоставляемый `Pin`
operation.set(action(Some(v)));
done = false;
}
}
}
}
}

Мы используем ту же стратегию, что и в предыдущем примере. Асинхронная функция action вызывается вне цикла и присваивается переменной operation. operation закрепляется. Цикл выбирает как операцию, так и приемник канала.

Обратите внимание, что action принимает Option<i32> в качестве аргумента. Перед получением первого четного числа, нам нужно инициализировать чем-то operation. Мы делаем так, чтобы action() принимала Option и возвращала Option. Если передается None, возвращается None. На первой итерации цикла, операция немедленно завершается со значением None.

В этом примере используется новый синтаксис. Первая ветка включает , if !done. Это предварительное условие ветвления. Прежде чем объяснять, как это работает, давайте посмотрим, что произойдет, если предусловие опустить. Удаление , if !done и выполнение примера приведет к следующему результату:

thread 'main' panicked at '`async fn` resumed after completion', src/main.rs:1:55
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

Эта ошибка возникает при попытке использовать operation после ее завершения. Обычно при использовании .await, ожидаемое значение потребляется. В этом примере мы ожидаем ссылку. Это означает, что operation существует после завершения.

Чтобы избежать этой паники, мы должны позаботиться о том, чтобы отключить первую ветвь, если операция завершена. Переменная done используется для отслеживания того, завершена ли операция. Ветвь select! может включать предварительное условие. Это предварительное условие проверяется перед тем, как select! начинает ждать на ветке. Если условие оценивается как false, ветвь отключается. done инициализируется значением false. После завершения operation, done устанавливается в значение true. На следующей итерации ветка отключается. При получении четного числа из канала, operation сбрасывается, и done устанавливается в false.

Параллелизм по задачам

И tokio::spawn(), и select! позволяют запускать параллельные асинхронные операции. Однако стратегия, используемая для выполнения параллельных вычислений, отличается. Функция tokio::spawn принимает асинхронную операцию и создает новую задачу для ее выполнения. Задача - это объект, который планируется средой выполнения Tokio. Две разные задачи планируются независимо. Они могут выполняться одновременно в разных потоках операционной системы. По этой причине порожденная задача имеет то же ограничение, что и порожденный поток: отсутствие заимствований.

Макрос select! попеременно запускает все ветки одной и той же задачи. Поскольку все ветки выполняются для одной и той же задачи, они никогда не будут запускаться одновременно. Макрос select! мультиплексирует асинхронные операции в одной задаче.

Потоки

Поток - это асинхронная серия значений. Это асинхронный эквивалент std::iter::Iterator, представленный типажом Stream. Потоки можно перебирать в асинхронных функциях. Их также можно трансформировать с помощью адаптеров. Tokio предоставляет несколько распространенных адаптеров в типаже StreamExt.

Tokio предоставляет поддержку потоков в отдельном крейте tokio-stream:

tokio-stream = "0.1"

В настоящее время утилиты для работы с потоками Tokio находятся в крейте tokio-stream. После стабилизации Stream в стандартной библиотеке Rust, эти утилиты будут перемещены в крейт tokio.

Перебор

В настоящее время язык программирования Rust не поддерживает асинхронные циклы for. Вместо этого перебор потоков выполняется с помощью цикла while let в сочетании с методом StreamExt::next:

use tokio_stream::StreamExt;

#[tokio::main]
async fn main() {
let mut stream = tokio_stream::iter(&[1, 2, 3]);

while let Some(v) = stream.next().await {
println!("GOT = {:?}", v);
}
}

Как и итераторы, метод next возвращает Option<T>, где T - тип значения потока. Получение None указывает на то, что итерация потока завершена.

Трансляция Mini-Redis

Рассмотрим немного более сложный пример с использованием клиента Mini-Redis. Полный код можно найти здесь.

use tokio_stream::StreamExt;
use mini_redis::client;

async fn publish() -> mini_redis::Result<()> {
let mut client = client::connect("127.0.0.1:6379").await?;

// Публикуем некоторые данные
client.publish("numbers", "1".into()).await?;
client.publish("numbers", "two".into()).await?;
client.publish("numbers", "3".into()).await?;
client.publish("numbers", "four".into()).await?;
client.publish("numbers", "five".into()).await?;
client.publish("numbers", "6".into()).await?;
Ok(())
}

async fn subscribe() -> mini_redis::Result<()> {
let client = client::connect("127.0.0.1:6379").await?;
let subscriber = client.subscribe(vec!["numbers".to_string()]).await?;
let messages = subscriber.into_stream();

tokio::pin!(messages);

while let Some(msg) = messages.next().await {
println!("got = {:?}", msg);
}

Ok(())
}

#[tokio::main]
async fn main() -> mini_redis::Result<()> {
tokio::spawn(async {
publish().await
});

subscribe().await?;

println!("done");

Ok(())
}

Сначала создается задача для публикации сообщений на сервере Mini-Redis в канале numbers. Затем мы подписываемся на этот канал в основной задаче и отображаем полученные сообщения.

После подписки на вернувшемся подписчике вызывается метод into_stream. Это потребляет Subscriber, возвращая поток, который выдает сообщения по мере их поступления. Обратите внимание, что перед перебором сообщений поток закрепляется в стеке с помощью макроса tokio::pin!. Вызов next() для потока требует его закрепления. Функция into_stream возвращает незакрепленный поток, мы должны явно закрепить его, чтобы выполнить итерацию.

Значение Rust "закрепляется", когда его больше нельзя перемещать в памяти. Ключевой особенностью закрепленного значения является то, что указатели на него всегда остаются действительными. Эта особенность используется async/await для поддержки заимствования данных через точки .await.

Если мы забудем закрепить поток, то получим ​​ошибку.

Запускаем сервер Mini-Redis:

mini-redis-server

Запускаем пример:

got = Ok(Message { channel: "numbers", content: b"1" })
got = Ok(Message { channel: "numbers", content: b"two" })
got = Ok(Message { channel: "numbers", content: b"3" })
got = Ok(Message { channel: "numbers", content: b"four" })
got = Ok(Message { channel: "numbers", content: b"five" })
got = Ok(Message { channel: "numbers", content: b"6" })

Некоторые ранние сообщения могут быть пропущены, поскольку между подпиской и публикацией идет гонка. Программа никогда не завершается. Подписка на канал Mini-Redis остается активной, пока активен сервер.

Посмотрим, как мы можем работать с потоками, чтобы расширить эту программу.

Адаптеры

Функции, которые принимают Stream и возвращают другой Stream, часто называют "адаптерами потока", поскольку они представляют собой форму "шаблона адаптера" (adapter pattern). Популярными адаптерами потока являются map, take и filter.

Обновим Mini-Redis, чтобы он завершал работу. После получения трех сообщений прекращаем получать сообщения. Это делается с помощью take(). Этот адаптер ограничивает поток так, чтобы он выдавал не более n сообщений:

let messages = subscriber
.into_stream()
.take(3);

Запускаем программу:

got = Ok(Message { channel: "numbers", content: b"1" })
got = Ok(Message { channel: "numbers", content: b"two" })
got = Ok(Message { channel: "numbers", content: b"3" })

На этот раз программа завершается.

Теперь давайте ограничим поток однозначными числами. Проверяем длину сообщения. Для отброса любого сообщения, не соответствующего предикату, используется filter():

let messages = subscriber
.into_stream()
.filter(|msg| match msg {
Ok(msg) if msg.content.len() == 1 => true,
_ => false,
})
.take(3);

Запускаем программу:

got = Ok(Message { channel: "numbers", content: b"1" })
got = Ok(Message { channel: "numbers", content: b"3" })
got = Ok(Message { channel: "numbers", content: b"6" })

Обратите внимание, что порядок применения адаптеров имеет значение. Вызов filter(), а затем take() отличается от вызова take(), а затем filter().

Наконец, приведем в порядок вывод, удалив часть Ok(Message { ... }). Это делается с помощью map(). Поскольку map() вызывается после filter(), мы знаем, что сообщение Ok, поэтому можем использовать unwrap():

let messages = subscriber
.into_stream()
.filter(|msg| match msg {
Ok(msg) if msg.content.len() == 1 => true,
_ => false,
})
.map(|msg| msg.unwrap().content)
.take(3);

Запускаем программу:

got = b"1"
got = b"3"
got = b"6"

filter() и map() можно объединить в один вызов с помощью filter_map().

Существуют и другие адаптеры.

Реализация Stream

Типаж Stream очень похож на типаж Future:

use std::pin::Pin;
use std::task::{Context, Poll};

pub trait Stream {
type Item;

fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<Option<Self::Item>>;

fn size_hint(&self) -> (usize, Option<usize>) {
(0, None)
}
}

Функция Stream::poll_next во многом похожа на функцию Future::poll, за исключением того, что ее можно вызывать повторно для получения нескольких значений из потока. Как мы видели в одном из предыдущих разделов, когда поток не готов вернуть значение, он возвращает Poll::Pending. При этом регистрируется будильник задачи. Как только поток должен быть снова опрошен, будильник получает уведомление.

Метод size_hint используется так же, как и с итераторами.

Обычно при реализации потока вручную, это делается путем композиции фьючеров и других потоков. В качестве примера перепишем фьючер Delay. Мы преобразуем его в поток, который выдает () три раза с интервалом 10 мс:

use tokio_stream::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

struct Interval {
rem: usize,
delay: Delay,
}

impl Interval {
fn new() -> Self {
Self {
rem: 3,
delay: Delay { when: Instant::now() }
}
}
}

impl Stream for Interval {
type Item = ();

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<Option<()>>
{
if self.rem == 0 {
// Задержек больше нет
return Poll::Ready(None);
}

match Pin::new(&mut self.delay).poll(cx) {
Poll::Ready(_) => {
let when = self.delay.when + Duration::from_millis(10);
self.delay = Delay { when };
self.rem -= 1;
Poll::Ready(Some(()))
}
Poll::Pending => Poll::Pending,
}
}
}

async-stream

Реализация потоков вручную с помощью типажа Stream может быть утомительной. К сожалению, язык программирования Rust пока не поддерживает синтаксис async/await для определения потоков.

В качестве временного решения можно использовать крейт async-stream. Он предоставляет макрос stream!, который преобразует входные данные в поток. С помощью этого крейта вышеуказанный интервал можно реализовать следующим образом:

use async_stream::stream;
use std::time::{Duration, Instant};

stream! {
let mut when = Instant::now();

for _ in 0..3 {
let delay = Delay { when };
delay.await;
yield ();
when += Duration::from_millis(10);
}
}