Add Waker support to the poll_next implementation on ScancodeStream

This commit is contained in:
Philipp Oppermann
2020-03-20 19:01:20 +01:00
parent c3648e4b20
commit 4f29fdea72

View File

@@ -1219,6 +1219,89 @@ We first use the [`OnceCell::try_get`] method to get a reference to the initiali
#### Waker Support
Like the `Futures::poll` method, the `Stream::poll_next` method requires that the asynchronous task notifies the executor when it becomes ready after `Poll::Pending` is returned for the first time. This way, the executor does not need to poll the same task again until it is notified, which greatly reduces the performance overhead of waiting tasks.
To send this notification, the task should extract the [`Waker`] from the passed [`Context`] reference and store it somewhere. When the task becomes ready, it should invoke the [`wake`] method on the stored `Waker` to notify the executor that the task should be polled again.
##### AtomicWaker
To implement the `Waker` notification for our `ScancodeStream`, we need a place where we can store the `Waker` between poll calls. We can't store it as a field in the `ScancodeStream` itself because it needs to be accessible from the `add_scancode` function. The solution for this is to use a static variable of the [`AtomicWaker`] type provided by the `futures-util` crate. Like the `ArrayQueue` type, this type is based on atomic instructions and can be safely stored in a static and modified concurrently.
[`AtomicWaker`]: https://docs.rs/futures-util/0.3.4/futures_util/task/struct.AtomicWaker.html
Let's use the [`AtomicWaker`] type to define a static `WAKER`:
```rust
// in src/task/keyboard.rs
static WAKER: AtomicWaker = AtomicWaker::new();
```
The idea is that the `poll_next` implementation stores the current waker in this static and the `add_scancode` function calls the `wake` function on it when a new scancode is added to the queue.
##### Storing a Waker
The contract defined by `poll`/`poll_next` requires that the task registers a wakeup for the passed `Waker` when it returns `Poll::Pending`. Let's modify our `poll_next` implementation to satisfy these requirement:
```rust
// in src/task/keyboard.rs
impl Stream for ScancodeStream {
type Item = u8;
fn poll_next(self: Pin<&mut Self>, context: &mut Context) -> Poll<Option<u8>> {
let queue = SCANCODE_QUEUE
.try_get()
.expect("scancode queue not initialized");
// fast path
if let Ok(scancode) = queue.pop() {
return Poll::Ready(Some(scancode));
}
WAKER.register(&cx.waker());
match scancodes.pop() {
Ok(scancode) => Poll::Ready(scancode),
Err(crossbeam_queue::PopError) => Poll::Pending,
}
}
}
```
Like before, we first use the [`OnceCell::try_get`] function to get a reference to the initialized scancode queue. We then optimistically try to `pop` from the queue and return `Poll::Ready` when it succeeds. This exploits the fact that it's only required to register a wakeup when returning `Poll::Pending`.
If the first call to `queue.pop()` does not succeed, the queue is potentially empty. Only potentially because the interrupt handler might have filled the queue asynchronously immediately after the check. Since this race condition can occur again on the next check, we need to register the `Waker` in the `WAKER` static before the second check. This way, a wakeup might happen before we return `Poll::Pending`, but it is guaranteed that we get a wakeup for any scancodes pushed after the check.
After registering the `Waker` contained in the passed [`Context`] through the [`AtomicWaker::register`] function, we try popping from the queue a second time. If it now succeeds, we return `Poll::Ready`. Otherwise, we return `Poll::Pending` like before, but this time with a registered wakeup.
[`AtomicWaker::register`]: https://docs.rs/futures-util/0.3.4/futures_util/task/struct.AtomicWaker.html#method.register
Note that there are two ways that a wakeup can happen for a task that did not return `Poll::Pending` (yet). One way is the mentioned race condition when the wakeup happens immediately before returning `Poll::Pending`. The other way is when the queue is no longer empty after registering the waker so that `Poll::Ready` is returned. Since these spurious wakeups are not preventable, the executor needs to be able to handle them correctly.
##### Waking the Stored Waker
To wake the stored `Waker`, we add a call to `WAKER.wake()` in the `add_scancode` function:
```rust
// in src/task/keyboard.rs
pub(crate) add_scancode(scancode: u8) {
if let Ok(queue) = SCANCODE_QUEUE.try_get() {
if let Err(_) = scancode_queue.push(scancode) {
println!("WARNING: scancode queue full; dropping keyboard input");
} else {
WAKER.wake(); // new
}
}
}
```
The only change that we performed is to add a call to `WAKER.wake()` if the push to the scancode queue succeeds. If a waker is registered in the `WAKER` static, this method will call the equally-named [`wake`] method on it, which notifies the executor. Otherwise, the operation is a no-op, i.e. nothing happens.
[`wake`]: https://doc.rust-lang.org/stable/core/task/struct.Waker.html#method.wake
It is important that we call `wake` only after pushing to the queue because otherwise the task might be woken too early when the queue is still empty. This can for example happen when using a multi-threaded executor that starts the woken task concurrently on a different CPU core. While we don't have thread support yet, we will add it soon and we don't want things to break then.
### Executor with Waker Support