Async Event Stream

Previously, in the multi-file version of the counter app, we created an EventHandler in event.rs using std::thread::spawn, i.e. OS threads.

In this section, we are going to do the same thing with “green” threads or tasks, i.e. Rust’s async-await features plus a future executor. We will be using tokio for this.

Here’s example code for reading key presses asynchronously, comparing std::thread and tokio::task. Notably, we are using tokio::sync::mpsc channels instead of std::sync::mpsc channels. Because of this, receiving on a channel needs to be .await’d and hence needs to happen in an async fn method.

  enum Event {
    Key(crossterm::event::KeyEvent)
  }

  struct EventHandler {
-   rx: std::sync::mpsc::Receiver<Event>,
+   rx: tokio::sync::mpsc::UnboundedReceiver<Event>,
  }

  impl EventHandler {
    fn new() -> Self {
      let tick_rate = std::time::Duration::from_millis(250);
-     let (tx, rx) = std::sync::mpsc::channel();
+     let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
-     std::thread::spawn(move || {
+     tokio::spawn(async move {
        loop {
          if crossterm::event::poll(tick_rate).unwrap() {
            match crossterm::event::read().unwrap() {
              crossterm::event::Event::Key(key) => {
                if key.kind == crossterm::event::KeyEventKind::Press {
                  tx.send(Event::Key(key)).unwrap()
                }
              },
              // ignore mouse, resize, and other events instead of panicking
              _ => {},
            }
          }
        }
      });

      EventHandler { rx }
    }

-   fn next(&self) -> Result<Event> {
+   async fn next(&mut self) -> Result<Event> {
-     Ok(self.rx.recv()?)
+     self.rx.recv().await.ok_or(color_eyre::eyre::eyre!("Unable to get event"))
    }
  }

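Even with this change, our EventHandler behaves the same way as before: the spawned task still calls the blocking crossterm::event::poll() and crossterm::event::read() functions. In order to take advantage of tokio, we have to use tokio::select!.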

We can use tokio’s select! macro to wait on multiple async computations and return as soon as any one of them completes.

Note

Using crossterm::event::EventStream::new() requires the event-stream feature to be enabled. This also requires the futures crate. Naturally you’ll also need tokio.

If you haven’t already, add the following to your Cargo.toml:

crossterm = { version = "0.27.0", features = ["event-stream"] }
futures = "0.3.28"
tokio = { version = "1.32.0", features = ["full"] }
tokio-util = "0.7.9" # required for `CancellationToken` introduced in the next section

Here’s what the EventHandler looks like with the select! macro:

use color_eyre::eyre::Result;
use crossterm::event::KeyEvent;
use futures::{FutureExt, StreamExt};
use tokio::{sync::mpsc, task::JoinHandle};

#[derive(Clone, Copy, Debug)]
pub enum Event {
  Error,
  Tick,
  Key(KeyEvent),
}

#[derive(Debug)]
pub struct EventHandler {
  _tx: mpsc::UnboundedSender<Event>,
  rx: mpsc::UnboundedReceiver<Event>,
  task: Option<JoinHandle<()>>,
}

impl EventHandler {
  pub fn new() -> Self {
    let tick_rate = std::time::Duration::from_millis(250);

    let (tx, rx) = mpsc::unbounded_channel();
    let _tx = tx.clone();

    let task = tokio::spawn(async move {
      let mut reader = crossterm::event::EventStream::new();
      let mut interval = tokio::time::interval(tick_rate);
      loop {
        let delay = interval.tick();
        let crossterm_event = reader.next().fuse();
        tokio::select! {
          maybe_event = crossterm_event => {
            match maybe_event {
              Some(Ok(evt)) => {
                match evt {
                  crossterm::event::Event::Key(key) => {
                    if key.kind == crossterm::event::KeyEventKind::Press {
                      tx.send(Event::Key(key)).unwrap();
                    }
                  },
                  _ => {},
                }
              }
              Some(Err(_)) => {
                tx.send(Event::Error).unwrap();
              }
              None => {},
            }
          },
          _ = delay => {
            tx.send(Event::Tick).unwrap();
          },
        }
      }
    });

    Self { _tx, rx, task: Some(task) }
  }

  pub async fn next(&mut self) -> Result<Event> {
    self.rx.recv().await.ok_or(color_eyre::eyre::eyre!("Unable to get event"))
  }
}

As mentioned before, since EventHandler::next() is an async function, we have to call .await on it when we use it. The function that contains the event_handler.next().await call also needs to be an async function. In our tutorial, we are going to use the event handler in the run() function, which will now be async.

Also, now that we are getting events asynchronously, we don’t need to call crossterm::event::poll() in the update function. Let’s make the update function take an Event instead.
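One consequence of passing the event in is that update no longer touches the terminal, so it can be driven by a plain list of events. The following is a minimal sketch under that assumption, using a simplified Event that carries a char instead of a crossterm KeyEvent; replay is a hypothetical helper that exists only for this illustration.

```rust
// Simplified stand-in for the tutorial's Event (assumption: a char, not a KeyEvent).
#[derive(Clone, Copy, Debug)]
enum Event {
  Tick,
  Key(char),
}

#[derive(Debug, Default, PartialEq)]
struct App {
  counter: i64,
  should_quit: bool,
}

// Mirrors the tutorial's key bindings: j increments, k decrements, q quits.
fn update(app: &mut App, event: Event) {
  if let Event::Key(c) = event {
    match c {
      'j' => app.counter += 1,
      'k' => app.counter -= 1,
      'q' => app.should_quit = true,
      _ => {},
    }
  }
}

// Hypothetical helper: applies a sequence of events to a fresh App.
fn replay(events: &[Event]) -> App {
  let mut app = App::default();
  for &event in events {
    update(&mut app, event);
  }
  app
}

fn main() {
  let events = [Event::Key('j'), Event::Key('j'), Event::Tick, Event::Key('k'), Event::Key('q')];
  println!("{:?}", replay(&events));
}
```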

If you place the above EventHandler in a src/tui.rs file, then here’s what our application now looks like:

mod tui;

use color_eyre::eyre::Result;
use crossterm::{
  event::KeyCode::Char,
  execute,
  terminal::{disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen},
};
use ratatui::{
  prelude::{CrosstermBackend, Terminal},
  widgets::Paragraph,
};
// our own event type from src/tui.rs, used by update() below
use tui::Event;

pub type Frame<'a> = ratatui::Frame<'a, CrosstermBackend<std::io::Stderr>>;

fn startup() -> Result<()> {
  enable_raw_mode()?;
  execute!(std::io::stderr(), EnterAlternateScreen)?;
  Ok(())
}

fn shutdown() -> Result<()> {
  execute!(std::io::stderr(), LeaveAlternateScreen)?;
  disable_raw_mode()?;
  Ok(())
}

// App state
struct App {
  counter: i64,
  should_quit: bool,
}

// App actions
pub enum Action {
  Tick,
  Increment,
  Decrement,
  Quit,
  None,
}

// App ui render function
fn ui(f: &mut Frame<'_>, app: &App) {
  f.render_widget(Paragraph::new(format!("Counter: {}", app.counter)), f.size());
}

fn update(app: &mut App, event: Event) -> Result<()> {
  if let Event::Key(key) = event {
    match key.code {
      Char('j') => app.counter += 1,
      Char('k') => app.counter -= 1,
      Char('q') => app.should_quit = true,
      _ => {},
    }
  }
  Ok(())
}

async fn run() -> Result<()> {

  let mut events = tui::EventHandler::new(); // new

  // ratatui terminal
  let mut t = Terminal::new(CrosstermBackend::new(std::io::stderr()))?;

  // application state
  let mut app = App { counter: 0, should_quit: false };

  loop {
    let event = events.next().await?; // new

    // application update
    update(&mut app, event)?;

    // application render
    t.draw(|f| {
      ui(f, &app);
    })?;

    // application exit
    if app.should_quit {
      break;
    }
  }

  Ok(())
}

#[tokio::main]
async fn main() -> Result<()> {
  // setup terminal
  startup()?;

  let result = run().await;

  // teardown terminal before unwrapping Result of app run
  shutdown()?;

  result?;

  Ok(())
}

Using tokio in this manner, however, only makes the key events asynchronous; it doesn’t make the rest of our application asynchronous yet. We will discuss that in the next section.