157 KiB
+++ title = "Async/Await" weight = 12 path = "async-await" date = 2020-03-27
[extra] chapter = "Multitasking"
Please update this when updating the translation
translation_based_on_commit = "f2966a53489a1c3eff3e5c3c1c82a3febb47569b"
GitHub usernames of the people that translated this post
translators = ["TakiMoysha"]
+++
В этом посте мы рассмотрим кооперативную многозадачность и возможности async/await в Rust. Мы подробно рассмотрим, как async/await работает в Rust, включая трейт Future, преобразование машины состояний и pinning. Затем мы добавим базовую поддержку async/await в наше ядро, by creating an asynchronous keyboard task and a basic executor.
Этот блог открыто разрабатывается на GitHub. Если у вас возникают проблемы или вопросы, пожалуйста, откройте issue. Также вы можете оставлять комментарии внизу. Исходный код этого поста можно найти в post-12 ветку.
Многозадачность
Одной из основных функций возможностей операционных систем является [многозадачность][multitasking], то есть возможность одновременного выполнения нескольких задач. Например, вероятно, пока вы читаете этот пост, у вас открыты другие программы, такие как текстовый редактор или окно терминала. Даже если у вас открыто только одно окно браузера, вероятно, в фоновом режиме выполняются различные задачи по управлению окнами рабочего стола, проверке обновлений или индексированию файлов.
Хотя кажется, что все задачи выполняются параллельно, на одном ядре процессора может выполняться только одна задача за раз. Чтобы создать иллюзию параллельного выполнения задач, операционная система быстро переключается между активными задачами, чтобы каждая из них могла выполнить небольшой прогресс. Поскольку компьютеры работают быстро, мы в большинстве случаев не замечаем этих переключений.
Когда одноядерные центральные процессоры (ЦП) могут выполнять только одну задачу за раз, многоядерные ЦП могут выполнять несколько задач по настоящему параллельно. Например, процессор с 8 ядрами может выполнять 8 задач одновременно. В следующей статье мы расскажем, как настроить многоядерные ЦП. В этой статье для простоты мы сосредоточимся на одноядерных процессорах. (Стоит отметить, что все многоядерные ЦП запускаются с одним активным ядром, поэтому пока мы можем рассматривать их как одноядерные процессоры).
Есть две формы многозадачности: кооперативная (совместная) - требует, чтобы задачи регулярно отдавали контроль над процессором для продвижения других задач; вытесняющая (приоритетная) - использующая функционал операционной системы (ОС) для переключения потоков в произвольные моменты моменты времени через принудительную остановку. Далее мы рассмотрим две формы многозадачности более подробно и обсудим их преимущества и недостатки.
Вытесняющая Многозадачность
Идея заключается в том, что ОС контролирует, когда переключать задачи. Для этого она использует факт того, что при каждом прирывании она восстанавливает контрлоль над ЦП. Это позволяет переключать задачи всякий раз, когда в системе появляется новый ввод. Например, возможность переключать задачи когда двигается мышка или приходят пакеты по сети. ОС также может определять точное время, в течении которого задаче разрешается выполняться, настроив аппаратный таймер на отправку прерывания по истечению этого времени.
На следующем рисунку показан процесс переключения задач при аппаратном прерывании:
На первой строке ЦП выполняет задачу A1 программы A. Все другие задачи приостановлены. На второй строке, наступает аппаратное прерывание. Как описанно в посте Аппаратные Прерывания, ЦП немедленно останавливает выполнение задачи A1 и переходит к обработчику прерываний, определенному в таблице векторов прерываний (Interrupt Descriptor Table, IDT). Благодаря этого обработчику прерывания ОС теперь снова обладает контролем над ЦП, что позволяет ей переключиться на задачу B1 вместо продолжения задачи A1.
Сохранение состояния
Поскольку задачи прерываются в произвольные моменты времени, они могут находиться в середине вычислений. Чтобы иметь возможность возобновить их позже, ОС должна создать копию всего состояния задачи, включая ее стек вызовов и значения всех регистров ЦП. Этот процесс называется переключением контекста.
Поскольку стек вызовов может быть очень большим, операционная система обычно создает отдельный стек вызовов для каждой задачи, вместо того чтобы сохранять содержимое стека вызовов при каждом переключении задач. Такая задача со своим собственным стеком называется потоком выполнения или сокращенно поток. Используя отдельный стек для каждой задачи, при переключении контекста необходимо сохранять только содержимое регистров (включая программный счетчик и указатель стека). Такой подход минимизирует накладные расходы на производительность при переключении контекста, что очень важно, поскольку переключения контекста часто происходят до 100 раз в секунду.
Обсуждение
Основным преимуществом вытесняющей многозадачности является то, что операционная система может полностью контролировать разрешенное время выполнения задачи. Таким образом, она может гарантировать, что каждая задача получит справедливую долю времени процессора, без необходимости полагаться на кооперацию задач. Это особенно важно при выполнении сторонних задач или когда несколько пользователей совместно используют одну систему.
Недостатком вытесняющей многозадачности является то, что каждой задаче требуется собственный стек. По сравнению с общим стеком это приводит к более высокому использованию памяти на задачу и часто ограничивает количество задач в системе. Другим недостатком является то, что ОС всегда должна сохранять полное состояние регистров ЦП при каждом переключении задач, даже если задача использовала только небольшую часть регистров.
Вытесняющая многозадачность и потоки - фундаментальные компонтенты ОС, т.к. они позволяют запускать недоверенные программы в userspace (run untrusted userspace programs) . Мы подробнее обсудим эти концепции в будущийх постах. Однако сейчас, мы сосредоточимся на кооперативной многозадачности, которая также предоставляет полезные возможности для нашего ядра.
Кооперативная Многозадачность
Вместо принудительной остановки выполняющихся задач в произвольные моменты времени, кооперативная многозадачность позволяет каждой задаче выполняться до тех пор, пока она добровольно не уступит контроль над ЦП. Это позволяет задачам самостоятельно приостанавливаться в удобные моменты времени, например, когда им нужно ждать операции ввода-вывода.
Кооперативная многозадачность часто используется на языковом уровне, например в виде сопрограмм или async/await. Идея в том, что программист или компилятор вставляет в программу операции yield, которые отказываются от управления ЦП и позволяют выполняться другим задачам. Например, yield может быть вставлен после каждой итерации сложного цикла.
Часто кооперативную многозадачность совмещают с асинхронными операциями. Вместо того чтобы ждать завершения операции и препятствовать выполнению других задач в это время, асинхронные операции возвращают статус «не готов», если операция еще не завершена. В этом случае ожидающая задача может выполнить операцию yield, чтобы другие задачи могли выполняться.
Сохранение состояния
Поскольку задачи сами определяют точки паузы, им не нужно, чтобы ОС сохраняла их состояние. Вместо этого они могут сохранять то состояние, которое необходимо для продолжения работы, что часто приводит к улучшению производительности. Например, задаче, которая только что завершила сложные вычисления, может потребоваться только резервное копирование конечного результата вычислений, т.к. промежуточные результаты ей больше не нужны.
Реализации кооперативных задач, поддерживаемые языком, часто даже могут сохранять необходимые части стека вызовов перед приостановкой. Например, реализация async/await в Rust сохраняет все локальные переменные, которые еще нужны, в автоматически сгенерированной структуре (см. ниже). Благодаря резервному копированию соответствующих частей стека вызовов перед приостановкой все задачи могут использовать один стек вызовов, что приводит к значительному снижению потребления памяти на задачу. Это позволяет создавать практически любое количество кооперативных задач без исчерпания памяти.
Обсуждение
Недостатком кооперативной многозадачности является то, что некооперативная задача может потенциально выполняться в течение неограниченного времени. Таким образом, вредоносная или содержащая ошибки задача может помешать выполнению других задач и замедлить или даже заблокировать работу всей системы. По этой причине кооперативная многозадачность должна использоваться только в том случае, если известно, что все задачи будут взаимодействовать друг с другом. В качестве противоположного примера можно привести то, что не стоит полагаться на взаимодействие произвольных программ пользовательского уровня в операционной системе.
Однако высокая производительность и преимущества кооперативной многозадачности в плане памяти делают ее хорошим подходом для использования внутри программы, особенно в сочетании с асинхронными операциями. Поскольку ядро операционной системы является программой, критичной с точки зрения производительности, которая взаимодействует с асинхронным оборудованием, кооперативная многозадачность кажется хорошим подходом для реализации параллелизма.
Async/Await в Rust
Rust предоставляет отличную поддержку кооперативной многозадачности в виде async/await. Прежде чем мы сможем изучить, что такое async/await и как оно работает, нам необходимо понять, как работают futures и асинхронное программирование в Rust.
Futures
Future представляет значение, которое может быть еще недоступно. Это может быть, например, целое число, вычисляемое другой задачей, или файл, загружаемый из сети. Вместо того, чтобы ждать, пока значение станет доступным, futures позволяют продолжить выполнение до тех пор, пока значение не понадобится.
Пример
Концепцию future лучше всего проиллюстрировать небольшим примером:
Эта диаграмма последовательности показывает функцию main, которая считывает файл из файловой системы, а затем вызывает функцию foo. Этот процесс повторяется дважды: один раз с синхронным вызовом read_file и один раз с асинхронным вызовом async_read_file.
При синхронном вызове функция main должна ждать, пока файл не будет загружен из файловой системы. Только после этого она может вызвать функцию foo, которая требует от нее снова ждать результата.
При асинхронном вызове async_read_file файловая система напрямую возвращает будущее значение и загружает файл асинхронно в фоновом режиме. Это позволяет функции main вызвать foo гораздо раньше, которая затем выполняется параллельно с загрузкой файла. В этом примере загрузка файла даже заканчивается до возврата foo, поэтому main может напрямую работать с файлом без дальнейшего ожидания после возврата foo.
Futures в Rust
В Rust, futures представленны трейтом Future, который выглядит так:
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>;
}
Ассоциированный тип Output определяет тип асинхронного значения. Например, функция async_read_file на приведенной выше диаграмме вернет экземпляр Future с Output, установленным как File.
Метод poll позволяет проверить, доступно ли значение. Он возвращает перечисление Poll, которое выглядит следующим образом:
pub enum Poll<T> {
Ready(T),
Pending,
}
Когда значение уже доступно (например, файл был полностью прочитан с диска), оно возвращается, обернутое в вариант Ready. Иначе возвращается вариант Pending, который сигнализирует вызывающему, что значение еще не доступно.
Метод poll принимает два аргумента: self: Pin<&mut Self> и cx: &mut Context. Первый аргумент ведет себя аналогично обычной ссылке &mut self, за исключением того, что значение Self pinned к своему месту в памяти. Понять Pin и его необходимость сложно, не понимая сначала, как работает async/await. Поэтому мы объясним это позже в этом посте.
Параметр cx: &mut Context нужен для передачи экземпляра Waker в асинхронную задачу, например, загрузку файловой системы. Этот Waker позволяет асинхронной задаче сообщать о том, что она (или ее часть) завершена, например, что файл был загружен с диска. Поскольку основная задача знает, что она будет уведомлена, когда Future будет готов, ей не нужно повторно вызывать poll. Мы объясним этот процесс более подробно позже в этом посте, когда будем реализовывать наш собственный тип waker.
Working with Futures
Теперь мы знаем, как определяются футуры, и понимаем основную идею метода poll. Однако мы все еще не знаем, как эффективно работать с футурами. Проблема в том, что они представляют собой результаты асинхронных задач, которые могут быть еще недоступны. На практике, однако, нам часто нужны эти значения непосредственно для дальнейших вычислений. Поэтому возникает вопрос: как мы можем эффективно получить значение, когда оно нам нужно?
Waiting on Futures
Один из возможных ответов — дождаться, пока футура исполнится. Это может выглядеть примерно так:
let future = async_read_file("foo.txt");
let file_content = loop {
match future.poll(…) {
Poll::Ready(value) => break value,
Poll::Pending => {}, // ничего не делать
}
}
Здесь мы активно ждем футуру, вызывая poll снова и снова в цикле. Аргументы poll опущены, т.к. здесь они не имеют значения. Хотя это решение работает, оно очень неэффективно, потому что мы занимаем CPU до тех пор, пока значение не станет доступным.
Более эффективным подходом может быть блокировка текущего потока до тех пор, пока футура не станет доступной. Конечно, это возможно только при наличии потоков, поэтому это решение не работает для нашего ядра, по крайней мере, пока. Даже в системах, где поддерживается блокировка, она часто нежелательна, поскольку превращает асинхронную задачу в синхронную, тем самым сдерживая потенциальные преимущества параллельных задач в плане производительности.
Комбинаторы Future
Альтернативой ожиданию является использование комбинаторов future. Комбинаторы future - это методы вроде map, которые позволяют объединять и связывать future между собой, аналогично методам трейта Iterator. Вместо того чтобы ожидать выполнения future, эти комбинаторы сами возвращают future, которые применяет операцию преобразования при вызове poll.
Например, простой комбинатор string_len для преобразования Future<Output = String> в Future<Output = usize> может выглядеть так:
struct StringLen<F> {
inner_future: F,
}
impl<F> Future for StringLen<F> where F: Future<Output = String> {
type Output = usize;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
match self.inner_future.poll(cx) {
Poll::Ready(s) => Poll::Ready(s.len()),
Poll::Pending => Poll::Pending,
}
}
}
fn string_len(string: impl Future<Output = String>)
-> impl Future<Output = usize>
{
StringLen {
inner_future: string,
}
}
// Использование
fn file_len() -> impl Future<Output = usize> {
let file_content_future = async_read_file("foo.txt");
string_len(file_content_future)
}
Этот код не совсем корректен, потому что не учитывает pinning, но он подходит для примера. Основная идея в том, что функция string_len оборачивает переданный экземпляр Future в новую структуру StringLen, которая также реализует Future. При опросе обёрнутого future опрашивается внутренний future. Если значение ещё не готово, из обёрнутого future также возвращается Poll::Pending. Если значение готово, строка извлекается из варианта Poll::Ready, вычисляется её длина, после чего результат снова оборачивается в Poll::Ready и возвращается.
С помощью функции string_len можно вычислить длину асинхронной строки, не дожидаясь её завершения. Поскольку функция снова возвращает Future, вызывающий код не может работать с возвращённым значением напрямую, а должен использовать комбинаторы. Таким образом, весь граф вызовов становится асинхронным, и в какой-то момент (например, в основной функции) можно эффективно ожидать завершения нескольких future одновременно.
Так как ручное написание функций-комбинаторов сложно, они обычно предоставляются библиотеками. Стандартная библиотека Rust пока не содержит методов-комбинаторов, но полуофициальная (и совместимая с no_std) библиотека futures предоставляет их. Её трейт [FutureExt] включает высокоуровневые методы-комбинаторы, такие как [map] или [then], которые позволяют манипулировать результатом с помощью произвольных замыканий.
Преимущества
Большое преимущество future комбинаторов (future combinators) в том, что они сохраняют асинхронность. В сочетании с асинхронными интерфейсами ввода-вывода такой подход может обеспечить очень высокую производительность. То, что future кобинаторы реализованы как обычные структуры с имплементацией трейтов, позволяет компилятору чрезвычайно оптимизировать их. Подробнее см. в посте Futures с нулевой стоимостью в Rust, где было объявлено о добавлении futures в экосистему Rust.
Недостатки
Хотя future комбинаторы позволяют писать очень эффективный код, их может быть сложно использовать в некоторых ситуациях из-за системы типов и интерфейса на основе замыканий. Например, рассмотрим такой код:
fn example(min_len: usize) -> impl Future<Output = String> {
async_read_file("foo.txt").then(move |content| {
if content.len() < min_len {
Either::Left(async_read_file("bar.txt").map(|s| content + &s))
} else {
Either::Right(future::ready(content))
}
})
}
Здесь мы читаем файл foo.txt, а затем используем комбинатор [then], чтобы связать вторую футуру на основе содержимого файла. Если длина содержимого меньше заданного min_len, мы читаем другой файл bar.txt и добавляем его к content с помощью комбинатора [map]. В противном случае возвращаем только содержимое foo.txt.
Нам нужно использовать ключевое слово [move] для замыкания, передаваемого в then, иначе возникнет ошибка времени жизни (lifetime) для min_len. Причина использования обёртки Either заключается в том, что блоки if и else всегда должны возвращать значения одного типа. Поскольку в блоках возвращаются разные типы будущих значений, нам необходимо использовать обёртку, чтобы привести их к единому типу. Функция ready оборачивает значение в будущее, которое сразу готово к использованию. Здесь она необходима, потому что обёртка Either ожидает, что обёрнутое значение реализует Future.
Как можно догадаться, такой подход быстро приводит к очень сложному коду, особенно в крупных проектах. Ситуация ещё больше усложняется, если задействованы заимствования (borrowing) и разные времена жизни (lifetimes). Именно поэтому в Rust было вложено много усилий для добавления поддержки async/await — с целью сделать написание асинхронного кода радикально проще.
Паттерн Async/Await
Идея async/await заключается в том, чтобы позволить программисту писать код, который выглядит как обычный синхронный код, но превращается в асинхронный код компилятором. Это работает на основе двух ключевых слов async и await. Ключевое слово async можно использовать в сигнатуре функции для превращения синхронной функции в асинхронную функцию, возвращающую future:
async fn foo() -> u32 {
0
}
// примерно переводится компилятором в:
fn foo() -> impl Future<Output = u32> {
future::ready(0)
}
Одного этого ключевого слова недостаточно. Однако внутри функций async можно использовать ключевое слово await, чтобы получить асинхронное значение future:
async fn example(min_len: usize) -> String {
let content = async_read_file("foo.txt").await;
if content.len() < min_len {
content + &async_read_file("bar.txt").await
} else {
content
}
}
В данном примере async_read_file — это асинхронная функция, возвращающая будущее строки.
async fn example(min_len: usize) -> String {
let content = async_read_file("foo.txt").await;
if content.len() < min_len {
content + &async_read_file("bar.txt").await
} else {
content
}
}
Эта ф-ция - прямой перевод example написанной выше, которая использовала комбинаторные ф-ции. Используя оператор .await, мы можем получить значение future без необходимости использования каких-либо замыканий или типов Either. В результате, мы можем писать наш код так же, как если бы это был обычный синхронный код, с той лишь разницей, что это все еще асинхронный код.
Преобразованиe Конечных Автоматов (Машина состояний)
За кулисами компилятор преобразует тело ф-ции async в state machine с каждым вызовом .await, представляющим собой разное состояние. Для вышеуказанной ф-ции example, компилятор создает state machine с четырьмя состояниями.
Каждое состояние представляет собой точку остановки в функции. Состояния "Start" и "End", указывают на начало и конец выполнения ф-ции. Состояние "waiting on foo.txt" - функция в данный момент ждёт первого результата async_read_file. Аналогично, состояние "waiting on bar.txt" представляет остановку, когда ф-ция ожидает второй результат async_read_file.
Конечный автомат реализует trait Future делая каждый вызов poll возможным переход между состояниями:
Диаграмма использует стрелки для представления переключений состояний и ромбы для представления альтернативных путей. Например, если файл foo.txt не готов, то мы используется путь "no" переходя в состояние "waiting on foo.txt". Иначе, используется путь "да". Где маленький красный комб без подписи - ветвь ф-ции exmple, где if content.len() < 100.
Мы видим, что первый вызов poll запускает функцию и она выполняться до тех пор, пока у футуры не будет результата. Если все футуры на пути готовы, ф-ция может выполниться до состояния "end" , то есть вернуть свой результат, завернутый в Poll::Ready. В противном случае конечный автомат переходит в состояние ожидания и возвращает Poll::Pending. При следующем вызове poll машина состояний начинает с последнего состояния ожидания и повторяет последнюю операцию.
Сохранение состояния
Для продолжнеия работы с последнего состояния ожидания, автомат должен отслеживать текущее состояние внутри себя. Еще, он должен сохранять все переменные, которые необходимы для продолжнеия выполнения при следующем вызове poll. Здесь компилятор действительно может проявить себя: зная, когда используются те или иные переменные, он может автоматически создавать структуры с точным набором требуемых переменных.
Например, компилятор генерирует структуры для вышеприведенной ф-ции example:
// снова `example` что бы вам не пришлось прокручивать вверх
async fn example(min_len: usize) -> String {
let content = async_read_file("foo.txt").await;
if content.len() < min_len {
content + &async_read_file("bar.txt").await
} else {
content
}
}
// компиялтор генерирует структуры
struct StartState {
min_len: usize,
}
struct WaitingOnFooTxtState {
min_len: usize,
foo_txt_future: impl Future<Output = String>,
}
struct WaitingOnBarTxtState {
content: String,
bar_txt_future: impl Future<Output = String>,
}
struct EndState {}
В состояниях "start" и "waiting on foo.txt" необходимо сохранить параметр min_len для последующего сравнения с content.len(). Состояние "waiting on foo.txt" дополнительно содержит foo_txt_future, представляющий future возвращаемое вызовом async_read_file. Этe футуру нужно опросить снова, когда автомат продолжит свою работу, поэтому его нужно сохранить.
Состояние "waiting on bar.txt" содержит переменную content для последующей конкатенации строк при загрузке файла bar.txt. Оно также хранит bar_txt_future, представляющее текущую загрузку файла bar.txt. Эта структура не содержит переменную min_len, потому что она уже не нужна после проверки длины строки content.len(). В состоянии "end", в структуре ничего нет, т.к. ф-ция завершилась полностью.
Учтите, что приведенный здесь код - это только пример того, какая структура может быть сгенерирована компилятором Имена структур и расположение полей - детали реализации и могут отличаться.
Полный Конечный Автомат
При этом точно сгенерированный код компилятора является деталью реализации, это помогает понять, представив, как могла бы выглядеть машина состояний для функции example. Мы уже определили структуры, представляющие разные состояния и содержащие необходимые переменные. Чтобы создать машину состояний на их основе, мы можем объединить их в enum:
enum ExampleStateMachine {
Start(StartState),
WaitingOnFooTxt(WaitingOnFooTxtState),
WaitingOnBarTxt(WaitingOnBarTxtState),
End(EndState),
}
Мы определяем отдельный вариант перечисления (enum) для каждого состояния и добавляем соответствующую структуру состояния в каждый вариант как поле. Чтобы реализовать переходы между состояниями, компилятор генерирует реализацию trait'а Future на основе функции example:
impl Future for ExampleStateMachine {
type Output = String; // возвращает тип из `example`
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
loop {
match self { // TODO: handle pinning
ExampleStateMachine::Start(state) => {…}
ExampleStateMachine::WaitingOnFooTxt(state) => {…}
ExampleStateMachine::WaitingOnBarTxt(state) => {…}
ExampleStateMachine::End(state) => {…}
}
}
}
}
Тип Output будущего равен String, потому что это тип возвращаемого значения функции example. Для реализации метода poll мы используем условную инструкцию match на текущем состоянии внутри цикла. Идея в том, что мы переходим к следующему состоянию, пока это возможно, и явно возвращаем Poll::Pending, когда мы не можем продолжить.
Для упрощения мы представляем только упрощенный код и не обрабатываем закрепление, владения, lifetimes, и т.д. Поэтому этот и следующий код должны быть восприняты как псевдокод и не использоваться напрямую. Конечно, реальный генерируемый компилятором код обрабатывает всё верно, хотя возможно это будет сделано по-другому.
Чтобы сохранить примеры кода маленькими, мы представляем код для каждого варианта match отдельно. Начнем с состояния Start:
ExampleStateMachine::Start(state) => {
// из тела `example`
let foo_txt_future = async_read_file("foo.txt");
// операция`.await`
let state = WaitingOnFooTxtState {
min_len: state.min_len,
foo_txt_future,
};
*self = ExampleStateMachine::WaitingOnFooTxt(state);
}
Машина состояний находится в состоянии Start, когда она прямо в начале функции. В этом случае выполняем весь код из тела функции example до первого .await. Чтобы обработать операцию .await, мы меняем состояние машины на WaitingOnFooTxt, которое включает в себя построение структуры WaitingOnFooTxtState.
Пока match self {…} выполняется в цилке, выполнение прыгает к WaitingOnFooTxt:
ExampleStateMachine::WaitingOnFooTxt(state) => {
match state.foo_txt_future.poll(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(content) => {
// из тела `example`
if content.len() < state.min_len {
let bar_txt_future = async_read_file("bar.txt");
// операция `.await`
let state = WaitingOnBarTxtState {
content,
bar_txt_future,
};
*self = ExampleStateMachine::WaitingOnBarTxt(state);
} else {
*self = ExampleStateMachine::End(EndState);
return Poll::Ready(content);
}
}
}
}
В этом варианте match, вначале мы вызываем функцию poll для foo_txt_future. Если она не готова, мы выходим из цикла и возвращаем Poll::Pending. В этом случае self остается в состоянии WaitingOnFooTxt, следующий вызов функции poll на машине состояний попадёт в тот же match и повторит проверку готовности foo_txt_future.
Когда foo_txt_future готов, мы присваиваем результат переменной content и продолжаем выполнять код функции example: Если content.len() меньше сохранённого в структуре состояния min_len, файл bar.txt читается асинхронно. Мы ещё раз переводим операцию .await в изменение состояния, теперь в состояние WaitingOnBarTxt. Следуя за выполнением match внутри цикла, выполнение прямо переходит к варианту match для нового состояния позже, где проверяется готовность bar_txt_future.
В случае входа в ветку else, более никаких операций .await не происходит. Мы достигаем конца функции и возвращаем content обёрнутую в Poll::Ready. Также меняем текущее состояние на End.
Код для состояния WaitingOnBarTxt выглядит следующим образом:
ExampleStateMachine::WaitingOnBarTxt(state) => {
match state.bar_txt_future.poll(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(bar_txt) => {
*self = ExampleStateMachine::End(EndState);
// из тела `example`
return Poll::Ready(state.content + &bar_txt);
}
}
}
Аналогично состоянию WaitingOnFooTxt, мы начинаем с проверки готовности bar_txt_future. Если она ещё не готова, мы выходим из цикла и возвращаем Poll::Pending. В противном случае, мы можем выполнить последнюю операцию функции example: конкатенацию переменной content с результатом футуры. Обновляем машину состояний в состояние End и затем возвращаем результат обёрнутый в Poll::Ready.
В итоге, код для End состояния выглядит так:
ExampleStateMachine::End(_) => {
panic!("poll вызван после возврата Poll::Ready");
}
Футуры не должны повторно проверяться после того, как они вернули Poll::Ready, поэтому паникуем, если вызвана функция poll, когда мы уже находимся в состоянии End.
Теперь мы знаем, что сгенерированная машина состояний и ее реализация интерфейса Future могла бы выглядеть так. На практике компилятор генерирует код по-другому. (Если вас заинтересует, то реализация ныне основана на корутинах, но это только деталь имплементации.)
Последняя часть загадки – сгенерированный код для самой функции example. Помните, что заголовок функции был определён следующим образом:
async fn example(min_len: usize) -> String
Теперь, когда весь функционал реализуется машиной состояний, единственное, что ф-ция должна сделать - это инициализировать эту машику и вернуть ее. Сгенерированный код для этого может выглядеть следующим образом:
fn example(min_len: usize) -> ExampleStateMachine {
ExampleStateMachine::Start(StartState {
min_len,
})
}
Функция больше не имеет модификатора async, поскольку теперь явно возвращает тип ExampleStateMachine, который реализует трейт Future. Как ожидалось, машина состояний создается в состоянии start и соответствующая ему структура состояния инициализируется параметром min_len.
Заметьте, что эта функция не запускает выполнение машины состояний. Это фундаментальное архитектурное решение для футур в Rust: они ничего не делают, пока не будет произведена первая проверка на готовность.
Pinning
[!note] Закрепление (pinning, пиннинг)
Мы уже несколько раз столкнулись с понятием закрепления (pinnig, пиннинг) в этом посте. Наконец, время чтобы изучить, что такое закрепление и почему оно необходимо.
[!note] pinning - механизм, который гарантирует, что объект в памяти не будет перемещен.
Самоссылающиеся структуры
Как объяснялось выше, переходы конечных автоматов хранят локальные переменные для каждой точки остановки в структуре. Для простых примеров, как наш example функции, это было просто и не привело к никаким проблемам. Однако делаются сложнее, когда переменные ссылаются друг на друга. Например, рассмотрите следующую функцию:
async fn pin_example() -> i32 {
let array = [1, 2, 3];
let element = &array[2];
async_write_file("foo.txt", element.to_string()).await;
*element
}
Эта функция создает маленький array с содержимым 1, 2, и 3. Затем она создает ссылку на последний элемент массива и хранит ее в переменной element. Далее, асинхронно записывает число, преобразованное в строку, в файл foo.txt. В конце, возвращает число, ссылка на которое хранится в element.
Следуя своей единственной операции await, машина состояний состоит из трех состояний: start, end и "waiting on write". Функция не принимает аргументов, поэтому структура для начального состояния пуста. Как обычно, структура для конечного состояния также пустая, поскольку функция завершена на этом этапе. Структура для "waiting on write" более интересна:
struct WaitingOnWriteState {
array: [1, 2, 3],
element: 0x1001c, // адрес последнего элемента в array
}
Мы должны хранить как array, так и element потому что element требуется для значения возврата, а array ссылается на element. Следовательно, element является указателем (pointer) (адресом памяти), который хранит адрес ссылаемого элемента. В этом примере мы использовали 0x1001c в качестве примера адреса, в реальности он должен быть адресом последнего элемента поля array, что зависит от места расположения структуры в памяти. Структуры с такими внутренними указателями называются самоссылочными (self-referential) структурами, потому что они ссылаются на себя из одного из своих полей.
Проблемы с Самоссылочными Структурами
Внутренний указатель нашей самоссылочной структуры приводит к базовой проблеме, которая становится очевидной, когда мы посмотрим на её раскладку памяти:
Поле array начинается в адресе 0x10014, а поле element - в адресе 0x10020. Оно указывает на адрес 0x1001c, потому что последний элемент массива находится там. В этот момент все ещё в порядке. Однако проблема возникает, когда мы перемещаем эту структуру на другой адрес памяти:
Мы переместили структуру немного так, чтобы она теперь начиналась в адресе 0x10024. Это могло произойти, например, когда мы передаем структуру как аргумент функции или присваиваем ей другое переменной стека. Проблема заключается в том, что поле element все ещё указывает на адрес 0x1001c, хотя последний элемент массива теперь находится в адресе 0x1002c. Поэтому указатель висит, с результатом неопределённого поведения на следующем вызове poll.
Возможные решения
Существует три основных подхода к решению проблемы висящих указателей (dangling pointers):
- Обновление указателя при перемещении: Идея состоит в обновлении внутреннего указателя при каждом перемещении структуры в памяти, чтобы она оставалась действительной после перемещения. Однако этот подход требует значительных изменений в Rust, которые могут привести к потенциальным значительным потерям производительности. Причина заключается в том, что необходимо каким-то образом отслеживать тип всех полей структуры и проверять на каждом операции перемещения, требуется ли обновление указателя.
- Хранение смещения (offset) вместо самоссылающихся ссылок: Чтобы избежать необходимости обновления указателей, компилятор мог бы попытаться хранить самоссы ссылки в форме смещений от начала структуры вместо прямых ссылок. Например, поле
elementвышеупомянутойWaitingOnWriteStateструктуры можно было бы хранить в виде поляelement_offsetc значением 8, потому что элемент массива, на который указывает ссылка, находится за 8 байтов после начала структуры. Смещение остается неизменным при перемещении структуры, так что не требуются обновления полей. Проблема с этим подходом в том, что требуется, чтобы компилятор обнаружил всех самоссылок. Это невозможно на этапе компилящии потому, что значение ссылки может зависеть от ввода пользователя, так что нам потребуется система анализа ссылок и корректная генерация состояния для структур во время исполнения. Это приведёт к дополнительным расходам времени на выполнение, а также предотвратит определённые оптимизации компилятора, что приведёт к еще большим потерям производительности. - Запретить перемещать структуру: Мы увидели выше, что висящий указатель возникает только при перемещении структуры в памяти. Запретив все операции перемещения для самоссылающихся структур, можно избежать этой проблемы. Большое преимущество этого подхода состоит в том, что он можно реализовать на уровне системы типов без дополнительных расходов времени выполнения. Недостаток заключается в том, что оно возлагает на программиста обязанности по обработке перемещений самоссылающихся структур.
Rust выбрал третий подход из-за принципа предоставления бесплатных абстракций (zero cost abstractions), что означает, что абстракции не должны накладывать дополнительные расходы времени выполнения. API pinning предлагалось для решения этой проблемы в RFC 2349 (https://github.com/rust-lang/rfcs/blob/master/text/2349-pin.md). В следующем разделе мы дадим краткий обзор этого API и объясним, как оно работает с async/await и futures.
Значения на Куче (Heap)
Первый наблюдение состоит в том, что значения, выделенные на [куче], обычно имеют фиксированный адрес памяти. Они создаются с помощью вызова allocate и затем ссылаются на тип указателя, такой как Box<T>. Хотя перемещение указательного типа возможно, значение кучи, которое указывает на него, остается в том же адресе памяти до тех пор, пока оно не будет освобождено с помощью вызова deallocate еще раз.
Используя аллокацию на куче, можно попытаться создать самоссылающуюся структуру:
fn main() {
let mut heap_value = Box::new(SelfReferential {
self_ptr: 0 as *const _,
});
let ptr = &*heap_value as *const SelfReferential;
heap_value.self_ptr = ptr;
println!("heap value at: {:p}", heap_value);
println!("internal reference: {:p}", heap_value.self_ptr);
}
struct SelfReferential {
self_ptr: *const Self,
}
Мы создаем простую структуру с названием SelfReferential, которая содержит только одно поле c указателем. Во-первых, мы инициализируем эту структуру с пустым указателем и затем выделяем ее на куче с помощью Box::new. Затем мы определяем адрес кучи для выделенной структуры и храним его в переменной ptr. В конце концов, мы делаем структуру самоссылающейся, назначив переменную ptr полю self_ptr.
Когда мы запускаем этот код в песочнице, мы видим, что адрес на куче и внутренний указатель равны, что означает, что поле self_ptr валидное. Поскольку переменная heap_value является только указателем, перемещение его (например, передачей в функцию) не изменяет адрес самой структуры, поэтому self_ptr остается действительным даже при перемещении указателя.
Тем не менее, все еще есть путь сломать этот пример: мы можем выйти из Box<T> или изменить содержимое:
let stack_value = mem::replace(&mut *heap_value, SelfReferential {
self_ptr: 0 as *const _,
});
println!("value at: {:p}", &stack_value);
println!("internal reference: {:p}", stack_value.self_ptr);
Мы используем функцию mem::replace, чтобы заменить значение, выделенное в куче, новым экземпляром структуры. Это позволяет нам переместить исходное значение heap_value в стек, в то время как поле self_ptr структуры теперь является висящим указателем, который по-прежнему указывает на старый адрес в куче. Когда вы запустите пример в песочнице, вы увидите, что строки «value at:» и «internal reference:», показывают разные указатели. Таким образом, выделение значения в куче недостаточно для обеспечения безопасности самоссылок.
Основная проблема, которая привела к вышеуказанной ошибке, заключается в том, что Box<T> позволяет нам получить ссылку &mut T на значение, выделенное в куче. Эта ссылка &mut позволяет использовать такие методы, как mem::replace или mem::swap, для аннулирования значения, выделенного в куче. Чтобы решить эту проблему, мы должны предотвратить создание ссылок &mut на самореференциальные структуры.
Pin<Box<T>> и Unpin
API закрепления предоставляет решение проблемы &mut T в виде типа-обертки Pin и трейта-маркера Unpin. Идея использования - ограничить все методы Pin, которые могут быть использованы для получения ссылок &mut на обернутое значение (например, get_mut или deref_mut), на трейт Unpin. Трейт Unpin является авто трейтом (auto trait), который автоматически реализуется для всех типов, за исключением тех, которые явно отказываются от него. Заставляя самореференциальные структуры отказаться от Unpin, не остается (безопасного) способа получить &mut T из типа Pin<Box<T>> для них. В результате их внутренние самореференции гарантированно остаются действительными.
Как пример обновим тип SelfReferential тип из примера выше, что бы отказаться от Unpin:
use core::marker::PhantomPinned;
struct SelfReferential {
self_ptr: *const Self,
_pin: PhantomPinned,
}
Мы отказываемся от Unpin, добавляя второе поле _pin типа PhantomPinned. Этот тип является маркерным типом нулевого размера, единственной целью которого является отказ от реализации трейта Unpin. Из-за того, как работают [авто трейты], одного поля, которое не является Unpin, достаточно, чтобы полностью исключить структуру из Unpin.
Второй шаг — изменить тип Box<SelfReferential> в примере на Pin<Box<SelfReferential>>. Самый простой способ сделать это — использовать функцию Box::pin вместо Box::new для создания значения, размещаемого в куче:
let mut heap_value = Box::pin(SelfReferential {
self_ptr: 0 as *const _,
_pin: PhantomPinned,
});
В дополнение к изменению Box::new на Box::pin, нам также нужно добавить новое поле _pin в инициализатор структуры. Т.к. PhantomPinned является типом нулевого размера, нам нужно только его имя типа для инициализации.
Когда мы попробуем запустить наш скорректированный пример сейчас, он больше не работает:
error[E0594]: cannot assign to data in a dereference of `std::pin::Pin<std::boxed::Box<SelfReferential>>`
--> src/main.rs:10:5
|
10 | heap_value.self_ptr = ptr;
| ^^^^^^^^^^^^^^^^^^^^^^^^^ cannot assign
|
= help: trait `DerefMut` is required to modify through a dereference, but it is not implemented for `std::pin::Pin<std::boxed::Box<SelfReferential>>`
error[E0596]: cannot borrow data in a dereference of `std::pin::Pin<std::boxed::Box<SelfReferential>>` as mutable
--> src/main.rs:16:36
|
16 | let stack_value = mem::replace(&mut *heap_value, SelfReferential {
| ^^^^^^^^^^^^^^^^ cannot borrow as mutable
|
= help: trait `DerefMut` is required to modify through a dereference, but it is not implemented for `std::pin::Pin<std::boxed::Box<SelfReferential>>`
Обе ошибки возникают потому, что тип Pin<Box<SelfReferential>> больше не реализует трейт DerefMut. Это именно то, чего мы хотели, поскольку трейт DerefMut возвращал бы ссылку &mut, что мы и хотели предотвратить. Это происходит только потому, что мы отказались от Unpin и изменили Box::new на Box::pin.
Теперь проблема в том, что компилятор не только предотвращает перемещение типа в строке 16, но и запрещает инициализацию поля self_ptr в строке 10. Это происходит потому, что компилятор не может различить допустимые и недопустимые использования ссылок &mut. Чтобы инициализация снова заработала, нам нужно использовать небезопасный метод get_unchecked_mut:
// безопасно, т.к. изменение поля не перемещает всю структуру
unsafe {
let mut_ref = Pin::as_mut(&mut heap_value);
Pin::get_unchecked_mut(mut_ref).self_ptr = ptr;
}
Функция get_unchecked_mut работает с Pin<&mut T> вместо Pin<Box<T>>, поэтому нам нужно использовать Pin::as_mut для преобразования значения. Затем мы можем установить поле self_ptr, используя ссылку &mut, возвращаемую get_unchecked_mut.
Теперь единственной оставшейся ошибкой является желаемая ошибка на mem::replace. Помните, что эта операция пытается переместить значение, размещённое в куче, на стек, что нарушило бы самоссылку, хранящуюся в поле self_ptr. Отказываясь от Unpin и используя Pin<Box<T>>, мы можем предотвратить эту операцию на этапе компиляции и таким образом безопасно работать с самоссыльными структурами. Как мы видели, компилятор не может доказать, что создание самоссылки безопасно (пока), поэтому нам нужно использовать небезопасный блок и самостоятельно проверить корректность.
Пиннинг на стеке и Pin<&mut T>
В предыдущем разделе мы узнали, как использовать Pin<Box<T>> для безопасного создания самоссыльного значения, размещённого в куче. Хотя этот подход работает хорошо и относительно безопасен (кроме unsafe), необходимая аллокация в куче бьет по производительности. Поскольку Rust стремится предоставлять абстракции с нулевыми затратами (zero-cost abstractions) где это возможно, API закрепления также позволяет создавать экземпляры Pin<&mut T>, которые указывают на значения, размещённые на стеке.
В отличие от экземпляров Pin<Box<T>>, которые имеют владение обёрнутым значением, экземпляры Pin<&mut T> лишь временно заимствуют обёрнутое значение. Это усложняет задачу, так как программисту необходимо самостоятельно обеспечивать дополнительные гарантии. Важно, чтобы Pin<&mut T> оставался закрепленным на протяжении всей жизни ссылочного T, что может быть сложно проверить для переменных на стеке. Чтобы помочь с этим, существуют такие крейты, как pin-utils, но я все же не рекомендую закреплять на стеке, если вы не уверены в своих действиях.
Что бы узнать большое обратитесь к документации модуля pin и метода Pin::new_unchecked.
Пиннинг и Футуры
Как мы уже увидели в этом посте, метод Future::poll использует пиннинг в виде параметра Pin<&mut Self>:
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>
Причина, по которой этот метод принимает self: Pin<&mut Self> вместо обычного &mut self в том, что экземпляры футур, созданные через async/await, часто являются самоссыльными, как мы видели выше. Оборачивая Self в Pin и позволяя компилятору отказаться от Unpin для самоссыльных футур, генерируемых из async/await, гарантируется, что футуры не будут перемещены в памяти между вызовами poll. Это обеспечивает сохранность всех внутренних ссылок.
Стоит отметить, что перемещение футур до первого вызова poll допустимо. Это связано с тем, что футуры являются ленивыми и ничего не делают, пока их не вызовут в первый раз. Состояние start сгенерированных конечных автоматов, следовательно, содержит только аргументы функции, но не внутренние ссылки. Чтобы вызвать poll, вызывающему необходимо сначала обернуть фьючерс в Pin, что гарантирует, что фьючерс больше не может быть перемещён в памяти. Поскольку пиннинг на стеке сложнее сделать правильно, я рекомендую всегда использовать Box::pin в сочетании с Pin::as_mut для этого.
Если вас интересует, как безопасно реализовать комбинатора футур с использованием закрепления на стеке, взгляните на относительно короткий исходный код метода комбинатора map из крейта futures и раздел о projections and structural pinning в документации pin.
Executors and Wakers
Используя async/await, можно эргономично работать с футурами в полностью асинхронном режиме. Однако, как мы узнали выше, футуры ничего не делают, пока их не вызовут. Это означает, что нам нужно в какой-то момент вызвать poll, иначе асинхронный код никогда не будет выполнен.
Запуская одну футуры, мы можем вручную ожидать ее исполнения в цикле, как описано выше. Однако этот подход очень неэффективен и непрактичен для программ, создающих большое количество футур. Наиболее распространённым решением этой проблемы является определение глобального исполнителя, который отвечает за опрос всех футур в системе, пока они не завершатся.
Executors
Цель исполнителя в том, чтобы позволить создавать футуры в качестве независимых задач, обычно через какой-либо метод spawn. Исполнитель затем отвечает за опрос всех футур, пока они не завершатся. Большое преимущество управления всеми футурами в одном месте состоит в том, что исполнитель может переключаться на другую футуру, когда текущая футура возвращает Poll::Pending. Таким образом, асинхронные операции выполняются параллельно, и процессор остаётся загруженным.
Многие реализации исполнителей также могут использовать преимущества систем с несколькими ядрами процессора. Они создают thread pool, способный использовать все ядра, если достаточно работы, и применяют такие техники, как work stealing, для балансировки нагрузки между ядрами. Существуют также специальные реализации исполнителей для встроенных систем, которые оптимизируют низкую задержку и затраты памяти.
Чтобы избежать накладных расходов на повторный опрос футур, исполнители обычно используют API waker, поддерживаемый футурами Rust.
Wakers
Идея API waker в том, что специальный тип Waker передаётся в каждом вызове poll, при этом обернутый в тип Context. Этот тип Waker создаётся исполнителем и может использоваться асинхронной задачей для сигнализации о своём (частичном) завершении. В результате исполнитель не должен вызывать poll на футуре, которая ранее вернула Poll::Pending, пока не получит уведомление от соответствующего waker.
Лучше всего иллюстрируется небольшим примером:
async fn write_file() {
async_write_file("foo.txt", "Hello").await;
}
Эта функция асинхронно записывает строку "Hello" в файл foo.txt. Поскольку запись на жёсткий диск занимает некоторое время, первый вызов poll на этой футуре, вероятно, вернёт Poll::Pending. Однако драйвер жёсткого диска внутри будет хранить Waker, переданный в вызов poll, и использовать его для уведомления исполнителя, когда файл будет записан на диск. Таким образом, исполнитель не тратит время на poll футуры, пока не получит уведомление от waker.
Мы увидим, как работает тип Waker в деталях, когда создадим свой собственный исполнитель с поддержкой waker в разделе реализации этого поста.
Cooperative Multitasking?
В начале этого поста мы говорили о вытесняющей (preemptive) и кооперативной многозадачности. В то время как вытесняющая многозадачность полагается на операционную систему для принудительного переключения между выполняемыми задачами, кооперативная многозадачность требует, чтобы задачи добровольно уступали контроль над CPU через операцию yield на регулярной основе. Большое преимущество кооперативного подхода в том, что задачи могут сохранять своё состояние самостоятельно, что приводит к более эффективным переключениям контекста и делает возможным совместное использование одного и того же стека вызовов между задачами.
Это может не быть сразу очевидным, но футуры и async/await представляют собой реализацию кооперативного паттерна многозадачности:
- Каждая футура, добавляемая в исполнитель, по сути является кооперативной задачей.
- Вместо использования явной операции yield, футуры уступают контроль над ядром CPU, возвращая
Poll::Pending(илиPoll::Readyв конце).- Нет ничего, что заставляло бы футуру уступать CPU. Если они захотят, они могут никогда не возвращаться из
poll, например, бесконечно выполняя цикл. - Поскольку каждая футура может блокировать выполнение других футур в исполнителе, нам нужно доверять им, чтобы они не были вредоносными (malicious).
- Нет ничего, что заставляло бы футуру уступать CPU. Если они захотят, они могут никогда не возвращаться из
- Футуры внутренне хранят всё состояние, необходимое для продолжения выполнения при следующем вызове
poll. При использовании async/await компилятор автоматически определяет все переменные, которые необходимы, и сохраняет их внутри сгенерированной машины состояний.- Сохраняется только минимально необходимое состояние для продолжения.
- Поскольку метод
pollотдает стек вызовов при возврате, тот же стек может использоваться для опроса других футур.
Мы видим, что футуры и async/await идеально соответствуют паттерну кооперативной многозадачности; они просто используют другую терминологию. В дальнейшем мы будем использовать термины "задача" и "футура" взаимозаменяемо.
Implementation
Теперь, когда мы понимаем, как работает кооперативная многозадачность на основе футур и async/await в Rust, пора добавить поддержку этого в наш ядро. Поскольку трейт Future является частью библиотеки core, а async/await — это особенность самого языка, нам не нужно делать ничего особенного, чтобы использовать его в нашем #![no_std] ядре. Единственное требование — использовать, как минимум, nightly версию Rust от 2020-03-25, поскольку до этого времени async/await не поддерживала no_std.
С достаточно свежей nightly версией мы можем начать использовать async/await в нашем main.rs:
// in src/main.rs
async fn async_number() -> u32 {
42
}
async fn example_task() {
let number = async_number().await;
println!("async number: {}", number);
}
Функция async_number является async fn, поэтому компилятор преобразует её в машину состояний, реализующую Future. Поскольку функция возвращает только 42, результирующая футура непосредственно вернёт Poll::Ready(42) при первом вызове poll. Как и async_number, функция example_task также является async fn. Она ожидает число, возвращаемое async_number, а затем выводит его с помощью макроса println.
Чтобы запустить футуру, которую вернул example_task, нам нужно вызывать poll на ней, пока он не сигнализирует о своём завершении, возвращая Poll::Ready. Для этого нам нужно создать простой тип исполнителя.
Task
Перед тем как начать реализацию исполнителя, мы создаем новый модуль task с типом Task:
// in src/lib.rs
pub mod task;
// in src/task/mod.rs
use core::{future::Future, pin::Pin};
use alloc::boxed::Box;
pub struct Task {
future: Pin<Box<dyn Future<Output = ()>>>,
}
Структура Task является обёрткой вокруг закрепленной, размещённой в куче и динамически диспетчеризуемой футуры с пустым типом () в качестве выходного значения. Давайте разберём её подробнее:
- Мы требуем, чтобы футура, связанная с задачей, возвращала
(). Это означает, что задачи не возвращают никаких результатов, они просто выполняются для побочных эффектов. Например, функцияexample_task, которую мы определили выше, не имеет возвращаемого значения, но выводит что-то на экран как побочный эффект (side effect). - Ключевое слово
dynуказывает на то, что мы храним trait object вBox. Это означает, что методы на футуре диспетчеризуются динамически, позволяя хранить в типеTaskразные типы футур. Это важно, поскольку каждаяasync fnимеет свой собственный тип, и мы хотим иметь возможность создавать несколько разных задач. - Как мы узнали в разделе о закреплении, тип
Pin<Box>обеспечивает, что значение не может быть перемещено в памяти, помещая его в кучу и предотвращая создание&mutссылок на него. Это важно, потому что фьючерсы, генерируемые async/await, могут быть самоссыльными, т.е. содержать указатели на себя, которые станут недействительными, если футура будет перемещена.
Чтобы разрешить создание новых структур Task из фьючерсов, мы создаём функцию new:
// in src/task/mod.rs
impl Task {
pub fn new(future: impl Future<Output = ()> + 'static) -> Task {
Task {
future: Box::pin(future),
}
}
}
Функция принимает произвольную футуру с выходным типом () и закрепляет его в памяти через Box::pin. Затем она оборачивает упакованную футуру в структуру Task и возвращает ее. Здесь нужно время жизни'static, т.к. возвращаемый Task может жить произвольное время, следовательно, футура также должна быть действительнтой в течение этого времени.
Мы также добавляем метод poll, чтобы позволить исполнителю опрашивать хранимую футуру:
// in src/task/mod.rs
use core::task::{Context, Poll};
impl Task {
fn poll(&mut self, context: &mut Context) -> Poll<()> {
self.future.as_mut().poll(context)
}
}
Поскольку метод poll трейта Future ожидает вызова на типе Pin<&mut T>, мы сначала используем метод Pin::as_mut, чтобы преобразовать поле self.future типа Pin<Box<T>>. Затем мы вызываем poll на преобразованном поле self.future и возвращаем результат. Поскольку метод Task::poll должен вызываться только исполнителем, который мы создадим через мгновение, мы оставляем функцию приватной для модуля task.
Simple Executor
Поскольку исполнители могут быть довольно сложными, мы намеренно начинаем с создания очень базового исполнителя, прежде чем реализовывать более продвинутого. Для этого мы сначала создаём новый подмодуль task::simple_executor:
// in src/task/mod.rs
pub mod simple_executor;
// in src/task/simple_executor.rs
use super::Task;
use alloc::collections::VecDeque;
pub struct SimpleExecutor {
task_queue: VecDeque<Task>,
}
impl SimpleExecutor {
pub fn new() -> SimpleExecutor {
SimpleExecutor {
task_queue: VecDeque::new(),
}
}
pub fn spawn(&mut self, task: Task) {
self.task_queue.push_back(task)
}
}
Структура содержит единственное поле task_queue типа VecDeque, которое по сути является вектором, позволяющим выполнять операции добавления и удаления с обоих концов. Идея в том, что мы можем вставлять новые задачи через метод spawn в конец и извлекаем следующую задачу для выполнения из начала. Таким образом, мы получаем простую FIFO очередь ("первый пришёл — первый вышел").
Dummy Waker
Чтобы вызвать метод poll, нам нужно создать тип Context, который оборачивает тип Waker. Начнём с простого: мы сначала создадим заглушку waker, которая ничего не делает. Для этого мы создаём экземпляр RawWaker, который определяет реализацию различных методов Waker, а затем используем функцию Waker::from_raw, чтобы превратить его в Waker:
// in src/task/simple_executor.rs
use core::task::{Waker, RawWaker};
fn dummy_raw_waker() -> RawWaker {
todo!();
}
fn dummy_waker() -> Waker {
unsafe { Waker::from_raw(dummy_raw_waker()) }
}
Функция from_raw является небезопасной, может быть неопределенное поведение (undefined behavior), если программист не соблюдает документированные требования к RawWaker. Прежде чем мы рассмотрим реализацию функции dummy_raw_waker, давайте сначала попытаемся понять, как работает тип RawWaker.
RawWaker
Тип RawWaker требует от программиста явного определения таблицы виртуальных методов (vtable), которая указывает функции, которые должны быть вызваны при клонировании (cloned), пробуждении (woken) или удалении (droppen) RawWaker. Расположение этой vtable определяется типом RawWakerVTable. Каждая функция получает аргумент *const (), который является type-erased указателем на некоторое значение. Причина использования указателя *const () вместо правильной ссылки в том, что тип RawWaker должен быть non-generic, но при этом поддерживать произвольные типы. Указатель передается в аргументе data ф-ции RawWaker::new, которая просто инициализирует RawWaker. Затем Waker использует этот RawWaker, чтобы вызывать функции vtable с data.
Как правило, RawWaker создаётся для какой-то структуры, размещённой в куче, которая обёрнута в тип Box или Arc. Для таких типов можно использовать методы, такие как Box::into_raw, чтобы преобразовать Box<T> в указатель *const T. Этот указатель затем можно привести к анонимному указателю *const () и передать в RawWaker::new. Поскольку каждая функция vtable получает один и тот же *const () в качестве аргумента, функции могут безопасно привести указатель обратно к Box<T> или &T, чтобы работать с ним. Как вы можете себе представить, этот процесс крайне опасен и легко может привести к неопределённому поведению в случае ошибок. По этой причине вручную создавать RawWaker не рекомендуется, если это не является необходимым.
A Dummy RawWaker
Хотя вручную создавать RawWaker не рекомендуется, в настоящее время нет другого способа создать заглушку Waker, которая ничего не делает. К счастью, тот факт, что мы хотим ничего не делать, делает реализацию функции dummy_raw_waker относительно безопасной:
// in src/task/simple_executor.rs
use core::task::RawWakerVTable;
fn dummy_raw_waker() -> RawWaker {
fn no_op(_: *const ()) {}
fn clone(_: *const ()) -> RawWaker {
dummy_raw_waker()
}
let vtable = &RawWakerVTable::new(clone, no_op, no_op, no_op);
RawWaker::new(0 as *const (), vtable)
}
Сначала мы определяем две внутренние функции с именами no_op и clone. Функция no_op принимает указатель *const () и ничего не делает. Функция clone также принимает указатель *const () и возвращает новый RawWaker, снова вызывая dummy_raw_waker. Мы используем эти две функции для создания минимальной RawWakerVTable: функция clone используется для операций клонирования, а функция no_op — для всех остальных операций. Поскольку RawWaker ничего не делает, не имеет значения, что мы возвращаем новый RawWaker из clone вместо его клонирования.
После создания vtable мы используем функцию RawWaker::new для создания RawWaker. Переданный *const () не имеет значения, поскольку ни одна из функций vtable не использует его. По этой причине мы просто передаем нулевой указатель.
A run Method
Теперь у нас есть способ создать экземпляр Waker, и мы можем использовать его для реализации метода run в нашем исполнителе. Самый простой метод run — это многократный опрос всех задач в очереди в цикле до тех пор, пока все они не будут выполнены. Это не очень эффективно, так как не использует уведомления от Waker, но это простой способ запустить эти штуки:
// in src/task/simple_executor.rs
use core::task::{Context, Poll};
impl SimpleExecutor {
pub fn run(&mut self) {
while let Some(mut task) = self.task_queue.pop_front() {
let waker = dummy_waker();
let mut context = Context::from_waker(&waker);
match task.poll(&mut context) {
Poll::Ready(()) => {} // task готов
Poll::Pending => self.task_queue.push_back(task),
}
}
}
}
Функция использует цикл while let, чтобы обработать все задачи в task_queue. Для каждой задачи сначала создаётся тип Context, оборачивая экземпляр Waker, возвращаемый нашей функцией dummy_waker. Затем вызывается метод Task::poll с этим context. Если метод poll возвращает Poll::Ready, задача завершена, и мы можем продолжить с следующей задачей. Если задача всё ещё Poll::Pending, мы добавляем её в конец очереди, чтобы она была опрошена снова в следующей итерации цикла.
Trying It
С нашим типом SimpleExecutor мы теперь можем попробовать запустить задачу, возвращаемую функцией example_task, в нашем main.rs:
// in src/main.rs
use blog_os::task::{Task, simple_executor::SimpleExecutor};
fn kernel_main(boot_info: &'static BootInfo) -> ! {
// […] инициализация всякого, включая `init_heap`
let mut executor = SimpleExecutor::new();
executor.spawn(Task::new(example_task()));
executor.run();
// […] test_main, "It did not crash!" сообщение, hlt_loop
}
// ниже example_task, что бы вам не нужно было скролить
async fn async_number() -> u32 {
42
}
async fn example_task() {
let number = async_number().await;
println!("async number: {}", number);
}
Когда мы запускаем её, мы видим, что ожидаемое сообщение "async number: 42" выводится на экран:
Давайте подытожим шаги, которые происходят в этом примере:
- Сначала создаётся новый экземпляр нашего типа
SimpleExecutorс пустойtask_queue. - Затем мы вызываем асинхронную функцию
example_task, которая возвращает футуру. Мы оборачиваем эту футуру в типTask, который перемещает её в кучу и закрепляет, а затем добавляем задачу вtask_queueисполнителя через методspawn. - После этого мы вызываем метод
run, чтобы начать выполнение единственной задачи в очереди. Это включает в себя:- Извлечение задачи из начала
task_queue. - Создание
RawWakerдля задачи, преобразование его в экземплярWakerи создание экземпляраContextна его основе. - Вызов метода
pollна футуре задачи, используя только что созданныйContext. - Поскольку
example_taskне ждёт ничего, она может непосредственно выполняться до конца при первом вызовеpoll. Именно здесь выводится строка "async number: 42". - Т.к
example_taskнапрямую возвращаетPoll::Ready, она не добавляется обратно в очередь задач.
- Извлечение задачи из начала
- Метод
runвозвращается после того, какtask_queueстановится пустым. Выполнение нашей функцииkernel_mainпродолжается, и выводится сообщение "It did not crash!".
Async Keyboard Input
Our simple executor does not utilize the Waker notifications and simply loops over all tasks until they are done. This wasn't a problem for our example since our example_task can directly run to finish on the first poll call. To see the performance advantages of a proper Waker implementation, we first need to create a task that is truly asynchronous, i.e., a task that will probably return Poll::Pending on the first poll call.
We already have some kind of asynchronicity in our system that we can use for this: hardware interrupts. As we learned in the Interrupts post, hardware interrupts can occur at arbitrary points in time, determined by some external device. For example, a hardware timer sends an interrupt to the CPU after some predefined time has elapsed. When the CPU receives an interrupt, it immediately transfers control to the corresponding handler function defined in the interrupt descriptor table (IDT).
In the following, we will create an asynchronous task based on the keyboard interrupt. The keyboard interrupt is a good candidate for this because it is both non-deterministic and latency-critical. Non-deterministic means that there is no way to predict when the next key press will occur because it is entirely dependent on the user. Latency-critical means that we want to handle the keyboard input in a timely manner, otherwise the user will feel a lag. To support such a task in an efficient way, it will be essential that the executor has proper support for Waker notifications.
Scancode Queue
Currently, we handle the keyboard input directly in the interrupt handler. This is not a good idea for the long term because interrupt handlers should stay as short as possible as they might interrupt important work. Instead, interrupt handlers should only perform the minimal amount of work necessary (e.g., reading the keyboard scancode) and leave the rest of the work (e.g., interpreting the scancode) to a background task.
A common pattern for delegating work to a background task is to create some sort of queue. The interrupt handler pushes units of work to the queue, and the background task handles the work in the queue. Applied to our keyboard interrupt, this means that the interrupt handler only reads the scancode from the keyboard, pushes it to the queue, and then returns. The keyboard task sits on the other end of the queue and interprets and handles each scancode that is pushed to it:
A simple implementation of that queue could be a mutex-protected VecDeque. However, using mutexes in interrupt handlers is not a good idea since it can easily lead to deadlocks. For example, when the user presses a key while the keyboard task has locked the queue, the interrupt handler tries to acquire the lock again and hangs indefinitely. Another problem with this approach is that VecDeque automatically increases its capacity by performing a new heap allocation when it becomes full. This can lead to deadlocks again because our allocator also uses a mutex internally. Further problems are that heap allocations can fail or take a considerable amount of time when the heap is fragmented.
To prevent these problems, we need a queue implementation that does not require mutexes or allocations for its push operation. Such queues can be implemented by using lock-free atomic operations for pushing and popping elements. This way, it is possible to create push and pop operations that only require a &self reference and are thus usable without a mutex. To avoid allocations on push, the queue can be backed by a pre-allocated fixed-size buffer. While this makes the queue bounded (i.e., it has a maximum length), it is often possible to define reasonable upper bounds for the queue length in practice, so that this isn't a big problem.
The crossbeam Crate
Implementing such a queue in a correct and efficient way is very difficult, so I recommend sticking to existing, well-tested implementations. One popular Rust project that implements various mutex-free types for concurrent programming is crossbeam. It provides a type named ArrayQueue that is exactly what we need in this case. And we're lucky: the type is fully compatible with no_std crates with allocation support.
To use the type, we need to add a dependency on the crossbeam-queue crate:
# in Cargo.toml
[dependencies.crossbeam-queue]
version = "0.3.11"
default-features = false
features = ["alloc"]
By default, the crate depends on the standard library. To make it no_std compatible, we need to disable its default features and instead enable the alloc feature. (Note that we could also add a dependency on the main crossbeam crate, which re-exports the crossbeam-queue crate, but this would result in a larger number of dependencies and longer compile times.)
Queue Implementation
Using the ArrayQueue type, we can now create a global scancode queue in a new task::keyboard module:
// in src/task/mod.rs
pub mod keyboard;
// in src/task/keyboard.rs
use conquer_once::spin::OnceCell;
use crossbeam_queue::ArrayQueue;
static SCANCODE_QUEUE: OnceCell<ArrayQueue<u8>> = OnceCell::uninit();
Since ArrayQueue::new performs a heap allocation, which is not possible at compile time (yet), we can't initialize the static variable directly. Instead, we use the OnceCell type of the conquer_once crate, which makes it possible to perform a safe one-time initialization of static values. To include the crate, we need to add it as a dependency in our Cargo.toml:
# in Cargo.toml
[dependencies.conquer-once]
version = "0.2.0"
default-features = false
Instead of the OnceCell primitive, we could also use the lazy_static macro here. However, the OnceCell type has the advantage that we can ensure that the initialization does not happen in the interrupt handler, thus preventing the interrupt handler from performing a heap allocation.
Filling the Queue
To fill the scancode queue, we create a new add_scancode function that we will call from the interrupt handler:
// in src/task/keyboard.rs
use crate::println;
/// Called by the keyboard interrupt handler
///
/// Must not block or allocate.
pub(crate) fn add_scancode(scancode: u8) {
if let Ok(queue) = SCANCODE_QUEUE.try_get() {
if let Err(_) = queue.push(scancode) {
println!("WARNING: scancode queue full; dropping keyboard input");
}
} else {
println!("WARNING: scancode queue uninitialized");
}
}
We use OnceCell::try_get to get a reference to the initialized queue. If the queue is not initialized yet, we ignore the keyboard scancode and print a warning. It's important that we don't try to initialize the queue in this function because it will be called by the interrupt handler, which should not perform heap allocations. Since this function should not be callable from our main.rs, we use the pub(crate) visibility to make it only available to our lib.rs.
The fact that the ArrayQueue::push method requires only a &self reference makes it very simple to call the method on the static queue. The ArrayQueue type performs all the necessary synchronization itself, so we don't need a mutex wrapper here. In case the queue is full, we print a warning too.
To call the add_scancode function on keyboard interrupts, we update our keyboard_interrupt_handler function in the interrupts module:
// in src/interrupts.rs
extern "x86-interrupt" fn keyboard_interrupt_handler(
_stack_frame: InterruptStackFrame
) {
use x86_64::instructions::port::Port;
let mut port = Port::new(0x60);
let scancode: u8 = unsafe { port.read() };
crate::task::keyboard::add_scancode(scancode); // new
unsafe {
PICS.lock()
.notify_end_of_interrupt(InterruptIndex::Keyboard.as_u8());
}
}
We removed all the keyboard handling code from this function and instead added a call to the add_scancode function. The rest of the function stays the same as before.
As expected, keypresses are no longer printed to the screen when we run our project using cargo run now. Instead, we see the warning that the scancode queue is uninitialized for every keystroke.
Scancode Stream
To initialize the SCANCODE_QUEUE and read the scancodes from the queue in an asynchronous way, we create a new ScancodeStream type:
// in src/task/keyboard.rs
pub struct ScancodeStream {
_private: (),
}
impl ScancodeStream {
pub fn new() -> Self {
SCANCODE_QUEUE.try_init_once(|| ArrayQueue::new(100))
.expect("ScancodeStream::new should only be called once");
ScancodeStream { _private: () }
}
}
The purpose of the _private field is to prevent construction of the struct from outside of the module. This makes the new function the only way to construct the type. In the function, we first try to initialize the SCANCODE_QUEUE static. We panic if it is already initialized to ensure that only a single ScancodeStream instance can be created.
To make the scancodes available to asynchronous tasks, the next step is to implement a poll-like method that tries to pop the next scancode off the queue. While this sounds like we should implement the Future trait for our type, this does not quite fit here. The problem is that the Future trait only abstracts over a single asynchronous value and expects that the poll method is not called again after it returns Poll::Ready. Our scancode queue, however, contains multiple asynchronous values, so it is okay to keep polling it.
The Stream Trait
Since types that yield multiple asynchronous values are common, the futures crate provides a useful abstraction for such types: the Stream trait. The trait is defined like this:
pub trait Stream {
type Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context)
-> Poll<Option<Self::Item>>;
}
This definition is quite similar to the Future trait, with the following differences:
- The associated type is named
Iteminstead ofOutput. - Instead of a
pollmethod that returnsPoll<Self::Item>, theStreamtrait defines apoll_nextmethod that returns aPoll<Option<Self::Item>>(note the additionalOption).
There is also a semantic difference: The poll_next can be called repeatedly, until it returns Poll::Ready(None) to signal that the stream is finished. In this regard, the method is similar to the Iterator::next method, which also returns None after the last value.
Implementing Stream
Let's implement the Stream trait for our ScancodeStream to provide the values of the SCANCODE_QUEUE in an asynchronous way. For this, we first need to add a dependency on the futures-util crate, which contains the Stream type:
# in Cargo.toml
[dependencies.futures-util]
version = "0.3.4"
default-features = false
features = ["alloc"]
We disable the default features to make the crate no_std compatible and enable the alloc feature to make its allocation-based types available (we will need this later). (Note that we could also add a dependency on the main futures crate, which re-exports the futures-util crate, but this would result in a larger number of dependencies and longer compile times.)
Now we can import and implement the Stream trait:
// in src/task/keyboard.rs
use core::{pin::Pin, task::{Poll, Context}};
use futures_util::stream::Stream;
impl Stream for ScancodeStream {
type Item = u8;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<u8>> {
let queue = SCANCODE_QUEUE.try_get().expect("not initialized");
match queue.pop() {
Some(scancode) => Poll::Ready(Some(scancode)),
None => Poll::Pending,
}
}
}
We first use the OnceCell::try_get method to get a reference to the initialized scancode queue. This should never fail since we initialize the queue in the new function, so we can safely use the expect method to panic if it's not initialized. Next, we use the ArrayQueue::pop method to try to get the next element from the queue. If it succeeds, we return the scancode wrapped in Poll::Ready(Some(…)). If it fails, it means that the queue is empty. In that case, we return Poll::Pending.
Waker Support
Like the Futures::poll method, the Stream::poll_next method requires the asynchronous task to notify the executor when it becomes ready after Poll::Pending is returned. This way, the executor does not need to poll the same task again until it is notified, which greatly reduces the performance overhead of waiting tasks.
To send this notification, the task should extract the Waker from the passed Context reference and store it somewhere. When the task becomes ready, it should invoke the wake method on the stored Waker to notify the executor that the task should be polled again.
AtomicWaker
To implement the Waker notification for our ScancodeStream, we need a place where we can store the Waker between poll calls. We can't store it as a field in the ScancodeStream itself because it needs to be accessible from the add_scancode function. The solution to this is to use a static variable of the AtomicWaker type provided by the futures-util crate. Like the ArrayQueue type, this type is based on atomic instructions and can be safely stored in a static and modified concurrently.
Let's use the AtomicWaker type to define a static WAKER:
// in src/task/keyboard.rs
use futures_util::task::AtomicWaker;
static WAKER: AtomicWaker = AtomicWaker::new();
The idea is that the poll_next implementation stores the current waker in this static, and the add_scancode function calls the wake function on it when a new scancode is added to the queue.
Storing a Waker
The contract defined by poll/poll_next requires the task to register a wakeup for the passed Waker when it returns Poll::Pending. Let's modify our poll_next implementation to satisfy this requirement:
// in src/task/keyboard.rs
impl Stream for ScancodeStream {
type Item = u8;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<u8>> {
let queue = SCANCODE_QUEUE
.try_get()
.expect("scancode queue not initialized");
// fast path
if let Some(scancode) = queue.pop() {
return Poll::Ready(Some(scancode));
}
WAKER.register(&cx.waker());
match queue.pop() {
Some(scancode) => {
WAKER.take();
Poll::Ready(Some(scancode))
}
None => Poll::Pending,
}
}
}
Like before, we first use the OnceCell::try_get function to get a reference to the initialized scancode queue. We then optimistically try to pop from the queue and return Poll::Ready when it succeeds. This way, we can avoid the performance overhead of registering a waker when the queue is not empty.
If the first call to queue.pop() does not succeed, the queue is potentially empty. Only potentially because the interrupt handler might have filled the queue asynchronously immediately after the check. Since this race condition can occur again for the next check, we need to register the Waker in the WAKER static before the second check. This way, a wakeup might happen before we return Poll::Pending, but it is guaranteed that we get a wakeup for any scancodes pushed after the check.
After registering the Waker contained in the passed Context through the AtomicWaker::register function, we try to pop from the queue a second time. If it now succeeds, we return Poll::Ready. We also remove the registered waker again using AtomicWaker::take because a waker notification is no longer needed. In case queue.pop() fails for a second time, we return Poll::Pending like before, but this time with a registered wakeup.
Note that there are two ways that a wakeup can happen for a task that did not return Poll::Pending (yet). One way is the mentioned race condition when the wakeup happens immediately before returning Poll::Pending. The other way is when the queue is no longer empty after registering the waker, so that Poll::Ready is returned. Since these spurious wakeups are not preventable, the executor needs to be able to handle them correctly.
Waking the Stored Waker
To wake the stored Waker, we add a call to WAKER.wake() in the add_scancode function:
// in src/task/keyboard.rs
pub(crate) fn add_scancode(scancode: u8) {
if let Ok(queue) = SCANCODE_QUEUE.try_get() {
if let Err(_) = queue.push(scancode) {
println!("WARNING: scancode queue full; dropping keyboard input");
} else {
WAKER.wake(); // new
}
} else {
println!("WARNING: scancode queue uninitialized");
}
}
The only change that we made is to add a call to WAKER.wake() if the push to the scancode queue succeeds. If a waker is registered in the WAKER static, this method will call the equally-named wake method on it, which notifies the executor. Otherwise, the operation is a no-op, i.e., nothing happens.
It is important that we call wake only after pushing to the queue because otherwise the task might be woken too early while the queue is still empty. This can, for example, happen when using a multi-threaded executor that starts the woken task concurrently on a different CPU core. While we don't have thread support yet, we will add it soon and don't want things to break then.
Keyboard Task
Now that we implemented the Stream trait for our ScancodeStream, we can use it to create an asynchronous keyboard task:
// in src/task/keyboard.rs
use futures_util::stream::StreamExt;
use pc_keyboard::{layouts, DecodedKey, HandleControl, Keyboard, ScancodeSet1};
use crate::print;
pub async fn print_keypresses() {
let mut scancodes = ScancodeStream::new();
let mut keyboard = Keyboard::new(ScancodeSet1::new(),
layouts::Us104Key, HandleControl::Ignore);
while let Some(scancode) = scancodes.next().await {
if let Ok(Some(key_event)) = keyboard.add_byte(scancode) {
if let Some(key) = keyboard.process_keyevent(key_event) {
match key {
DecodedKey::Unicode(character) => print!("{}", character),
DecodedKey::RawKey(key) => print!("{:?}", key),
}
}
}
}
}
The code is very similar to the code we had in our keyboard interrupt handler before we modified it in this post. The only difference is that, instead of reading the scancode from an I/O port, we take it from the ScancodeStream. For this, we first create a new Scancode stream and then repeatedly use the next method provided by the StreamExt trait to get a Future that resolves to the next element in the stream. By using the await operator on it, we asynchronously wait for the result of the future.
We use while let to loop until the stream returns None to signal its end. Since our poll_next method never returns None, this is effectively an endless loop, so the print_keypresses task never finishes.
Let's add the print_keypresses task to our executor in our main.rs to get working keyboard input again:
// in src/main.rs
use blog_os::task::keyboard; // new
fn kernel_main(boot_info: &'static BootInfo) -> ! {
// […] initialization routines, including init_heap, test_main
let mut executor = SimpleExecutor::new();
executor.spawn(Task::new(example_task()));
executor.spawn(Task::new(keyboard::print_keypresses())); // new
executor.run();
// […] "it did not crash" message, hlt_loop
}
When we execute cargo run now, we see that keyboard input works again:
If you keep an eye on the CPU utilization of your computer, you will see that the QEMU process now continuously keeps the CPU busy. This happens because our SimpleExecutor polls tasks over and over again in a loop. So even if we don't press any keys on the keyboard, the executor repeatedly calls poll on our print_keypresses task, even though the task cannot make any progress and will return Poll::Pending each time.
Executor with Waker Support
To fix the performance problem, we need to create an executor that properly utilizes the Waker notifications. This way, the executor is notified when the next keyboard interrupt occurs, so it does not need to keep polling the print_keypresses task over and over again.
Task Id
The first step in creating an executor with proper support for waker notifications is to give each task a unique ID. This is required because we need a way to specify which task should be woken. We start by creating a new TaskId wrapper type:
// in src/task/mod.rs
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct TaskId(u64);
The TaskId struct is a simple wrapper type around u64. We derive a number of traits for it to make it printable, copyable, comparable, and sortable. The latter is important because we want to use TaskId as the key type of a BTreeMap in a moment.
To create a new unique ID, we create a TaskId::new function:
use core::sync::atomic::{AtomicU64, Ordering};
impl TaskId {
fn new() -> Self {
static NEXT_ID: AtomicU64 = AtomicU64::new(0);
TaskId(NEXT_ID.fetch_add(1, Ordering::Relaxed))
}
}
The function uses a static NEXT_ID variable of type AtomicU64 to ensure that each ID is assigned only once. The fetch_add method atomically increases the value and returns the previous value in one atomic operation. This means that even when the TaskId::new method is called in parallel, every ID is returned exactly once. The Ordering parameter defines whether the compiler is allowed to reorder the fetch_add operation in the instructions stream. Since we only require that the ID be unique, the Relaxed ordering with the weakest requirements is enough in this case.
We can now extend our Task type with an additional id field:
// in src/task/mod.rs
pub struct Task {
id: TaskId, // new
future: Pin<Box<dyn Future<Output = ()>>>,
}
impl Task {
pub fn new(future: impl Future<Output = ()> + 'static) -> Task {
Task {
id: TaskId::new(), // new
future: Box::pin(future),
}
}
}
The new id field makes it possible to uniquely name a task, which is required for waking a specific task.
The Executor Type
We create our new Executor type in a task::executor module:
// in src/task/mod.rs
pub mod executor;
// in src/task/executor.rs
use super::{Task, TaskId};
use alloc::{collections::BTreeMap, sync::Arc};
use core::task::Waker;
use crossbeam_queue::ArrayQueue;
pub struct Executor {
tasks: BTreeMap<TaskId, Task>,
task_queue: Arc<ArrayQueue<TaskId>>,
waker_cache: BTreeMap<TaskId, Waker>,
}
impl Executor {
pub fn new() -> Self {
Executor {
tasks: BTreeMap::new(),
task_queue: Arc::new(ArrayQueue::new(100)),
waker_cache: BTreeMap::new(),
}
}
}
Instead of storing tasks in a VecDeque like we did for our SimpleExecutor, we use a task_queue of task IDs and a BTreeMap named tasks that contains the actual Task instances. The map is indexed by the TaskId to allow efficient continuation of a specific task.
The task_queue field is an ArrayQueue of task IDs, wrapped into the Arc type that implements reference counting. Reference counting makes it possible to share ownership of the value among multiple owners. It works by allocating the value on the heap and counting the number of active references to it. When the number of active references reaches zero, the value is no longer needed and can be deallocated.
We use this Arc<ArrayQueue> type for the task_queue because it will be shared between the executor and wakers. The idea is that the wakers push the ID of the woken task to the queue. The executor sits on the receiving end of the queue, retrieves the woken tasks by their ID from the tasks map, and then runs them. The reason for using a fixed-size queue instead of an unbounded queue such as SegQueue is that interrupt handlers should not allocate on push to this queue.
In addition to the task_queue and the tasks map, the Executor type has a waker_cache field that is also a map. This map caches the Waker of a task after its creation. This has two reasons: First, it improves performance by reusing the same waker for multiple wake-ups of the same task instead of creating a new waker each time. Second, it ensures that reference-counted wakers are not deallocated inside interrupt handlers because it could lead to deadlocks (there are more details on this below).
To create an Executor, we provide a simple new function. We choose a capacity of 100 for the task_queue, which should be more than enough for the foreseeable future. In case our system will have more than 100 concurrent tasks at some point, we can easily increase this size.
Spawning Tasks
As for the SimpleExecutor, we provide a spawn method on our Executor type that adds a given task to the tasks map and immediately wakes it by pushing its ID to the task_queue:
// in src/task/executor.rs
impl Executor {
pub fn spawn(&mut self, task: Task) {
let task_id = task.id;
if self.tasks.insert(task.id, task).is_some() {
panic!("task with same ID already in tasks");
}
self.task_queue.push(task_id).expect("queue full");
}
}
If there is already a task with the same ID in the map, the [BTreeMap::insert] method returns it. This should never happen since each task has a unique ID, so we panic in this case since it indicates a bug in our code. Similarly, we panic when the task_queue is full since this should never happen if we choose a large-enough queue size.
Running Tasks
To execute all tasks in the task_queue, we create a private run_ready_tasks method:
// in src/task/executor.rs
use core::task::{Context, Poll};
impl Executor {
fn run_ready_tasks(&mut self) {
// destructure `self` to avoid borrow checker errors
let Self {
tasks,
task_queue,
waker_cache,
} = self;
while let Some(task_id) = task_queue.pop() {
let task = match tasks.get_mut(&task_id) {
Some(task) => task,
None => continue, // task no longer exists
};
let waker = waker_cache
.entry(task_id)
.or_insert_with(|| TaskWaker::new(task_id, task_queue.clone()));
let mut context = Context::from_waker(waker);
match task.poll(&mut context) {
Poll::Ready(()) => {
// task done -> remove it and its cached waker
tasks.remove(&task_id);
waker_cache.remove(&task_id);
}
Poll::Pending => {}
}
}
}
}
The basic idea of this function is similar to our SimpleExecutor: Loop over all tasks in the task_queue, create a waker for each task, and then poll them. However, instead of adding pending tasks back to the end of the task_queue, we let our TaskWaker implementation take care of adding woken tasks back to the queue. The implementation of this waker type will be shown in a moment.
Let's look into some of the implementation details of this run_ready_tasks method:
-
We use destructuring to split
selfinto its three fields to avoid some borrow checker errors. Namely, our implementation needs to access theself.task_queuefrom within a closure, which currently tries to borrowselfcompletely. This is a fundamental borrow checker issue that will be resolved when RFC 2229 is implemented. -
For each popped task ID, we retrieve a mutable reference to the corresponding task from the
tasksmap. Since ourScancodeStreamimplementation registers wakers before checking whether a task needs to be put to sleep, it might happen that a wake-up occurs for a task that no longer exists. In this case, we simply ignore the wake-up and continue with the next ID from the queue. -
To avoid the performance overhead of creating a waker on each poll, we use the
waker_cachemap to store the waker for each task after it has been created. For this, we use theBTreeMap::entrymethod in combination withEntry::or_insert_withto create a new waker if it doesn't exist yet and then get a mutable reference to it. For creating a new waker, we clone thetask_queueand pass it together with the task ID to theTaskWaker::newfunction (implementation shown below). Since thetask_queueis wrapped into anArc, thecloneonly increases the reference count of the value, but still points to the same heap-allocated queue. Note that reusing wakers like this is not possible for all waker implementations, but ourTaskWakertype will allow it.
A task is finished when it returns Poll::Ready. In that case, we remove it from the tasks map using the BTreeMap::remove method. We also remove its cached waker, if it exists.
Waker Design
The job of the waker is to push the ID of the woken task to the task_queue of the executor. We implement this by creating a new TaskWaker struct that stores the task ID and a reference to the task_queue:
// in src/task/executor.rs
struct TaskWaker {
task_id: TaskId,
task_queue: Arc<ArrayQueue<TaskId>>,
}
Since the ownership of the task_queue is shared between the executor and wakers, we use the Arc wrapper type to implement shared reference-counted ownership.
The implementation of the wake operation is quite simple:
// in src/task/executor.rs
impl TaskWaker {
fn wake_task(&self) {
self.task_queue.push(self.task_id).expect("task_queue full");
}
}
We push the task_id to the referenced task_queue. Since modifications to the ArrayQueue type only require a shared reference, we can implement this method on &self instead of &mut self.
The Wake Trait
In order to use our TaskWaker type for polling futures, we need to convert it to a Waker instance first. This is required because the Future::poll method takes a Context instance as an argument, which can only be constructed from the Waker type. While we could do this by providing an implementation of the RawWaker type, it's both simpler and safer to instead implement the Arc-based Wake trait and then use the From implementations provided by the standard library to construct the Waker.
The trait implementation looks like this:
// in src/task/executor.rs
use alloc::task::Wake;
impl Wake for TaskWaker {
fn wake(self: Arc<Self>) {
self.wake_task();
}
fn wake_by_ref(self: &Arc<Self>) {
self.wake_task();
}
}
Since wakers are commonly shared between the executor and the asynchronous tasks, the trait methods require that the Self instance is wrapped in the Arc type, which implements reference-counted ownership. This means that we have to move our TaskWaker to an Arc in order to call them.
The difference between the wake and wake_by_ref methods is that the latter only requires a reference to the Arc, while the former takes ownership of the Arc and thus often requires an increase of the reference count. Not all types support waking by reference, so implementing the wake_by_ref method is optional. However, it can lead to better performance because it avoids unnecessary reference count modifications. In our case, we can simply forward both trait methods to our wake_task function, which requires only a shared &self reference.
Creating Wakers
Since the Waker type supports From conversions for all Arc-wrapped values that implement the Wake trait, we can now implement the TaskWaker::new function that is required by our Executor::run_ready_tasks method:
// in src/task/executor.rs
impl TaskWaker {
fn new(task_id: TaskId, task_queue: Arc<ArrayQueue<TaskId>>) -> Waker {
Waker::from(Arc::new(TaskWaker {
task_id,
task_queue,
}))
}
}
We create the TaskWaker using the passed task_id and task_queue. We then wrap the TaskWaker in an Arc and use the Waker::from implementation to convert it to a Waker. This from method takes care of constructing a RawWakerVTable and a RawWaker instance for our TaskWaker type. In case you're interested in how it works in detail, check out the implementation in the alloc crate.
A run Method
With our waker implementation in place, we can finally construct a run method for our executor:
// in src/task/executor.rs
impl Executor {
pub fn run(&mut self) -> ! {
loop {
self.run_ready_tasks();
}
}
}
This method just calls the run_ready_tasks function in a loop. While we could theoretically return from the function when the tasks map becomes empty, this would never happen since our keyboard_task never finishes, so a simple loop should suffice. Since the function never returns, we use the ! return type to mark the function as diverging to the compiler.
We can now change our kernel_main to use our new Executor instead of the SimpleExecutor:
// in src/main.rs
use blog_os::task::executor::Executor; // new
fn kernel_main(boot_info: &'static BootInfo) -> ! {
// […] initialization routines, including init_heap, test_main
let mut executor = Executor::new(); // new
executor.spawn(Task::new(example_task()));
executor.spawn(Task::new(keyboard::print_keypresses()));
executor.run();
}
We only need to change the import and the type name. Since our run function is marked as diverging, the compiler knows that it never returns, so we no longer need a call to hlt_loop at the end of our kernel_main function.
When we run our kernel using cargo run now, we see that keyboard input still works:
However, the CPU utilization of QEMU did not get any better. The reason for this is that we still keep the CPU busy the whole time. We no longer poll tasks until they are woken again, but we still check the task_queue in a busy loop. To fix this, we need to put the CPU to sleep if there is no more work to do.
Sleep If Idle
The basic idea is to execute the hlt instruction when the task_queue is empty. This instruction puts the CPU to sleep until the next interrupt arrives. The fact that the CPU immediately becomes active again on interrupts ensures that we can still directly react when an interrupt handler pushes to the task_queue.
To implement this, we create a new sleep_if_idle method in our executor and call it from our run method:
// in src/task/executor.rs
impl Executor {
pub fn run(&mut self) -> ! {
loop {
self.run_ready_tasks();
self.sleep_if_idle(); // new
}
}
fn sleep_if_idle(&self) {
if self.task_queue.is_empty() {
x86_64::instructions::hlt();
}
}
}
Since we call sleep_if_idle directly after run_ready_tasks, which loops until the task_queue becomes empty, checking the queue again might seem unnecessary. However, a hardware interrupt might occur directly after run_ready_tasks returns, so there might be a new task in the queue at the time the sleep_if_idle function is called. Only if the queue is still empty, do we put the CPU to sleep by executing the hlt instruction through the instructions::hlt wrapper function provided by the x86_64 crate.
Unfortunately, there is still a subtle race condition in this implementation. Since interrupts are asynchronous and can happen at any time, it is possible that an interrupt happens right between the is_empty check and the call to hlt:
if self.task_queue.is_empty() {
/// <--- interrupt can happen here
x86_64::instructions::hlt();
}
In case this interrupt pushes to the task_queue, we put the CPU to sleep even though there is now a ready task. In the worst case, this could delay the handling of a keyboard interrupt until the next keypress or the next timer interrupt. So how do we prevent it?
The answer is to disable interrupts on the CPU before the check and atomically enable them again together with the hlt instruction. This way, all interrupts that happen in between are delayed after the hlt instruction so that no wake-ups are missed. To implement this approach, we can use the interrupts::enable_and_hlt function provided by the x86_64 crate.
The updated implementation of our sleep_if_idle function looks like this:
// in src/task/executor.rs
impl Executor {
fn sleep_if_idle(&self) {
use x86_64::instructions::interrupts::{self, enable_and_hlt};
interrupts::disable();
if self.task_queue.is_empty() {
enable_and_hlt();
} else {
interrupts::enable();
}
}
}
To avoid race conditions, we disable interrupts before checking whether the task_queue is empty. If it is, we use the enable_and_hlt function to enable interrupts and put the CPU to sleep as a single atomic operation. In case the queue is no longer empty, it means that an interrupt woke a task after run_ready_tasks returned. In that case, we enable interrupts again and directly continue execution without executing hlt.
Now our executor properly puts the CPU to sleep when there is nothing to do. We can see that the QEMU process has a much lower CPU utilization when we run our kernel using cargo run again.
Possible Extensions
Our executor is now able to run tasks in an efficient way. It utilizes waker notifications to avoid polling waiting tasks and puts the CPU to sleep when there is currently no work to do. However, our executor is still quite basic, and there are many possible ways to extend its functionality:
- Scheduling: For our
task_queue, we currently use theVecDequetype to implement a first in first out (FIFO) strategy, which is often also called round robin scheduling. This strategy might not be the most efficient for all workloads. For example, it might make sense to prioritize latency-critical tasks or tasks that do a lot of I/O. See the scheduling chapter of the Operating Systems: Three Easy Pieces book or the Wikipedia article on scheduling for more information. - Task Spawning: Our
Executor::spawnmethod currently requires a&mut selfreference and is thus no longer available after invoking therunmethod. To fix this, we could create an additionalSpawnertype that shares some kind of queue with the executor and allows task creation from within tasks themselves. The queue could be thetask_queuedirectly or a separate queue that the executor checks in its run loop. - Utilizing Threads: We don't have support for threads yet, but we will add it in the next post. This will make it possible to launch multiple instances of the executor in different threads. The advantage of this approach is that the delay imposed by long-running tasks can be reduced because other tasks can run concurrently. This approach also allows it to utilize multiple CPU cores.
- Load Balancing: When adding threading support, it becomes important to know how to distribute the tasks between the executors to ensure that all CPU cores are utilized. A common technique for this is work stealing.
Summary
We started this post by introducing multitasking and differentiating between preemptive multitasking, which forcibly interrupts running tasks regularly, and cooperative multitasking, which lets tasks run until they voluntarily give up control of the CPU.
We then explored how Rust's support of async/await provides a language-level implementation of cooperative multitasking. Rust bases its implementation on top of the polling-based Future trait, which abstracts asynchronous tasks. Using async/await, it is possible to work with futures almost like with normal synchronous code. The difference is that asynchronous functions return a Future again, which needs to be added to an executor at some point in order to run it.
Behind the scenes, the compiler transforms async/await code to state machines, with each .await operation corresponding to a possible pause point. By utilizing its knowledge about the program, the compiler is able to save only the minimal state for each pause point, resulting in a very small memory consumption per task. One challenge is that the generated state machines might contain self-referential structs, for example when local variables of the asynchronous function reference each other. To prevent pointer invalidation, Rust uses the Pin type to ensure that futures cannot be moved in memory anymore after they have been polled for the first time.
For our implementation, we first created a very basic executor that polls all spawned tasks in a busy loop without using the Waker type at all. We then showed the advantage of waker notifications by implementing an asynchronous keyboard task. The task defines a static SCANCODE_QUEUE using the mutex-free ArrayQueue type provided by the crossbeam crate. Instead of handling keypresses directly, the keyboard interrupt handler now puts all received scancodes in the queue and then wakes the registered Waker to signal that new input is available. On the receiving end, we created a ScancodeStream type to provide a Future resolving to the next scancode in the queue. This made it possible to create an asynchronous print_keypresses task that uses async/await to interpret and print the scancodes in the queue.
To utilize the waker notifications of the keyboard task, we created a new Executor type that uses an Arc-shared task_queue for ready tasks. We implemented a TaskWaker type that pushes the ID of woken tasks directly to this task_queue, which are then polled again by the executor. To save power when no tasks are runnable, we added support for putting the CPU to sleep using the hlt instruction. Finally, we discussed some potential extensions to our executor, for example, providing multi-core support.
What's Next?
Using async/wait, we now have basic support for cooperative multitasking in our kernel. While cooperative multitasking is very efficient, it leads to latency problems when individual tasks keep running for too long, thus preventing other tasks from running. For this reason, it makes sense to also add support for preemptive multitasking to our kernel.
In the next post, we will introduce threads as the most common form of preemptive multitasking. In addition to resolving the problem of long-running tasks, threads will also prepare us for utilizing multiple CPU cores and running untrusted user programs in the future.


