packages/shared-skills/skills/programming/references/rust/async-tokio.md
Structured concurrency, cancellation, blocking-work isolation, channel selection. The patterns the agent should reach for by default.
// Default for services and CLIs that do real work
#[tokio::main(flavor = "multi_thread", worker_threads = 8)]
async fn main() -> anyhow::Result<()> { ... }
// For tiny CLIs or wasm where you measured single-thread is enough
#[tokio::main(flavor = "current_thread")]
async fn main() -> anyhow::Result<()> { ... }
Pick worker count explicitly. The default (num_cpus) is fine for servers; for desktop tools you usually want 2-4.
tokio::spawn returns a JoinHandle<T>. The future runs to completion even if the handle is dropped (detached). To enforce structured concurrency, use JoinSet:
use tokio::task::JoinSet;
let mut set = JoinSet::new();
for url in urls {
let client = client.clone();
set.spawn(async move { fetch(&client, &url).await });
}
let mut results = Vec::new();
while let Some(joined) = set.join_next().await {
match joined {
Ok(Ok(body)) => results.push(body),
Ok(Err(error)) => tracing::warn!(%error, "fetch failed"),
Err(panicked) if panicked.is_panic() => {
tracing::error!(?panicked, "worker panicked");
// Choose: re-raise, or continue with degraded result set.
}
Err(other) => tracing::error!(?other, "worker join error"),
}
}
JoinSet:
For wait-for-all semantics with one type, join!:
let (a, b, c) = tokio::join!(load_a(), load_b(), load_c());
let a = a?; let b = b?; let c = c?;
For first-of-many, select!:
tokio::select! {
biased; // bias to top-to-bottom checking when ordering matters
_ = shutdown.recv() => {
tracing::info!("shutdown signal");
return Ok(());
}
request = listener.accept() => {
handle_request(request?).await?;
}
}
Without biased, branches are polled in random order each iteration (good for fairness). Use biased only when you need deterministic priority (shutdown signal first, etc).
A future is cancelled when it is dropped (e.g., the select! arm wins another branch). Always think: if this future is dropped mid-await, what state is left behind?
Cancel-safe futures (you can drop without lasting effect):
recv() on channelsaccept() on listenerswait_for on watch::Receiverread_buf/write_all on streams only when buffers are owned by the future, otherwise noCancel-unsafe futures (dropping mid-way leaves partial state):
read_exact into an external bufferIf a function is cancel-unsafe, document it in a rustdoc # Cancel Safety section.
To explicitly opt out of cancellation, use tokio_util::sync::CancellationToken:
use tokio_util::sync::CancellationToken;
let token = CancellationToken::new();
let child = token.child_token();
tokio::spawn(async move {
tokio::select! {
_ = child.cancelled() => { /* clean up */ }
result = work() => { /* normal */ }
}
});
// later
token.cancel();
Pass child tokens down the call tree so the whole tree can be cancelled together.
use tokio::time::{timeout, Duration};
match timeout(Duration::from_secs(5), fetch(url)).await {
Ok(Ok(body)) => Ok(body),
Ok(Err(error)) => Err(error.into()),
Err(_elapsed) => Err(anyhow::anyhow!("timed out fetching {url}")),
}
Set timeouts on every external I/O boundary. Defaults of "wait forever" are bugs.
NEVER block inside an async task. Symptoms: deadlock, every future stalled, latency cliffs.
Heavy CPU or sync I/O → spawn_blocking:
let result = tokio::task::spawn_blocking(|| {
// CPU-bound: parsing, hashing, image processing
// Or sync I/O: rusqlite, OS APIs without async wrappers
expensive_pure_computation()
}).await?;
Long-running blocking jobs (more than ~1 second of CPU) → use a dedicated thread pool (rayon), not tokio's blocking pool which is sized for short bursts.
| Need | Use |
|---|---|
| 1-many producers → 1 consumer, async | tokio::sync::mpsc::channel(cap) |
| Same as above, both sync + async | flume::bounded(cap) |
| 1 → many fan-out, latest-value semantics | tokio::sync::watch::channel(initial) |
| 1 → many fan-out, queued | tokio::sync::broadcast::channel(cap) |
| One-shot reply | tokio::sync::oneshot::channel() |
| Backpressure-driven stream of items | tokio::sync::mpsc::Receiver + ReceiverStream |
Mpsc pattern:
let (tx, mut rx) = tokio::sync::mpsc::channel::<Job>(256);
tokio::spawn(async move {
while let Some(job) = rx.recv().await {
if let Err(error) = process(job).await {
tracing::warn!(%error, "job failed");
}
}
tracing::info!("queue closed, shutting down worker");
});
tx.send(Job { ... }).await?; // blocks if full, applies backpressure
Always bound channels. Unbounded channels are a memory leak waiting to happen.
futures::Stream is the async analogue of Iterator. Use it for paginated fetches, long-poll responses, file lines.
use futures::stream::{StreamExt, TryStreamExt};
let urls: Vec<String> = ...;
let bodies: Vec<String> = futures::stream::iter(urls)
.map(|url| async move { fetch(&url).await })
.buffer_unordered(8) // up to 8 in flight
.try_collect()
.await?;
buffer_unordered(n) is the throttle. Use it instead of spawning N tasks manually.
For producing a stream from a channel:
use tokio_stream::wrappers::ReceiverStream;
let (tx, rx) = tokio::sync::mpsc::channel::<Event>(64);
let stream = ReceiverStream::new(rx);
serve_sse(stream).await
use tokio::signal;
async fn shutdown_signal() {
let ctrl_c = async { signal::ctrl_c().await.expect("ctrl_c handler") };
#[cfg(unix)]
let terminate = async {
signal::unix::signal(signal::unix::SignalKind::terminate())
.expect("install signal handler")
.recv()
.await;
};
#[cfg(not(unix))]
let terminate = std::future::pending::<()>();
tokio::select! {
_ = ctrl_c => {},
_ = terminate => {},
}
tracing::info!("shutdown signal received");
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let token = CancellationToken::new();
let server = tokio::spawn(run_server(token.child_token()));
shutdown_signal().await;
token.cancel();
let _ = tokio::time::timeout(Duration::from_secs(10), server).await;
Ok(())
}
Pattern: catch signal → cancel a token shared with the server → server's select! arms see the cancel and exit cleanly → wait with a timeout so a hung worker can't deadlock shutdown.
tokio::sync::Mutex — async mutex. Use for state shared between async tasks. Do not hold across .await without thinking (you'll serialize the whole system).tokio::sync::RwLock — async read-write lock. Same caveat.parking_lot::Mutex — sync mutex, faster than std::sync::Mutex, no poisoning. Use when the lock is held briefly and you do not need to .await while holding it.tokio::sync::Semaphore — bound concurrent operations. Perfect for "max 10 in-flight HTTP requests" or "max 3 DB writers".let sem = Arc::new(tokio::sync::Semaphore::new(10));
for url in urls {
let permit = sem.clone().acquire_owned().await?;
tokio::spawn(async move {
let _permit = permit; // released on task end
fetch(&url).await
});
}
.await. Compiles and runs, deadlocks at scale. Solution: refactor to release before await, or use tokio::sync::Mutex.? on JoinHandle. A panicked task returns Err(JoinError); if you .await and ignore, panics are silently swallowed.tokio::spawn instead of JoinSet. Detached tasks survive past their parent, causing leaks. Default to JoinSet for structured concurrency.block_on inside an async context. Causes deadlock under current_thread runtime, performance cliff under multi_thread.spawn_blocking or rayon.await that touches the network or filesystem needs tokio::time::timeout wrapping.#[tokio::test]
async fn fetches_and_parses() {
let server = wiremock::MockServer::start().await;
wiremock::Mock::given(wiremock::matchers::method("GET"))
.respond_with(wiremock::ResponseTemplate::new(200).set_body_string("{\"id\":1}"))
.mount(&server)
.await;
let result = my_client::fetch(&server.uri()).await.unwrap();
assert_eq!(result.id, 1);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn parallel_work() { ... }
For time-sensitive tests, advance virtual time:
#[tokio::test(start_paused = true)]
async fn time_travel() {
let start = tokio::time::Instant::now();
tokio::time::sleep(Duration::from_secs(3600)).await;
assert!(start.elapsed() >= Duration::from_secs(3600));
// Real wallclock elapsed: ~0ms.
}
fn + rayon is simpler and often faster.ureq (sync) is simpler.Async pays off when you have many concurrent I/O operations or need cancellation as a first-class primitive.