6. Building Futures by Hand 🟡
What you'll learn:
- Implementing a
TimerFuturewith thread-based waking- Building a
Joincombinator: run two futures concurrently- Building a
Selectcombinator: race two futures- How combinators compose — futures all the way down
A Simple Timer Future
Now let's build real, useful futures from scratch. This cements the theory from chapters 2-5.
TimerFuture: A Complete Example
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use std::thread;
use std::time::{Duration, Instant};
pub struct TimerFuture {
shared_state: Arc<Mutex<SharedState>>,
}
struct SharedState {
completed: bool,
waker: Option<Waker>,
}
impl TimerFuture {
pub fn new(duration: Duration) -> Self {
let shared_state = Arc::new(Mutex::new(SharedState {
completed: false,
waker: None,
}));
// Spawn a thread that sets completed=true after the duration
let thread_shared_state = Arc::clone(&shared_state);
thread::spawn(move || {
thread::sleep(duration);
let mut state = thread_shared_state.lock().unwrap();
state.completed = true;
if let Some(waker) = state.waker.take() {
waker.wake(); // Notify the executor
}
});
TimerFuture { shared_state }
}
}
impl Future for TimerFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
let mut state = self.shared_state.lock().unwrap();
if state.completed {
Poll::Ready(())
} else {
// Store the waker so the timer thread can wake us
// IMPORTANT: Always update the waker — the executor may
// have changed it between polls
state.waker = Some(cx.waker().clone());
Poll::Pending
}
}
}
// Usage:
// async fn example() {
// println!("Starting timer...");
// TimerFuture::new(Duration::from_secs(2)).await;
// println!("Timer done!");
// }
//
// ⚠️ This spawns an OS thread per timer — fine for learning, but in
// production use `tokio::time::sleep` which is backed by a shared
// timer wheel and requires zero extra threads.
Join: Running Two Futures Concurrently
Join polls two futures and completes when both finish. This is how tokio::join! works internally:
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
/// Polls two futures concurrently, returns both results as a tuple
pub struct Join<A, B>
where
A: Future,
B: Future,
{
a: MaybeDone<A>,
b: MaybeDone<B>,
}
enum MaybeDone<F: Future> {
Pending(F),
Done(F::Output),
Taken, // Output has been taken
}
impl<A, B> Join<A, B>
where
A: Future,
B: Future,
{
pub fn new(a: A, b: B) -> Self {
Join {
a: MaybeDone::Pending(a),
b: MaybeDone::Pending(b),
}
}
}
impl<A, B> Future for Join<A, B>
where
A: Future + Unpin,
B: Future + Unpin,
{
type Output = (A::Output, B::Output);
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Poll A if not done
if let MaybeDone::Pending(ref mut fut) = self.a {
if let Poll::Ready(val) = Pin::new(fut).poll(cx) {
self.a = MaybeDone::Done(val);
}
}
// Poll B if not done
if let MaybeDone::Pending(ref mut fut) = self.b {
if let Poll::Ready(val) = Pin::new(fut).poll(cx) {
self.b = MaybeDone::Done(val);
}
}
// Both done?
match (&self.a, &self.b) {
(MaybeDone::Done(_), MaybeDone::Done(_)) => {
// Take both outputs
let a_val = match std::mem::replace(&mut self.a, MaybeDone::Taken) {
MaybeDone::Done(v) => v,
_ => unreachable!(),
};
let b_val = match std::mem::replace(&mut self.b, MaybeDone::Taken) {
MaybeDone::Done(v) => v,
_ => unreachable!(),
};
Poll::Ready((a_val, b_val))
}
_ => Poll::Pending, // At least one is still pending
}
}
}
// Usage:
// let (page1, page2) = Join::new(
// http_get("https://example.com/a"),
// http_get("https://example.com/b"),
// ).await;
// Both requests run concurrently!
Key insight: "Concurrent" here means interleaved on the same thread. Join doesn't spawn threads — it polls both futures in the same
poll()call. This is cooperative concurrency, not parallelism.
graph LR
subgraph "Future Combinators"
direction TB
TIMER["TimerFuture<br/>Single future, wake after delay"]
JOIN["Join<A, B><br/>Wait for BOTH"]
SELECT["Select<A, B><br/>Wait for FIRST"]
RETRY["RetryFuture<br/>Re-create on failure"]
end
TIMER --> JOIN
TIMER --> SELECT
SELECT --> RETRY
style TIMER fill:#d4efdf,stroke:#27ae60,color:#000
style JOIN fill:#e8f4f8,stroke:#2980b9,color:#000
style SELECT fill:#fef9e7,stroke:#f39c12,color:#000
style RETRY fill:#fadbd8,stroke:#e74c3c,color:#000
Select: Racing Two Futures
Select completes when either future finishes first (the other is dropped):
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
pub enum Either<A, B> {
Left(A),
Right(B),
}
/// Returns whichever future completes first; drops the other
pub struct Select<A, B> {
a: A,
b: B,
}
impl<A, B> Select<A, B>
where
A: Future + Unpin,
B: Future + Unpin,
{
pub fn new(a: A, b: B) -> Self {
Select { a, b }
}
}
impl<A, B> Future for Select<A, B>
where
A: Future + Unpin,
B: Future + Unpin,
{
type Output = Either<A::Output, B::Output>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Poll A first
if let Poll::Ready(val) = Pin::new(&mut self.a).poll(cx) {
return Poll::Ready(Either::Left(val));
}
// Then poll B
if let Poll::Ready(val) = Pin::new(&mut self.b).poll(cx) {
return Poll::Ready(Either::Right(val));
}
Poll::Pending
}
}
// Usage with timeout:
// match Select::new(http_get(url), TimerFuture::new(timeout)).await {
// Either::Left(response) => println!("Got response: {}", response),
// Either::Right(()) => println!("Request timed out!"),
// }
<details> <summary><strong>🏋️ Exercise: Build a RetryFuture</strong> (click to expand)</summary>Fairness note: Our
Selectalways polls A first — if both are ready, A always wins. Tokio'sselect!macro randomizes the poll order for fairness.
Challenge: Build a RetryFuture<F, Fut> that takes a closure F: Fn() -> Fut and retries up to N times if the inner future returns Err. It should return the first Ok result or the last Err.
Hint: You'll need states for "running attempt" and "all attempts exhausted."
<details> <summary>🔑 Solution</summary>use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
pub struct RetryFuture<F, Fut, T, E>
where
F: Fn() -> Fut,
Fut: Future<Output = Result<T, E>> + Unpin,
{
factory: F,
current: Option<Fut>,
remaining: usize,
last_error: Option<E>,
}
impl<F, Fut, T, E> RetryFuture<F, Fut, T, E>
where
F: Fn() -> Fut,
Fut: Future<Output = Result<T, E>> + Unpin,
{
pub fn new(max_attempts: usize, factory: F) -> Self {
let current = Some((factory)());
RetryFuture {
factory,
current,
remaining: max_attempts.saturating_sub(1),
last_error: None,
}
}
}
impl<F, Fut, T, E> Future for RetryFuture<F, Fut, T, E>
where
F: Fn() -> Fut + Unpin,
Fut: Future<Output = Result<T, E>> + Unpin,
T: Unpin,
E: Unpin,
{
type Output = Result<T, E>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
if let Some(ref mut fut) = self.current {
match Pin::new(fut).poll(cx) {
Poll::Ready(Ok(val)) => return Poll::Ready(Ok(val)),
Poll::Ready(Err(e)) => {
self.last_error = Some(e);
if self.remaining > 0 {
self.remaining -= 1;
self.current = Some((self.factory)());
// Loop to poll the new future immediately
} else {
return Poll::Ready(Err(self.last_error.take().unwrap()));
}
}
Poll::Pending => return Poll::Pending,
}
} else {
return Poll::Ready(Err(self.last_error.take().unwrap()));
}
}
}
}
// Usage:
// let result = RetryFuture::new(3, || async {
// http_get("https://flaky-server.com/api").await
// }).await;
Key takeaway: The retry future is itself a state machine: it holds the current attempt and creates new inner futures on failure. This is how combinators compose — futures all the way down.
</details> </details>Key Takeaways — Building Futures by Hand
- A future needs three things: state, a
poll()implementation, and a waker registrationJoinpolls both sub-futures;Selectreturns whichever finishes first- Combinators are themselves futures wrapping other futures — it's turtles all the way down
- Building futures by hand gives deep insight, but in production use
tokio::join!/select!
See also: Ch 2 — The Future Trait for the trait definition, Ch 8 — Tokio Deep Dive for production-grade equivalents