Tokio
Tokio - это асинхронная среда выполнения (runtime) кода Rust. Она предоставляет строительные блоки, необходимые для разработки сетевых приложений любого размера.
Обзор
На высоком уровне Tokio предоставляет несколько основных компонентов:
- многопоточную среду выполнения асинхронного кода
- асинхронную версию стандартной библиотеки (std)
- большую экосистему библиотек
Назначение Tokio
Асинхронный код позволяет выполнять несколько задач одновременно. Однако Rust не предоставляет встроенных инструментов для выполнения такого кода, для этого требуется сторонняя среда выполнения. Tokio является самой популярной из таких сред.
Кроме того, Tokio предоставляет множество полезных утилит. При написании асинхронного кода мы не можем использовать обычные блокирующие API, предоставляемые стандартной библиотекой Rust, и должны использовать их асинхронные версии. Tokio предоставляет такие версии, где это необходимо.
Преимущества Tokio
Скорость
Tokio является быстрым, поскольку таковым является сам Rust. Производительность кода, написанного с помощью Tokio, сопоставима с производительностью кода, написанного самостоятельно.
Tokio является масштабируемым благодаря масштабируемости async/await
. При работе с сетью существует естественный предел скорости обработки запроса из-за задержки (latency), поэтому единственным способом масштабирования является обработка нескольких запросов одновременно. async/await
позволяет легко и дешево увеличивать количество одновременно выполняемых задач.
Надежность
Tokio основан на Rust - языке, позволяющем разрабатывать надежное и эффективное ПО. Разные исследования (например, это и это) показывают, что около 70% серьезных ошибок безопасности являются результатом небезопасной работы с памятью. Использование Rust избавляет от всего этого класса багов в приложении.
Tokio сосредоточен на предоставлении предсказуемого поведения без сюрпризов. Основная цель Tokio - предоставить пользователям возможность развертывать предсказуемое ПО, которое будет работать одинаково изо дня в день с одинаковым временем отклика и без непредсказуемых скачков задержки.
Простота
Благодаря появлению async/await
в Rust, сложность создания асинхронных приложений существенно снизилась. В сочетании с утилитами Tokio и динамичной экосистемой, разработка асинхронных приложений становится проще простого.
Tokio следует соглашению об именовании стандартной библиотеки, когда это имеет смысл. Это позволяет легко конвертировать код, написанный с использованием стандартной библиотеки, в код, написанный с помощью Tokio. Благодаря строгой системе типов Rust, обеспечение правильности написанного кода не знает себе равных.
Случаи, для которых Tokio не подходит
Хотя Tokio полезен для многих проектов, в которых необходимо выполнять много задач одновременно, есть случаи, для которых он не подходит:
- ускорение вычислений, выполняемых ЦП, за счет их параллельного выполнения в нескольких потоках. Tokio предназначен для приложений, связанных с вводом-выводом, где каждая отдельная задача проводит большую часть времени в ожидании ввода-вывода. Если все, что делает наше приложение, - это параллельные вычисления, нам следует использовать rayon. Впрочем, ничто не мешает нам использовать Tokio и
rayon
вместе (пример) - чтение большого количества файлов. Может показаться, что Tokio будет полезен для проектов, которым нужно читать много файлов, однако Tokio здесь не дает никаких преимуществ по сравнению с обычным пулом потоков (thread pool). Это связано с тем, что ОС обычно не предоставляют асинхронные файловые API
- отправка единственного веб-запроса. Tokio полезен, когда нужно выполнять много задач одновременно. Если нам нужно использовать библиотеку, предназначенную для выполнения асинхронного кода, такую как reqwest, но не нужно выполнять много задач одновременно, следует предпочесть блокирующую версию этой библиотеки, поскольку она упростит код проекта. Конечно, в этом случае Tokio будет работать, но не даст никаких преимуществ перед блокирующим API
Настройка
Этот туториал шаг за шагом проведет вас через процесс создания клиента и сервера Redis. Мы начнем с основ асинхронного программирования на Rust и будем двигаться дальше. Мы реализуем несколько команд Redis и получим полный обзор Токио.
Проект, который мы создадим в этом туториале, доступен как Mini-Redis на GitHub. Mini-Redis разработан с основной целью изучения Tokio и поэтому очень хорошо прокомментирован, но это также означает, что в Mini-Redis отсутствуют некоторые функции, которые нужны в настоящей библиоте ке Redis. Готовые к использованию библиотеки Redis можно найти на crates.io.
Предварительные условия
Для прохождения этого туториала вы должны быть знакомы с Rust.
Хотя это и не обязательно, но некоторый опыт написания сетевого кода с использованием стандартной библиотеки Rust или другого языка программирования может оказаться полезным.
Рекомендуется использовать самую последнюю стабильную версию Rust.
Далее нужно установить сервер Mini-Redis. Он будет использоваться для тестирования разрабатываемого нами клиента.
cargo install mini-redis
Команда для запуска сервера:
mini-redis-server
Команда для получения значения по ключу foo
(выполняем в отдельном терминале):
mini-redis-cli get foo
Выполнение этой команды должно привести к отображению (nil)
в терминале.
Привет, Tokio
Начнем с создания очень простого приложения Tokio. Оно будет подключаться к серверу Mini-Redis и устанавливать ключ hello
в значение world
. Затем оно будет читать значение этого ключа. Это будет делаться с помощью клиента Mini-Redis.
Код
Создание крейта
Начнем с генерации нового приложения Rust:
cargo new my-redis
cd my-redis
Добавление зависимостей
Открываем файл Cargo.toml
и добавляем следующие записи в раздел [dependencies]
:
tokio = { version = "1", features = ["full"] }
mini-redis = "0.4"
На писание кода
Редактируем файл main.rs
следующим образом:
use mini_redis::{client, Result};
#[tokio::main]
async fn main() -> Result<()> {
// Подключаемся к серверу mini-redis
let mut client = client::connect("127.0.0.1:6379").await?;
// Устанавливаем ключ "hello" в значение "world"
client.set("hello", "world".into()).await?;
// Получаем значение ключа "hello"
let result = client.get("hello").await?;
println!("От сервера получено: {:?}", result);
Ok(())
}
Убедитесь, что сервер Mini-Redis запущен.
Запускаем приложение my-redis
:
cargo run
Получаем:
От сервера получено: Some(b"world")
Полный код примера можно найти здесь.
Разбор
Разберем код, который мы написали, построчно. Его мало, н о делает он много всего.
let mut client = client::connect("127.0.0.1:6379").await?;
Функция client::connect предоставляется крейтом mini-redis. Она асинхронно устанавливает TCP-соединение с указанным сервером (адресом). После установки соединения возвращается дескриптор клиента (client
). Несмотря на то, что операция выполняется асинхронно, код выглядит синхронным. Единственный признак того, что операция является асинхронной, - оператор .await
.
Что такое асинхронное программирование?
Большинство компьютерных программ выполняются в том порядке, в котором они написаны. Выполняется первая строка, затем следующая и т.д. В синхронном программировании, когда программа встречает операцию, которая не может быть завершена немедленно, она блокируется до тех пор, пока операция не завершится. Например, для установки TCP-соединения требуется обмен данными с одноранговым узлом (peer) по сети, что занимает некоторое время. В это время поток блокируется.
В асинхронном программировании операции, которые не могут завершиться немедленно, приостанавливаются в фоновом режиме. Поток не блокируется и может выполнять другие задачи. После завершения операции, задача возобновляется и продолжает выполняться с того места, где она была остановлена. В нашем примере имеется только одна задача, поэтому пока она приостановлена, ничего не происходит, но асинхронные программы обычно имеют много таких задач.
Хотя асинхронное программирование может привести к созданию более быстрых приложений, оно часто приводит к созданию гораздо более сложных программ. Программисту необходимо отслеживать все состояния, необходимые для возобновления работы после завершения асинхронной операции. Это утомительная и подверженная ошибкам задача.
Rust реализует асинхронное программирование с помощью паттерна async/await. Функции, выполняющие асинхронные операции, помечаются ключевым словом async
. В нашем примере функция connect
определяется следующим образом:
use mini_redis::Result;
use mini_redis::client::Client;
use tokio::net::ToSocketAddrs;
pub async fn connect<T: ToSocketAddrs>(addr: T) -> Result<Client> {
// ...
}
async fn
выглядит как обычная синхронная функция, но работает асинхронно. Rust преобразует асинхронную функцию во время компиляции в подпрограмму (routine), работающую асинхронно. Вызов .await
внутри async fn
возвращает управление потоку. Поток может выполнять другую работу, пока асинхронная операция выполняется в фоновом режиме.
Хотя другие языки тоже реализуют
async/await
, Rust использует уникальный подход. Прежде всего, асинхронные операции в Rust являются ленивыми (lazy). Это приводит к отличиям в семантике времени выполнения.
Использование async/await
Асинхронная функция вызываются также, как обычная. Однако вызов такой функции не приводит к выполнению ее тела. Вместо этого вызов async fn
возвращает значение, представляющее операцию. Это концептуально аналогично замыканию с нулевым аргументом (zero-argument closure). Для фактического запуска операции используется оператор .await
на возвращаемом значении.
Например, результатом выполнения программы
async fn say_world() {
println!("world");
}
#[tokio::main]
async fn main() {
// Вызов функции `say_world` не выполняет ее тело
let op = say_world();
// Сначала выполняется этот код
println!("hello");
// Вызов `.await` на `op` выполняет тело `say_world()`
op.await;
}
является
hello
world
Возвращаемое значение async fn
- это анонимный тип, реализующий типаж (трейт) Future (фьючер).
Асинхронная функция main
Функция main
, используемая для запуска приложения, отличается от обычной, встречающейся в большинстве крейтов Rust:
- Это
async fn
. - Она аннотирована с помощью
#[tokio::main]
.
async fn
используется для входа в асинхронный контекст. Однако асинхронные функции должны выполняться runtime. runtime
содержит асинхронный планировщик задач, обеспечивает событийный ввод-вывод, таймеры и др. Среда выполнения не запускается автоматически, поэтому ее должна запустить функция main
.
Функция #[tokio::main]
- это макрос. Она преобразует async fn main()
в синхронную fn main()
, которая инициализирует экземпляр среды выполнения и выполняет тело асинхронной функции.
Например, код
#[tokio::main]
async fn main() {
println!("hello");
}
трансформируется в
fn main() {
let mut rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
println!("hello");
})
}
Возможности Cargo
В этом туториале используются все возможности Tokio (флаг full
):
tokio = { version = "1", features = ["full"] }
Tokio предоставляет богатый функционал (TCP, UDP, сокеты Unix, таймеры, утилиты синхронизации, несколько типов планировщиков и др.). Не всем приложениям нужен весь этот функционал. При оптимизации времени компиляции или размера конечного файла приложения можно указать только те функции, которые используются приложением.
Создание потоков
Прис тупим к разработке сервера Redis.
Сначала переместим код клиента из предыдущего раздела в отдельный файл:
mkdir -p examples
mv src/main.rs examples/hello-redis.rs
Затем создадим новый пустой файл src/main.rs
.
Прием сокетов
Первое, что должен делать наш сервер, - принимать входящие TCP-сокеты. Это делается путем привязки tokio::net::TcpListener к порту 6379.
Многие типы Tokio называются также, как их синхронные эквиваленты в стандартной библиотеке Rust. Когда это имеет смысл, Tokio предоставляет те же API, что и
std
, но с использованиемasync fn
.
Сокеты принимаются в цикле. Каждый сокет обрабатывается и закрывается. Прочитаем команду, выведем ее на стандартный вывод и ответим ошибкой:
// src/main.rs
use tokio::net::{TcpListener, TcpStream};
use mini_redis::{Connection, Frame};
#[tokio::main]
async fn main() {
// Привязываем обработчик к адресу
let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
loop {
// Второй элемент содержит IP и порт нового подключения
let (socket, _) = listener.accept().await.unwrap();
process(socket).await;
}
}
async fn process(socket: TcpStream) {
// `Connection` позволяет читать/писать кадры (frames) redis вместо
// потоков байтов. Тип `Connection` определяется mini-redis
let mut connection = Connection::new(socket);
if let Some(frame) = connection.read_frame().await.unwrap() {
println!("GOT: {:?}", frame);
// Отвечаем ошибкой
let response = Frame::Error("Unimplemented".to_string());
connection.write_frame(&response).await.unwrap();
}
}
Запускаем программу:
cargo run
Запускаем пример hello-redis
в отдельном терминале:
cargo run --example hello-redis
Вывод в терминале примера:
Error: "Unimplemented"
Вывод в терминале сервера:
GOT: Array([Bulk(b"set"), Bulk(b"hello"), Bulk(b"world")])
Одновременность
У нашего сервера есть небольшая проблема (помимо того, что он отвечает только ошибками). Он обрабатывает входящие запросы по одному. После установки соединения, сервер остается внутри цикла приема подключений до тех пор, пока ответ не будет полностью записан в сокет.
Мы хотим, чтобы наш сервер обрабатывал много запросов одновременно. Для этого нам нужно добавить немного конкурентности.
Одновременность (concurrency) и параллелизм (parallelism) - это не одно и тоже. Если мы переключаемся между двумя задачами, то мы работаем над ними одновременно, а не параллельно. Чтобы эту работу можно было считать параллельной, нам потребуются два человека, по одному на каждую задачу.
Одним из преимуществ использования Tokio является то, что асинхронный код позволяет работать над многими задачами одновременно, без необходимости работать над ними параллельно с использованием обычных потоков. Фактически, Tokio может выполнять множество задач одновременно в одном потоке!
Для одновременной обработки соединений для каждого входящего соединения создается новая задача. Соединение обрабатывается этой задачей.
Цикл принятия соединений становится таким:
use tokio::net::TcpListener;
#[tokio::main]
async fn main() {
let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
loop {
let (socket, _) = listener.accept().await.unwrap();
// Для каждого входящего сокета создается новая задача. Сокет
// перемещается в новую задачу и обрабатывается там
tokio::spawn(async move {
process(socket).await;
});
}
}
Задачи
Задача Tokio - это асинхронный зеленый поток (green thread). Они создаются путем передачи async
блока в tokio::spawn()
. Функция tokio::spawn
возвращает JoinHandle
, который вызывающая сторона может использовать для взаимодействия с созданной задачей. async
блок может иметь возвращаемое значение. Вызывающая сторона может получить его с помощью .await
на JoinHandle
.
Например:
#[tokio::main]
async fn main() {
let handle = tokio::spawn(async {
// Выполняем асинхронную работу
"return value"
});
// Выполняем другую работу
let out = handle.await.unwrap();
println!("GOT: {}", out);
}
Ожидание JoinHandle
возвращает Result
. Если во время выполнения задачи возникает ошибка, JoinHandle
возвращает Err
. Это происходит, когда задача либо вызывает панику, либо принудительно отменяется из-за закрытия среды выполнения.
Задача - это единица выполнения (unit of execution), управляемая планировщиком. При создании задачи она передается планировщику Tokio, который гарантирует ее выполнение при появлении у нее работы. Порожденная задача может выполняться в том же потоке, в котором она была создана, или в другом потоке времени выполнения. Задачу также можно перемещать между потоками после создания.
Задачи в Токио очень легкие. По сути, им требуется только одно выделение и 64 байта памяти. Приложения должны иметь возможность свободно создавать тысячи, если не миллионы задач.
Привязка 'static
При создании задачи в среде выполнения Tokio, время жизни ее типа должно быть 'static
. Это означает, что порожденная задача не должна содержать никаких ссылок на данные, не принад лежащие ей.
Распространено заблуждение, что
'static
означает "жить вечно", но это не так. Тот факт, что значение является'static
, не означает, что у нас есть утечка памяти. Больше об этом можно прочитать здесь.
Например, следующий код не скомпилируется:
use tokio::task;
#[tokio::main]
async fn main() {
let v = vec![1, 2, 3];
task::spawn(async {
println!("Это вектор: {:?}", v);
});
}
error[E0373]: async block may outlive the current function, but
it borrows `v`, which is owned by the current function
--> src/main.rs:7:23
|
7 | task::spawn(async {
| _______________________^
8 | | println!("Это вектор: {:?}", v);
| | - `v` is borrowed here
9 | | });
| |_____^ may outlive borrowed value `v`
|
note: function requires argument type to outlive `'static`
--> src/main.rs:7:17
|
7 | task::spawn(async {
| _________________^
8 | | println!("Это вектор: {:?}", v);
9 | | });
| |_____^
help: to force the async block to take ownership of `v` (and any other
referenced variables), use the `move` keyword
|
7 | task::spawn(async move {
8 | println!("Это вектор: {:?}", v);
9 | });
|
Это происходит пот ому, что по умолчанию переменные не перемещаются в асинхронные блоки. Вектор v
остается во владении функции main
. println!
заимствует v
. Компилятор Rust любезно объясняет нам это и даже предлагает исправление! Изменение строки 7 на task::spawn(async move {
даст указание компилятору переместить v
в порожденную задачу. Теперь задача владеет всеми своими данными, что делает их 'static
.
Если часть данных должна быть доступна одновременно в нескольких задачах, ее необходимо распределять (сделать общей) с помощью примитивов синхронизации, таких как Arc
.
Обратите внимание, что в сообщении об ошибке говорится о том, что тип аргумента переживает время жизни 'static
. Эта терминология может сбивать с толку, поскольку время жизни 'static
длится до конца программы, поэтому, если тип переживает его, не возникает ли у нас утечки памяти? Объяснение состоит в том, что именно тип, а не значение, должен переживать время жизни 'static
, и значение может быть уничтожено до того, как его тип перестанет быть действительным.
Когда мы говорим, что значение являетс я "статическим", это означает лишь то, что было бы правильно хранить его вечно. Это важно, поскольку компилятор не может определить, как долго будет выполняться вновь созданная задача. Мы должны убедиться, что задаче разрешено жить вечно, чтобы Tokio мог выполнять ее столько, сколько необходимо.
"Привязка 'static
", "тип, переживающий 'static
" и "'static
значение" обозначают одно и тоже - T: 'static
, в отличие от "аннотации с помощью 'static
", как в &'static T
.
Привязка bound
Задачи, порожденные tokio::spawn()
, должны реализовывать типаж Send
. Это позволяет среде выполнения Tokio перемещать задачи между потоками, пока они приостановлены в .await
.
Задачи являются Send
, когда все данные, хранящиеся в вызовах .await
, являются таковыми. При вызове .await
задача возвращается (yields back) планировщику. При следующем выполнении задачи, она возобновляется с той точки, на которой была приостановлена (yielded) в последний раз. Чтобы это работало, все состояние, используемое после .await
, должно сохраняться задачей. Если это состояние являются Send
, т.е. его можно перемещать между потоками, то и саму задачу можно перемещать между потоками. И наоборот, если состояние не являются Send
, то и задача тоже.
Например, это работает:
use tokio::task::yield_now;
use std::rc::Rc;
#[tokio::main]
async fn main() {
tokio::spawn(async {
// Область видимости уничтожает `rc` перед `.await`
{
let rc = Rc::new("hello");
println!("{}", rc);
}
// `rc` больше не и спользуется. Он не сохраняется, когда
// задача возвращается планировщику
yield_now().await;
});
}
А это не работает:
use tokio::task::yield_now;
use std::rc::Rc;
#[tokio::main]
async fn main() {
tokio::spawn(async {
let rc = Rc::new("hello");
// `rc` используется после `.await`. Он должен быть сохранен в
// состоянии задачи
yield_now().await;
println!("{}", rc);
});
}
Попытка компиляции этого фрагмента завершается такой ошибкой:
error: future cannot be sent between threads safely
--> src/main.rs:6:5
|
6 | tokio::spawn(async {
| ^^^^^^^^^^^^ future created by async block is not `Send`
|
::: [..]spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ---- required by this bound in
| `tokio::task::spawn::spawn`
|
= help: within `impl std::future::Future`, the trait
| `std::marker::Send` is not implemented for
| `std::rc::Rc<&str>`
note: future is not `Send` as this value is used across an await
--> src/main.rs:10:9
|
7 | let rc = Rc::new("hello");
| -- has type `std::rc::Rc<&str>` which is not `Send`
...
10 | yield_now().await;
| ^^^^^^^^^^^^^^^^^ await occurs here, with `rc` maybe
| used later
11 | println!("{}", rc);
12 | });
| - `rc` is later dropped here
Хранение значений
Теперь мы реализуем функцию process
для обработки входящих команд. Мы будем использовать HashMap
для хранения значений. Команды SET
будут добавлять значения в HashMap
, а команды GET
будут извлекать значения из HashMap
. Кроме того, мы будем использовать цикл для приема нескольких команд в одном соединении.
use tokio::net::TcpStream;
use mini_redis::{Connection, Frame};
async fn process(socket: TcpStream) {
use mini_redis::Command::{self, Get, Set};
use std::collections::HashMap;
// Хранилище данных
let mut db = HashMap::new();
// `Connection`, предоставляемое `mini-redis`, обрабатывает разбор кадров из сокета
let mut connection = Connection::new(socket);
// Используем `read_frame` для получения ком анды из соединения
while let Some(frame) = connection.read_frame().await.unwrap() {
let response = match Command::from_frame(frame).unwrap() {
Set(cmd) => {
// Значение хранится в виде `Vec<u8>`
db.insert(cmd.key().to_string(), cmd.value().to_vec());
Frame::Simple("OK".to_string())
}
Get(cmd) => {
if let Some(value) = db.get(cmd.key()) {
// `Frame::Bulk` ожидает, что данные будут иметь тип `Bytes`.
// Мы рассмотрим этот тип позже.
// `&Vec<u8>` преобразуется в `Bytes` с помощью метода `into`
Frame::Bulk(value.clone().into())
} else {
Frame::Null
}
}
cmd => panic!("Не реализовано {:?}", cmd),
};
// Отправляем (пишем) ответ клиенту
connection.write_frame(&response).await.unwrap();
}
}
Запускаем сервер:
cargo run
В отдельном терминале запускаем пример hello-redis
:
cargo run --example hello-redis
Полный код примера можно найти здесь.
Теперь мы можем устанавливать и получать значения, но есть одна проблема: значения не распределяются между соединениями. Если другой подключенный сокет попытается получить значение по ключу hello
, он получит (nil)
.
Общее с остояние
На данный момент у нас имеется работающий сервер "ключ-значение". Однако у него есть серьезный недостаток: состояние не распределяется между соединениями. Давайте это исправим.
Стратегии
В Tokio существует несколько разных способов распределять состояние:
- Защита общего состояния с помощью
Mutex
. - Создание задачи для управления состоянием и использование передачи сообщений для работы с ней.
Обычно первый подход используется для простых данных, а второй - для вещей, требующих асинхронной работы, таких как примитивы ввода-вывода. В нашем случае общим состоянием является HashMap
, а операциями - insert()
и get()
. Ни одна из этих операций не является асинхронной, поэтому мы будем использовать Mutex
.
Второй подход будет рассмотрен в следующей главе.
Добавление зависимости bytes
Вместо Vec<u8>
Mini-Redis использует Bytes
из крейта bytes. Цель Bytes
- предоставить надежную структуру массива байтов для сетевого программирования. Основная особенность, которую он добавляет к Vec<u8>
, - это поверхностное клонирование (shallow cloning). Другими словами, вызов метода clone
для экземпляра Bytes
не копирует данные. Тип Bytes
примерно соответствует Arc<Vec<u8>>
, но с некоторыми дополнительными возможностями.
Добавляем bytes
в раздел [dependencies]
файла Cargo.toml
:
bytes = "1"
Инициализация HashMap
HashMap
будет использоваться многими задачами и, возможно, многими потоками. Для поддержки этого его нужно обернуть в Arc<Mutex<_>>
.
Во-первых, для удобства добавим следующий псевдоним типа после операторов use
:
use bytes::Bytes;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
type Db = Arc<Mutex<HashMap<String, Bytes>>>;
Затем обновляем функцию main
, чтобы инициализировать HashMap
и передать дескриптор (handle) Arc
в функцию process
. Использование Arc
позволяет одновременно ссылаться на HashMap
из многих задач, потенциально работающих во многих потоках. В Tokio термин "дескриптор" используется для обозначения значения, которое обеспечивает доступ к некоторому общему состоянию.
use tokio::net::TcpListener;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
#[tokio::main]
async fn main() {
let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
println!("Listening");
let db = Arc::new(Mutex::new(HashMap::new()));
loop {
let (socket, _) = listener.accept().await.unwrap();
// Клонируем дескриптор в `HashMap`
let db = db.clone();
println!("Accepted");
tokio::spawn(async move {
process(socket, db).await;
});
}
}
Об использовании std::sync::Mutex
Обратите внимание, что для защиты HashMap
используется std::sync::Mutex
, а не tokio::sync::Mutex
. Распространенной ошибкой является безусловное использование tokio::sync::Mutex
в асинхронном коде. Асинхронный мьютекс - это мьютекс, который блокируется при вызовах .await
.
Синхронный мьютекс блокирует текущий поток в ожидании получения блокировки (acquire the lock). Это, в свою очередь, блокирует обработку других задач. Однако переключение на tokio::sync::Mutex
обычно не помогает, поскольку асинхронный мьютекс использует синхронный мьютекс под капотом. Как правило, использование синхронного мьютекса в асинхронном коде вполне допустимо, пока конкуренция остается низкой и блокировка не удерживается при вызовах .await
.
Обновление функции process
Функция process
больше не инициализирует HashMap
. Вместо этого, она принимает общий дескриптор HashMap
в качестве параметра. Перед использованием HashMap
его необходимо заблокировать. Помните, что типом значения HashMap
теперь является Bytes
(который можно легко клонировать), поэтому его также необходимо изменить.
use tokio::net::TcpStream;
use mini_redis::{Connection, Frame};
async fn process(socket: TcpStream, db: Db) {
use mini_redis::Command::{self, Get, Set};
let mut connection = Connection::new(socket);
while let Some(frame) = connection.read_frame().await.unwrap() {
let response = match Command::from_frame(frame).unwrap() {
Set(cmd) => {
// Блокируем `HashMap`
let mut db = db.lock().unwrap();
db.insert(cmd.key().to_string(), cmd.value().clone());
Frame::Simple("OK".to_string())
}
Get(cmd) => {
// Блокируем `HashMap`
let db = db.lock().unwrap();
if let Some(value) = db.get(cmd.key()) {
Frame::Bulk(value.clone())
} else {
Frame::Null
}
}
cmd => panic!("Не реализовано {:?}", cmd),
};
connection.write_frame(&response).await.unwrap();
}
}
Задачи, потоки и конкуренция
Использование блокирующего мьютекса для защиты небольших критических блоков кода является приемлемой стратегией, когда конкуренция минимальна. Когда возникает борьба за блокировку, поток, выполняющий задачу, должен заблокироваться и дождаться мьютекса. Это заблокирует не только текущую задачу, но также другие задачи, запланированные в текущем потоке.
По умолчанию Tokio использует многопоточный планировщик. Задачи планируются для любого количества потоков, управляемых средой выполнения. Если запланировано выполнение большого количества задач и все они требуют доступа к мьютексу, возникает конфликт. С другой стороны, если используется вариант среды выполнения (runtime flavor) current_thread, то мьютекс никогда не будет конкурировать.
Вариант среды выполнения
current_thread
представляет собой облегченную однопоточную среду выполнения. Это хороший выбор, когда создается всего несколько задач и открывается несколько сокетов. Например, этот вариант хорошо работает при предоставлении моста синхронного API поверх асинхронной клиентской библиотеки.
Если конкуренция за синхронный мьютекс становится проблемой, переключение на мьютекс Tokio редко будет лучшим решением. Вместо этого можно рассмотреть следующие варианты:
- переключение на специальную задачу для управления состоянием и использование передачи сообщений для работы с ней
- сегментирование (shard) мьютекса
- реструктуризация кода для удаления мьютекса
В нашем случае, поскольку каждый ключ независим, отлично подойдет сегментирование мьютекса. Для этого вместо одного экземпляра Mutex<HashMap<_, _>>
мы создадим N
отдельных экземпляров.
type ShardedDb = Arc<Vec<Mutex<HashMap<String, Vec<u8>>>>>;
fn new_sharded_db(num_shards: usize) -> ShardedDb {
let mut db = Vec::with_capacity(num_shards);
for _ in 0..num_shards {
db.push(Mutex::new(HashMap::new()));
}
Arc::new(db)
}
После этого поиск ячейки для любого заданного ключа становится двухэтапным процессом. Сна чала ключ используется для определения того, частью какого сегмента он является. Затем ключ просматривается в HashMap
.
let shard = db[hash(key) % db.len()].lock().unwrap();
shard.insert(key, value);
Простая реализация, описанная выше, требует использования фиксированного количества сегментов, и количество сегментов не может быть изменено после создания сегментированной карты. Крейт dashmap предоставляет реализацию более сложной сегментиров анной хэш-карты.
Удержание MutexGuard
через .await
Мы можем написать код, который выглядит так:
use std::sync::{Mutex, MutexGuard};
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
*lock += 1;
do_something_async().await;
} // здесь `lock` выходит из области видимости
При попытке создать что-то, вызывающее эту функцию, мы получим следующее сообщение об ошибке:
error: future cannot be sent between threads safely
--> src/lib.rs:13:5
|
13 | tokio::spawn(async move {
| ^^^^^^^^^^^^ future created by async block is not `Send`
|
::: /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.21/src/task/spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ---- required by this bound in `tokio::task::spawn::spawn`
|
= help: within `impl std::future::Future`, the trait `std::marker::Send` is not implemented for `std::sync::MutexGuard<'_, i32>`
note: future is not `Send` as this value is used across an await
--> src/lib.rs:7:5
|
4 | let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
| -------- has type `std::sync::MutexGuard<'_, i32>` which is not `Send`
...
7 | do_something_async().await;
| ^^^^^^^^^^^^^^^^^^^^^^^^^^ await occurs here, with `mut lock` maybe used later
8 | }
| - `mut lock` is later dropped here
Это происходит потому, что тип std::sync::MutexGuard
не является Send
. Это означает, что мы не можем отправить блокировку мьютекса в другой поток, и ошибка возникает, потому что Tokio может перемещать задачу между потоками при каждом .await
. Следовательно, нам нужно реструктурировать код таким образом, чтобы деструктор блокировки мьютекса запускался до .await
.
// Это работает
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
{
let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
*lock += 1;
} // здесь `lock` выходит из области видимости
do_something_async().await;
}
Обратите внимание, что это не работает:
use std::sync::{Mutex, MutexGuard};
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
*lock += 1;
drop(lock);
do_something_async().await;
}
Это связано с тем, что компилятор вычисляет, является ли фьючер (future) Send
, только на основе информации об области видимости. Возможно, в будущем компилятор будет поддерживать явное удаление блокировки, но сейчас для этого необходимо использовать область видимости.
Не пытайтесь решить эту проблему путем создания задачи, которая не должна быть Send
, потому что если Tokio приостановит задачу в .await
, пока она удерживает блокировку, в том же потоке может быть запланировано выполнение другой задачи, которая также может попытаться заблокировать этот мьютекс, что приведет к взаимоблокировке, поскольку задача, ожидающая блокировки мьютекса, не позволит задаче, удерживающей мьютекс, освободить его.
Ниже мы обсудим некоторые подходы к решению этой проблемы.
Реструктуризация кода, чтобы не удерживать блокировку через .await
Мы уже видели один пример этого в приведенном выше фрагменте кода, но есть несколько более надежных способов. Например, можно обернуть мьютекс в структуру и блокировать его только внутри синхронных методов этой структуры:
use std::sync::Mutex;
struct CanIncrement {
mutex: Mutex<i32>,
}
impl CanIncrement {
// Эта функция является синхронной
fn increment(&self) {
let mut lock = self.mutex.lock().unwrap();
*lock += 1;
}
}
async fn increment_and_do_stuff(can_incr: &CanIncrement) {
can_incr.increment();
do_something_async().await;
}
Этот шаблон гарантирует, что мы не столкнемся с ошибкой Send
, поскольку защита мьютекса отсутствует в асинхронной функции.
Создание задачи для управления состоянием и использование передачи сообщений для работы с ней
Это второй подход, упомянутый в начале этого раздела, и он часто используется, когда общий ресурс является ресурсом ввода-вывода. Мы подробно поговорим о нем в следующем разделе.
Использование асинхронного мьютекса Токио
Также можно использовать тип tokio::sync::Mutex, предоставляемый Tokio. Основная особенность мьютекса Tokio заключается в том, что его можно без проблем удерживать в .await
. Тем не менее, асинхронный мьютекс обходится дороже, чем обычный, и лучше использовать один из двух других подходов.
// Мьютекс Tokio
use tokio::sync::Mutex;
// Это компилируется
// (но в данном случае реструктуризация кода будет лучшим решением)
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
let mut lock = mutex.lock().await;
*lock += 1;
do_something_async().await;
} // здесь `lock` выходит из области видимости
Каналы
Возвращаемся к клиенту. Поместим код сервера, который мы написали, в отдельный двоичный файл:
mkdir src/bin
mv src/main.rs src/bin/server.rs
Создаем новый двоичный файл, который будет содержать код клиента:
touch src/bin/client.rs
Команда для запуска сервера:
cargo run --bin server
Команда для запуска клиента:
cargo run --bin client
Допустим, мы хотим запустить две команды Redis одновременно. Мы можем создать одну задачу для каждой команды. Тогда обе команды будут выполняться одновременно.
Мы могли бы написать что-то вроде этого:
use mini_redis::client;
#[tokio::main]
async fn main() {
// Подключаемся к серверу
let mut client = client::connect("127.0.0.1:6379").await.unwrap();
// Создаем две задачи: одна извлекает значение, другая устанавливает значение по ключу
let t1 = tokio::spawn(async {
let res = client.get("foo").await;
});
let t2 = tokio::spawn(async {
client.set("foo", "bar".into()).await;
});
t1.await.unwrap();
t2.await.unwrap();
}
Но этот код не скомпилируется, поскольку обеим задачам требуется доступ к клиенту. Поскольку Client
не реализует Copy
, он не будет компилироваться без кода, обеспечивающего возможность его совместного использования. Кроме того, Client::set
принимает &mut self
, что означает, что для его вызова требуется монопольный доступ к клиенту. Мы могли бы открывать соединение для каждой задачи, но это не оптимально. Мы не можем использовать std::sync::Mutex
, так как .await
нужно вызывать с удерживаемой блокировкой. Мы могли бы использовать tokio::sync::Mutex
, но это позволит выполнить только один текущий запрос. Если клиент реализует конвейерную обработку, асинхронный мьютекс приведет к недостаточному использованию (underutilizing) соединения.
Передача сообщений
Ответ заключается в использовании передачи сообщений. Шаблон предполагает создание специальной задачи для управления ресурсом client
. Любая задача, желающая обработать (issue) запрос, отправляет сообщение задаче client
. Задача client
выдает (issue) запрос от имени отправителя, и ответ отправляется отправителю (простите за тавтологию).
В этой стратегии устанавливается одно соединение. Задача, управляющая client
, может получить к нему монопольный доступ для вызова get
и set
. Кроме того, канал работает как буфер. Операции могут отправляться задаче client
, пока она занята. Как только задача client
освобождается (становится доступной для обработки новых запросов), она извлекает следующий запрос из канала. Это может привести к повышению пропускной способности и может быть расширено для поддержки пула соединений.
Примитивы каналов Tokio
Tokio предоставляет несколько каналов, которые служат разным целям:
- mpsc - канал с несколькими производителями (producer) и одним потребителем (consumer). Можно отправлять много значений
- oneshot - канал с одним производителем и одним потребителем. Можно отправлять одно значение
- broadcast - канал с несколькими производителями и потребителями. Можно отправлять несколько значений. Каждый получатель видит каждое значение
- watch - канал с одним производителем и несколькими потребителями. Можно отправлять много значений, но история не сохраняется. Каждый получатель видит только последнее значение
Если нам нужен многопользовательский канал с несколькими производителями, где каждое сообщение видит только один потребитель, можно использовать крейт async-channel. Существуют также синхронные каналы, например, std::sync::mpsc и crossbeam::channel. Эти каналы ждут сообщений, блокируя поток, что не разрешено в асинхронном коде.
В этом разделе мы будем использовать mpsc
и oneshot
. Другие типы каналов передачи сообщений рассматриваются в следующих разделах. Полный код из этого раздела можно найти здесь.