mirror of
https://github.com/phil-opp/blog_os.git
synced 2025-12-16 22:37:49 +00:00
Finish implementation of executor with waker support
This commit is contained in:
@@ -1374,14 +1374,14 @@ Let's add the `print_keypresses` task to our executor in our `main.rs` to get wo
|
||||
fn kernel_main(boot_info: &'static BootInfo) -> ! {
|
||||
use blog_os::task::keyboard;
|
||||
|
||||
// […] initialization routines, including `init_heap`
|
||||
// […] initialization routines, including init_heap, test_main
|
||||
|
||||
let mut executor = SimpleExecutor::new();
|
||||
executor.spawn(Task::new(example_task()));
|
||||
executor.spawn(Task::new(keyboard::print_keypresses())); // new
|
||||
executor.run();
|
||||
|
||||
// […] test_main, "it did not crash" message, hlt_loop
|
||||
// […] "it did not crash" message, hlt_loop
|
||||
}
|
||||
```
|
||||
|
||||
@@ -1479,9 +1479,12 @@ impl Executor {
|
||||
In addition to a `task_queue`, that stores the tasks that are ready to execute, the type has a `waiting_tasks` map, a `wake_queue` and a `waker_cache`. These fields have the following purpose:
|
||||
|
||||
- The `waiting_tasks` map stores tasks that returned `Poll::Pending`. The map is indexed by the `TaskId` to allow efficient continuation a specific task.
|
||||
- The `wake_queue` is an reference-counted [`ArrayQueue`] of task IDs. It will be shared between the executor and wakers. The idea is that the wakers push the ID of the woken task to the queue. The executor sits on the receiving end of the queue and moves all woken tasks from the `waiting_tasks` map back to the `task_queue`. The reason for using a fixed-size queue instead of an unbounded queue such as [`SegQueue`] is that interrupt handlers that should not allocate will push to this queue.
|
||||
- The `wake_queue` is [`ArrayQueue`] of task IDs, wrapped into the [`Arc`] type that implements _reference counting_. Reference countingmakes it possible to share ownership of the value between multiple owners. It works by allocating the value on the heap and counting the number of active references to it. When the number of active references reaches zero, the value is no longer needed and can be deallocated.
|
||||
|
||||
We use the `Arc` wrapper for the `wake_queue` because it will be shared between the executor and wakers. The idea is that the wakers push the ID of the woken task to the queue. The executor sits on the receiving end of the queue and moves all woken tasks from the `waiting_tasks` map back to the `task_queue`. The reason for using a fixed-size queue instead of an unbounded queue such as [`SegQueue`] is that interrupt handlers that should not allocate will push to this queue.
|
||||
- The `waker_cache` map caches the [`Waker`] of a task after its creation. This has two reasons: First, it improves performance by reusing the same waker for multiple wake-ups of the same task instead of creating a new waker each time. Second, it ensures that reference-counted wakers are not deallocated inside interrupt handlers because it could lead to deadlocks (there are more details on this below).
|
||||
|
||||
[`Arc`]: https://doc.rust-lang.org/stable/alloc/sync/struct.Arc.html
|
||||
[`SegQueue`]: https://docs.rs/crossbeam-queue/0.2.1/crossbeam_queue/struct.SegQueue.html
|
||||
|
||||
To create an `Executor`, we provide a simple `new` function. We choose a capacity of 100 for the `wake_queue`, which should be more than enough for the foreseeable future. In case our system will have more than 100 concurrent tasks at some point, we can easily increase this size.
|
||||
@@ -1543,58 +1546,240 @@ To avoid the performance overhead of creating a waker on each poll, we use the `
|
||||
|
||||
#### Waker Design
|
||||
|
||||
- Waker
|
||||
- Executor::create_waker
|
||||
- Executor::wake_tasks
|
||||
The job of the waker is to push the ID of the woken task to the `wake_queue` of the executor. We implement this by creating a new `TaskWaker` struct that stores the task ID and a reference to the the `wake_queue`:
|
||||
|
||||
#### A `run` Method
|
||||
```rust
|
||||
// in src/task/executor.rs
|
||||
|
||||
#### Sleep If Idle
|
||||
struct TaskWaker {
|
||||
task_id: TaskId,
|
||||
wake_queue: Arc<ArrayQueue<TaskId>>,
|
||||
}
|
||||
```
|
||||
|
||||
Since the ownership of the `wake_queue` is shared between the executor and wakers, we use the [`Arc`] wrapper type to implement shared reference-counted ownership.
|
||||
|
||||
[`Arc`]: https://doc.rust-lang.org/stable/alloc/sync/struct.Arc.html
|
||||
|
||||
### Old
|
||||
The implementation of the wake operation is quite simple:
|
||||
|
||||
#### The `Wake` Trait
|
||||
```rust
|
||||
// in src/task/executor.rs
|
||||
|
||||
The simplest way to do this is by implementing the unstable [`Wake`] trait for an empty `DummyWaker` struct:
|
||||
impl TaskWaker {
|
||||
fn wake_task(&self) {
|
||||
self.wake_queue.push(self.task_id).expect("wake_queue full");
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
We push the `task_id` to the referenced `wake_queue`. Since modifications of the [`ArrayQueue`] type only require a shared reference, we can implement this method on `&self` instead of `&mut self`.
|
||||
|
||||
##### The `Wake` Trait
|
||||
|
||||
In order to use our `TaskWaker` type for polling futures, we need to convert it to a [`Waker`] instance first. This is required because the [`Future::poll`] method takes a [`Context`] instance as argument, which can only be constructed from the `Waker` type. While we could do this by providing an implementation of the [`RawWaker`] type, it's both simpler and safer to instead implement the [`Wake`] trait and then using the [`From`] implementations provided by the standard library to construct the `Waker`.
|
||||
|
||||
The trait implementation looks like this:
|
||||
|
||||
[`Wake`]: https://doc.rust-lang.org/nightly/alloc/task/trait.Wake.html
|
||||
|
||||
```rust
|
||||
// in src/task/simple_executor.rs
|
||||
|
||||
use alloc::{sync::Arc, task::Wake};
|
||||
use alloc::task::Wake;
|
||||
|
||||
struct DummyWaker;
|
||||
|
||||
impl Wake for DummyWaker {
|
||||
impl Wake for TaskWaker {
|
||||
fn wake(self: Arc<Self>) {
|
||||
// do nothing
|
||||
self.wake_task();
|
||||
}
|
||||
|
||||
fn wake_by_ref(self: &Arc<Self>) {
|
||||
self.wake_task();
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
The trait is still unstable, so we have to add **`#![feature(wake_trait)]`** to the top of our `lib.rs` to use it. The `wake` method of the trait is normally responsible for waking the corresponding task in the executor. However, our `SimpleExecutor` will not differentiate between ready and waiting tasks, so we don't need to do anything on `wake` calls.
|
||||
The trait is still unstable, so we have to add **`#![feature(wake_trait)]`** to the top of our `lib.rs` to use it. Since wakers are commonly shared between the executor and the asynchronous tasks, the trait methods require that the `Self` instance is wrapped in the [`Arc`] type, which implements reference-counted ownership. This means that we have to move our `TaskWaker` to an `Arc` to in order to call them.
|
||||
|
||||
Since wakers are normally shared between the executor and the asynchronous tasks, the `wake` method requires that the `Self` instance is wrapped in the [`Arc`] type, which implements reference-counted ownership. The basic idea is that the value is heap-allocated and the number of active references to it are counted. If the number of active references reaches zero, the value is no longer needed and can be deallocated.
|
||||
The difference between the `wake` and `wake_by_ref` methods is that the latter only requires a reference the the `Arc`, while the former takes ownership of the `Arc` and thus often requires an increase of the reference count. Not all types support waking by reference, so implementing the `wake_by_ref` method is optional, however it can lead to better performance because it avoids unnecessary reference count modifications. In our case, we can simply forward both trait methods to our `wake_task` function, which requires only a shared `&self` reference.
|
||||
|
||||
[`Arc`]: https://doc.rust-lang.org/stable/alloc/sync/struct.Arc.html
|
||||
##### Creating Wakers
|
||||
|
||||
To make our `DummyWaker` usable with the [`Context`] type, we need a method to convert it to the [`Waker`] defined in the core library:
|
||||
Since the `Waker` type supports [`From`] conversions for all `Arc`-wrapped values that implement the `Wake` trait, we can now implement the `Executor::create_waker` method using our `TaskWaker`:
|
||||
|
||||
[`From`]: https://doc.rust-lang.org/nightly/core/convert/trait.From.html
|
||||
|
||||
```rust
|
||||
// in src/task/simple_executor.rs
|
||||
// in src/task/executor.rs
|
||||
|
||||
use core::task::Waker;
|
||||
|
||||
impl DummyWaker {
|
||||
fn to_waker(self) -> Waker {
|
||||
Waker::from(Arc::new(self))
|
||||
impl Executor {
|
||||
fn create_waker(&self, task_id: TaskId) -> Waker {
|
||||
Waker::from(Arc::new(TaskWaker {
|
||||
task_id,
|
||||
wake_queue: self.wake_queue.clone(),
|
||||
}))
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
The method first makes the `self` instance reference-counted by wrapping it in an [`Arc`]. Then it uses the [`Waker::from`] method to create the `Waker`. This method is available for all reference counted types that implement the [`Wake`] trait.
|
||||
We create the `TaskWaker` using the passed `task_id` and a clone of the `wake_queue`. Since the `wake_queue` is wrapped into `Arc`, the `clone` only increases the reference count of the value, but still points to the same heap allocated queue. We store the `TaskWaker` in an `Arc` too because the `Waker::from` implementation requires it. This function then takes care of constructing a [`RawWakerVTable`] and a [`RawWaker`] instance for our `TaskWaker` type. In case you're interested in how it works in detail, check out the [implementation in the `alloc` crate][waker-from-impl].
|
||||
|
||||
[`Waker::from`]: TODO
|
||||
[waker-from-impl]: https://github.com/rust-lang/rust/blob/cdb50c6f2507319f29104a25765bfb79ad53395c/src/liballoc/task.rs#L58-L87
|
||||
|
||||
##### Handling Wake-Ups
|
||||
|
||||
To handle wake-ups in our executor, we add a `wake_tasks` method:
|
||||
|
||||
```rust
|
||||
// in src/task/executor.rs
|
||||
|
||||
impl Executor {
|
||||
fn wake_tasks(&mut self) {
|
||||
while let Ok(task_id) = self.wake_queue.pop() {
|
||||
if let Some(task) = self.waiting_tasks.remove(&task_id) {
|
||||
self.task_queue.push_back(task);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
We use a `while let` loop to pop all items from the `wake_queue`. For each popped task ID, we remove the corresponding task from the `waiting_tasks` map and add it to the back of the `task_queue`. Since we register wakers before checking whether a task needs to be put to sleep, it might happen that a wake-up occurs for tasks even though they are not in the `waiting_tasks` map. In this case, we simply ignore the wake-up.
|
||||
|
||||
#### A `run` Method
|
||||
|
||||
With our waker implementation in place, we can finally construct a `run` method for our executor:
|
||||
|
||||
```rust
|
||||
// in src/task/executor.rs
|
||||
|
||||
impl Executor {
|
||||
pub fn run(&mut self) -> ! {
|
||||
loop {
|
||||
self.wake_tasks();
|
||||
self.run_ready_tasks();
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
This method just calls the `wake_tasks` and `run_ready_tasks` functions in a loop. While we could theoretically return from the function when both the `task_queue` and the `waiting_tasks` map become empty, this would never happen since our `keyboard_task` never finishes, so a simply `loop` should suffice. Since the function never returns, we use the `!` return type to mark the function as [diverging] to the compiler.
|
||||
|
||||
[diverging]: https://doc.rust-lang.org/stable/rust-by-example/fn/diverging.html
|
||||
|
||||
We can now change our `kernel_main` to use our new `Executor` instead of the `SimpleExecutor`:
|
||||
|
||||
```rust
|
||||
// in src/main.rs
|
||||
|
||||
fn kernel_main(boot_info: &'static BootInfo) -> ! {
|
||||
use blog_os::task::executor::Executor;
|
||||
|
||||
// […] initialization routines, including init_heap, test_main
|
||||
|
||||
let mut executor = Executor::new();
|
||||
executor.spawn(Task::new(example_task()));
|
||||
executor.spawn(Task::new(keyboard::print_keypresses()));
|
||||
executor.run();
|
||||
}
|
||||
```
|
||||
|
||||
We only need to change the import and the type name. Since our `run` function is marked as diverging, the compiler knows that it never returns so that we no longer need a call to `hlt_loop` at the end of our `kernel_main` function.
|
||||
|
||||
When we run our kernel using `cargo xrun` now, we see that keyboard input still works:
|
||||
|
||||
TODO gif
|
||||
|
||||
However, the CPU utilization of QEMU did not get any better. The reason for this is that we still keep the CPU busy for the whole time. We no longer poll tasks until they are woken again, but we still check the `wake_queue` and the `task_queue` in a busy loop. To fix this, we need to put the CPU to sleep if there is no more work to do.
|
||||
|
||||
#### Sleep If Idle
|
||||
|
||||
The basic idea is to execute the [`hlt` instruction] when both the `task_queue` and the `wake_queue` are empty. This instruction puts the CPU to sleep until the next interrupt arrives. The fact that the CPU immediately becomes active again on interrupts ensures that we can still directly react when an interrupt handler pushes to the `wake_queue`.
|
||||
|
||||
[`hlt` instruction]: https://en.wikipedia.org/wiki/HLT_(x86_instruction)
|
||||
|
||||
To implement this, we create a new `sleep_if_idle` method to our executor and call it from our `run` method:
|
||||
|
||||
```rust
|
||||
// in src/task/executor.rs
|
||||
|
||||
impl Executor {
|
||||
pub fn run(&mut self) -> ! {
|
||||
loop {
|
||||
self.wake_tasks();
|
||||
self.run_ready_tasks();
|
||||
self.sleep_if_idle(); // new
|
||||
}
|
||||
}
|
||||
|
||||
fn sleep_if_idle(&self) {
|
||||
if self.wake_queue.is_empty() {
|
||||
x86_64::instructions::hlt();
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
Since we call `sleep_if_idle` directly after `run_ready_tasks`, which loops until the `task_queue` becomes empty, we only need to check the `wake_queue`. If it is empty too, there is no task that is ready to run, so we execute the `hlt` instruction through the [`instructions::hlt`] wrapper function provided by the [`x86_64`] crate.
|
||||
|
||||
[`instructions::hlt`]: https://docs.rs/x86_64/0.9.6/x86_64/instructions/fn.hlt.html
|
||||
[`x86_64`]: https://docs.rs/x86_64/0.9.6/x86_64/index.html
|
||||
|
||||
Unfortunately, there is a subtle race condition in this implementation. Since interrupts are asynchronous and can happen at any time, it is possible that an interrupt happens between the `is_empty` check and the call to `hlt`:
|
||||
|
||||
```rust
|
||||
if self.wake_queue.is_empty() {
|
||||
/// <--- interrupt can happen here
|
||||
x86_64::instructions::hlt();
|
||||
}
|
||||
```
|
||||
|
||||
In case this interrupt pushes to the `wake_queue`, we put the CPU to sleep even though there is now a ready task. In the worst case, this could delay the handling of a keyboard interrupt until the next keypress or the next timer interrupt. So how do we prevent it?
|
||||
|
||||
The answer is to disable interrupts on the CPU before the check and atomically enable them again together with the `hlt` instruction. This way, all interrupts that happen between in between are delayed after the `hlt` instruction so that no wake-ups are missed. To implement this approach, we can use the [`enable_interrupts_and_hlt`] function provided by the [`x86_64`] crate:
|
||||
|
||||
[`enable_interrupts_and_hlt`]: https://docs.rs/x86_64/0.9.6/x86_64/instructions/interrupts/fn.enable_interrupts_and_hlt.html
|
||||
|
||||
```rust
|
||||
// in src/task/executor.rs
|
||||
|
||||
impl Executor {
|
||||
fn sleep_if_idle(&self) {
|
||||
use x86_64::instructions::interrupts::{self, enable_interrupts_and_hlt};
|
||||
|
||||
// fast path
|
||||
if !self.wake_queue.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
interrupts::disable();
|
||||
if self.wake_queue.is_empty() {
|
||||
enable_interrupts_and_hlt();
|
||||
} else {
|
||||
interrupts::enable();
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
To avoid unnecessarily disabling interrupts, we early return if the `wake_queue` is not empty. Otherwise, we disable interrupts and check the `wake_queue` again. If it is still empty, we use the [`enable_interrupts_and_hlt`] function to enable interrupts and put the CPU to sleep as a single atomic operation. In case the queue is no longer empty, it means that an interrupt woke a task between the first and the second check. In that case, we enable interrupts again and directly continue execution without executing `hlt`.
|
||||
|
||||
Now our executor properly puts the CPU to sleep when there is nothing to do. We can see that the QEMU process as a much lower CPU utilization when we run our kernel using `cargo xrun` now.
|
||||
|
||||
#### Possible Extensions
|
||||
|
||||
Our executor is now able to run tasks in an efficient way. It utilizes waker notifications to avoid polling waiting tasks and puts the CPU to sleep when there is currently no work to do. However, our executor is still quite basic and there are many possible ways to extend its functionality:
|
||||
|
||||
- **Scheduling:** We currently use the [`VecDeque`] type to implement a _first in first out_ (FIFO) strategy for our `task_queue`, which is often also called _round robin_ scheduling. This strategy might not be the most efficient for all workloads. For example, it might make sense to prioritize latency-critical tasks or task that do a lot of I/O. See the [scheduling chapter] of the [_Operating Systems: Three Easy Pieces_] book or the [Wikipedia article on scheduling][scheduling-wiki] for more information.
|
||||
- **Task Spawning**: Our `Executor::spawn` method currently requires a `&mut self` reference and is thus no longer available after starting the `run` method. To fix this, we could create an additional `Spawner` type that shares some kind of queue with the executor and allows task creation from within tasks themselves. The queue could be for example the `task_queue` directly or a separate queue that the executor checks in its run loop.
|
||||
- **Utilizing Threads**: We don't have support for threads yet, but we will add it in the next post. This will make it possible to launch multiple instances of the executor in different threads. The advantage of this approach is that the delay imposed by long running tasks can be reduced because other tasks can run concurrently. This approach also allows it to utilize multiple CPU cores.
|
||||
- **Load Balancing**: When adding threading support, it becomes important how to distribute the tasks between the executors to ensure that all CPU cores are utilized. A common technique for this is [_work stealing_].
|
||||
|
||||
[scheduling chapter]: http://pages.cs.wisc.edu/~remzi/OSTEP/cpu-sched.pdf
|
||||
[_Operating Systems: Three Easy Pieces_]: http://pages.cs.wisc.edu/~remzi/OSTEP/
|
||||
[scheduling-wiki]: https://en.wikipedia.org/wiki/Scheduling_(computing)
|
||||
[_work stealing_]: https://en.wikipedia.org/wiki/Work_stealing
|
||||
|
||||
## Summary
|
||||
|
||||
## What's Next?
|
||||
|
||||
Reference in New Issue
Block a user