Async Event Stream
Previously, in the multiple file version of the counter app, in
event.rs we created an EventHandler using std::thread::spawn,
i.e. OS threads.
In this section, we are going to do the same thing with “green” threads or tasks, i.e. rust’s
async-await features + a future executor. We will be using tokio for this.
Here’s example code of reading key presses asynchronously comparing std::thread and tokio::task.
Notably, we are using tokio::sync::mpsc channels instead of std::sync::mpsc channels. And
because of this, receiving on a channel needs to be .await’d and hence needs to be in a async fn
method.
enum Event {
Key(crossterm::event::KeyEvent)
}
struct EventHandler {
- rx: std::sync::mpsc::Receiver<Event>,
+ rx: tokio::sync::mpsc::UnboundedReceiver<Event>,
}
impl EventHandler {
fn new() -> Self {
let tick_rate = std::time::Duration::from_millis(250);
- let (tx, rx) = std::sync::mpsc::channel();
+ let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
- std::thread::spawn(move || {
+ tokio::spawn(async move {
loop {
if crossterm::event::poll(tick_rate).unwrap() {
match crossterm::event::read().unwrap() {
CrosstermEvent::Key(e) => {
if key.kind == event::KeyEventKind::Press {
tx.send(Event::Key(e)).unwrap()
}
},
_ => unimplemented!(),
}
}
}
})
EventHandler { rx }
}
- fn next(&self) -> Result<Event> {
+ async fn next(&mut self) -> Result<Event> {
- Ok(self.rx.recv()?)
+ self.rx.recv().await.ok_or(color_eyre::eyre::eyre!("Unable to get event"))
}
}
Even with this change, our EventHandler behaves the same way as before. In order to take advantage
of using tokio we have to use tokio::select!.
We can use tokio’s select! macro to wait on multiple
async computations and return when a any single computation completes.
Using crossterm::event::EventStream::new() requires the event-stream feature to be enabled.
This also requires the futures crate. Naturally you’ll also need tokio.
If you haven’t already, add the following to your Cargo.toml:
crossterm = { version = "0.27.0", features = ["event-stream"] }
futures = "0.3.28"
tokio = { version = "1.32.0", features = ["full"] }
tokio-util = "0.7.9" # required for `CancellationToken` introduced in the next section
Here’s what the EventHandler looks like with the select! macro:
use color_eyre::eyre::Result;
use crossterm::event::KeyEvent;
use futures::{FutureExt, StreamExt};
use tokio::{sync::mpsc, task::JoinHandle};
#[derive(Clone, Copy, Debug)]
pub enum Event {
Error,
Tick,
Key(KeyEvent),
}
#[derive(Debug)]
pub struct EventHandler {
_tx: mpsc::UnboundedSender<Event>,
rx: mpsc::UnboundedReceiver<Event>,
task: Option<JoinHandle<()>>,
}
impl EventHandler {
pub fn new() -> Self {
let tick_rate = std::time::Duration::from_millis(250);
let (tx, rx) = mpsc::unbounded_channel();
let _tx = tx.clone();
let task = tokio::spawn(async move {
let mut reader = crossterm::event::EventStream::new();
let mut interval = tokio::time::interval(tick_rate);
loop {
let delay = interval.tick();
let crossterm_event = reader.next().fuse();
tokio::select! {
maybe_event = crossterm_event => {
match maybe_event {
Some(Ok(evt)) => {
match evt {
crossterm::event::Event::Key(key) => {
if key.kind == crossterm::event::KeyEventKind::Press {
tx.send(Event::Key(key)).unwrap();
}
},
_ => {},
}
}
Some(Err(_)) => {
tx.send(Event::Error).unwrap();
}
None => {},
}
},
_ = delay => {
tx.send(Event::Tick).unwrap();
},
}
}
});
Self { _tx, rx, task: Some(task) }
}
pub async fn next(&mut self) -> Result<Event> {
self.rx.recv().await.ok_or(color_eyre::eyre::eyre!("Unable to get event"))
}
}
As mentioned before, since EventHandler::next() is a async function, when we use it we have to
call .await on it. And the function that is the call site of event_handler.next().await also
needs to be an async function. In our tutorial, we are going to use the event handler in the
run() function which will now be async.
Also, now that we are getting events asynchronously, we don’t need to call
crossterm::event::poll() in the update function. Let’s make the update function take an
Event instead.
If you place the above EventHandler in a src/tui.rs file, then here’s what our application now
looks like:
mod tui;
use color_eyre::eyre::Result;
use crossterm::{
event::{self, Event::Key, KeyCode::Char},
execute,
terminal::{disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen},
};
use ratatui::{
prelude::{CrosstermBackend, Terminal},
widgets::Paragraph,
};
use crossterm::{
cursor,
event::{Event as CrosstermEvent, KeyEvent, KeyEventKind, MouseEvent},
};
pub type Frame<'a> = ratatui::Frame<'a, CrosstermBackend<std::io::Stderr>>;
fn startup() -> Result<()> {
enable_raw_mode()?;
execute!(std::io::stderr(), EnterAlternateScreen)?;
Ok(())
}
fn shutdown() -> Result<()> {
execute!(std::io::stderr(), LeaveAlternateScreen)?;
disable_raw_mode()?;
Ok(())
}
// App state
struct App {
counter: i64,
should_quit: bool,
}
// App actions
pub enum Action {
Tick,
Increment,
Decrement,
Quit,
None,
}
// App ui render function
fn ui(f: &mut Frame<'_>, app: &App) {
f.render_widget(Paragraph::new(format!("Counter: {}", app.counter)), f.size());
}
fn update(app: &mut App, event: Event) -> Result<()> {
if let Event::Key(key) = event {
match key.code {
Char('j') => app.counter += 1,
Char('k') => app.counter -= 1,
Char('q') => app.should_quit = true,
_ => {},
}
}
Ok(())
}
async fn run() -> Result<()> {
let mut events = tui::EventHandler::new(); // new
// ratatui terminal
let mut t = Terminal::new(CrosstermBackend::new(std::io::stderr()))?;
// application state
let mut app = App { counter: 0, should_quit: false };
loop {
let event = events.next().await?; // new
// application update
update(&mut app, event)?;
// application render
t.draw(|f| {
ui(f, &app);
})?;
// application exit
if app.should_quit {
break;
}
}
Ok(())
}
#[tokio::main]
async fn main() -> Result<()> {
// setup terminal
startup()?;
let result = run().await;
// teardown terminal before unwrapping Result of app run
shutdown()?;
result?;
Ok(())
}
Using tokio in this manner however only makes the key events asynchronous but doesn’t make the
rest of our application asynchronous yet. We will discuss that in the next section.