Implement Stream for ScancodeStream

This commit is contained in:
Philipp Oppermann
2020-03-20 17:23:22 +01:00
parent dd83feec2d
commit c3648e4b20

View File

@@ -1100,7 +1100,11 @@ We use the [`OnceCell::try_get`] to get a reference to the initialized queue. If
[`OnceCell::try_get`]: https://docs.rs/conquer-once/0.2.0/conquer_once/raw/struct.OnceCell.html#method.try_get
To call the function on keyboard interrupts, we update our `keyboard_interrupt_handler` function in the `interrupts` module:
The fact that the [`ArrayQueue::push`] method requires only a `&self` reference makes it very simple to call the method on the static queue. The `ArrayQueue` type performs all necessary synchronization itself, so we don't need a mutex wrapper here.
[`ArrayQueue::push`]: https://docs.rs/crossbeam/0.7.3/crossbeam/queue/struct.ArrayQueue.html#method.push
To call the `add_scancode` function on keyboard interrupts, we update our `keyboard_interrupt_handler` function in the `interrupts` module:
```rust
// in src/interrupts.rs
@@ -1127,6 +1131,94 @@ As expected, keypresses are no longer printed to the screen when we run our proj
#### Scancode Stream
#### AtomicWaker
To read the scancodes from the queue in an asynchronous way, we create a new `ScancodeStream` type:
```rust
// in src/task/keyboard.rs
pub struct ScancodeStream {
_private: (),
}
impl ScancodeStream {
pub fn new() -> Self {
SCANCODE_QUEUE.try_init_once(|| ArrayQueue::new(100))
.expect("ScancodeStream::new should only be called once");
ScancodeStream {
_private: (),
}
}
```
The purpose of the `_private` field is to prevent construction of the struct from outside of the module. This makes the `new` function the only way to construct the type. In the function, we first try to initialize the `SCANCODE_QUEUE` static. We panic if it is already initialized to ensure that only a single `ScancodeStream` type can be created.
To make the scancodes available to asynchronous tasks, the next step is to implement `poll`-like method that tries to pop the next scancode off the queue. While this sounds like we should implement [`Future`] trait for our type, this does not quite fit here. The problem is that the `Future` trait only abstracts over a single asynchronous value and expects that the `poll` method is not called again after it returns `Poll::Ready`. Our scancode queue, however, contains multiple asynchronous tasks so that it is ok to keep polling it.
##### The `Stream` Trait
Since types that yield multiple asynchronous values are common, the [`futures`] crate provides a useful abstraction for such types: the [`Stream`] trait. The trait is defined like this:
[`Stream`]: https://rust-lang.github.io/async-book/05_streams/01_chapter.html
```rust
pub trait Stream {
type Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context)
-> Poll<Option<Self::Item>>;
}
```
This definition is quite similar to the [`Future`] trait, with the following differences:
- The associated type is named `Item` instead of `Output`.
- Instead of a `poll` method that returns `Poll<Self::Item>`, the `Stream` trait defines a `poll_next` method that returns a `Poll<Option<Self::Item>>` (note the additional `Option`).
There is also a semantic difference: The `poll_next` can be called repeatedly, until it returns `Poll::Ready(None)` to signal that the stream is finished. In this regard, the method is similar to the [`Iterator::next`] method, which also returns `None` after the last value.
[`Iterator::next`]: https://doc.rust-lang.org/stable/core/iter/trait.Iterator.html#tymethod.next
##### Implementing `Stream`
Let's implement the `Stream` trait for our `ScancodeStream` to provide the values of the `SCANCODE_QUEUE` in an asynchronous way. For this, we first need to add a dependency on the `futures-util` crate, which contains the `Stream` type:
```toml
# in Cargo.toml
[dependencies.futures-util]
version = "0.3.4"
default-features = false
features = ["alloc"]
```
We disable the default features to make the crate `no_std` compatible and enable the `alloc` feature to make its allocation-based types available (we will need this later). <span class="gray">(Note that we could also add a dependency on the main `futures` crate, which re-exports the `futures-util` crate, but this would result in a larger number of dependencies and longer compile times.)</span>
Now we can import and implement the `Stream` trait:
```rust
// in src/task/keyboard.rs
use futures_util::stream::Stream;
impl Stream for ScancodeStream {
type Item = u8;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<u8>> {
let queue = SCANCODE_QUEUE.try_get().expect("not initialized");
match queue.pop() {
Ok(scancode) => Poll::Ready(Some(scancode)),
Err(crossbeam_queue::PopError) => Poll::Pending,
}
}
}
```
We first use the [`OnceCell::try_get`] method to get a reference to the initialized scancode queue. This should never fail since we initialize the queue in the `new` function, so we can safely use the `expect` method to panic if it's not initalized. Next, we use the [`ArrayQueue::pop`] to try to get the next element from the queue. If it succeeds we return the scancode wrapped in `Poll::Ready(Some(…))`. If it fails, it means that the queue is empty. In that case, we return `Poll::Pending`.
[`ArrayQueue::pop`]: https://docs.rs/crossbeam/0.7.3/crossbeam/queue/struct.ArrayQueue.html#method.pop
#### Waker Support
### Executor with Waker Support