rust/common/lifecycle/README.md
Unified app lifecycle management for K8s services. All features are opt-in — use only what your app needs.
- Core (always active): signal trapping, component registration with RAII drop guards, coordinated global shutdown, readiness probe.
- Opt-in: active health reporting with stall detection (with_liveness_deadline), graceful shutdown with per-component timeouts, liveness probe, pre-stop file polling.
This section summarizes how the library works and how it is integrated into your Rust services. All referenced features, APIs, and config details are documented in the sections below.
Create a lifecycle::Manager and configure:

- signal trapping and preStop handling on the builder,
- observability components via the builder option is_observability(true).

Each registered component receives a Handle. In the component's task:

- hold let _completed = handle.process_scope(); (or call handle.work_completed(); prior to a clean exit),
- call handle.report_healthy(); regularly,
- call handle.report_unhealthy(); before an unclean exit.

In main.rs, call manager.monitor(); or monitor_background() and wait for it before returning.

Create a manager via Manager::builder("name"), register components, then run the monitor. Register all components before starting the monitor.
```rust
use lifecycle::{ComponentOptions, Manager};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut manager = Manager::builder("my-service").build();
    let handle = manager.register("consumer", ComponentOptions::new());
    // ... spawn tasks using the handle ...
    manager.monitor().await?;
    Ok(())
}
```
The same setup with a global shutdown timeout and per-component graceful shutdown and liveness options:

```rust
use std::time::Duration;
use lifecycle::{ComponentOptions, Manager};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut manager = Manager::builder("my-service")
        .with_global_shutdown_timeout(Duration::from_secs(30))
        .build();
    let handle = manager.register(
        "consumer",
        ComponentOptions::new()
            .with_graceful_shutdown(Duration::from_secs(10)) // per-component shutdown budget
            .with_liveness_deadline(Duration::from_secs(30)) // must call report_healthy() within this
            .with_stall_threshold(2), // 2 consecutive stalled checks before shutdown
    );
    // ... spawn component tasks, wire up HTTP routes ...

    // Option A: blocking — monitor runs until all components finish or time out
    manager.monitor().await?;

    // Option B: background — returns a guard; await it after your HTTP server exits
    // let guard = manager.monitor_background();
    // axum::serve(...).with_graceful_shutdown(shutdown).await?;
    // guard.wait().await?;
    Ok(())
}
```
All options except name have sensible defaults.
| Method | Effect | Default |
|---|---|---|
| Manager::builder(name) | Start building a manager. name is emitted as the service_name label on all metrics. | — (required) |
| .with_global_shutdown_timeout(duration) | Hard ceiling on total shutdown. Monitor returns ShutdownTimeout if exceeded. Prevents indefinite hangs if a component doesn't check for cancellation. (see test global_timeout_fires_when_component_hangs) | 60s |
| .with_trap_signals(bool) | Install SIGINT/SIGTERM handlers. Set false in tests. | true |
| .with_prestop_check(bool) | Poll for the pre-stop shutdown file (K8s pre-stop hook pattern). | true |
| .with_prestop_path(path) | Override the pre-stop file path. | /tmp/shutdown |
| .with_health_poll_interval(duration) | Override health monitor poll frequency. The health monitor is automatically active when any component has with_liveness_deadline. (see test stall_triggers_shutdown) | 5s |
| .with_shutdown_token(token) | Caller supplies an external CancellationToken. Use in tests to trigger deterministic, app-global shutdown (simulate SIGTERM from K8s, etc.). | None |
| .build() | Consume the builder and produce a Manager. | — |
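For illustration, a minimal sketch combining a few of these builder options. The values and the pre-stop path are arbitrary assumptions, not recommendations:

```rust
use std::time::Duration;
use lifecycle::Manager;

// Sketch only: the timeout, poll interval, and pre-stop path are illustrative.
let mut manager = Manager::builder("my-service")
    .with_global_shutdown_timeout(Duration::from_secs(45))
    .with_prestop_path("/var/run/my-service/shutdown") // assumes a path-like argument is accepted
    .with_health_poll_interval(Duration::from_secs(2))
    .build();
// ... register components, then manager.monitor().await? ...
```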
Components are registered with register(tag, options) — tag is a &str identifier for the component (used in metrics/logs), and options is built with the ComponentOptions builder:
| Method | Effect | Default |
|---|---|---|
| ComponentOptions::new() | Base options with defaults for all fields. | — |
| .is_observability(bool) | Mark as an observability handle (e.g. metrics server). Shut down after all standard components finish. Cannot combine with with_liveness_deadline. (see test observability_handle_shuts_down_after_standard_handles) | false |
| .is_advisory(bool) | Mark as advisory. Participates in health monitoring (gauge, is_healthy()) but stalls do NOT trigger shutdown. Monitor does not wait for advisory handles during shutdown. Requires with_liveness_deadline. Cannot combine with is_observability. (see test advisory_handle_stall_does_not_trigger_shutdown) | false |
| .with_graceful_shutdown(duration) | Max time for this component to clean up after shutdown begins. Exceeded = marked timed out. (see test component_timeout_then_late_drop_preserves_timeout) | None — waits indefinitely (bounded by global_shutdown_timeout). Observability handles default to 1s if unset. |
| .with_liveness_deadline(duration) | Component must call report_healthy() within this interval or the health monitor considers it stalled. After stall_threshold consecutive stalled checks, the manager triggers global shutdown. (see test stall_triggers_shutdown) | None — no health monitoring |
| .with_stall_threshold(n) | Number of consecutive stalled health checks before the manager triggers global shutdown. Set higher to tolerate transient hiccups. Only meaningful with with_liveness_deadline. (see test stall_threshold_allows_recovery) | 1 — immediate shutdown on first stall |
Readiness (/_readiness) returns 200 until shutdown begins, then 503. K8s uses this to stop routing traffic to the pod. No per-component logic — it's purely "is the app accepting work?" (see test readiness_200_until_shutdown_then_503)
Liveness (/_liveness) always returns 200 — it means "the process is reachable." Health monitoring is handled internally by the manager's health poll task, not by K8s. When a component's heartbeat deadline expires, the health monitor increments a stall counter. After stall_threshold consecutive stalled checks, the manager triggers global shutdown via the same ComponentEvent::Failure path as signal_failure(). This ensures the app always gets coordinated graceful shutdown instead of K8s surprise-killing the pod. (see tests stall_triggers_shutdown, stall_threshold_exceeded_triggers_shutdown, stall_threshold_allows_recovery)
Components in Starting state (never called report_healthy()) are skipped by the health monitor — they won't trigger stall detection until they've reported healthy at least once. (see test starting_component_does_not_trigger_stall)
Wiring the probe handlers and a metrics server (registered as an observability handle) into an axum app:

```rust
let metrics_handle = manager.register(
    "metrics",
    ComponentOptions::new().is_observability(true),
);
let readiness = manager.readiness_handler();
let liveness = manager.liveness_handler();

let app = Router::new()
    .route("/_readiness", get({
        let r = readiness.clone();
        move || async move { r.check().await }
    }))
    .route("/_liveness", get({
        let l = liveness.clone();
        move || async move { l.check().into_response() }
    }));

let guard = manager.monitor_background();
let listener = TcpListener::bind("0.0.0.0:8080").await?;
axum::serve(listener, app)
    .with_graceful_shutdown(metrics_handle.shutdown_signal())
    .await?;
metrics_handle.work_completed();
guard.wait().await?;
```
**process_scope() (struct-held handle).** Use when your component is a struct that owns a Handle and has a blocking/looping process() method. Call process_scope() at the top of process() — when the guard is dropped (process returns), the manager is notified once. (see tests component_a_clean_shutdown, component_b_clean_shutdown_with_do_work)
The handle can be freely passed by reference or clone into child methods. Child methods can call report_healthy(), report_unhealthy(), signal_failure(), or return errors that cause process() to return (which drops the guard). None of this interferes with the guard. (see test component_b_do_work_signals_failure for error propagation from a child method)
```rust
struct MyConsumer {
    handle: lifecycle::Handle,
}

impl MyConsumer {
    async fn process(&self) {
        let _guard = self.handle.process_scope();
        loop {
            tokio::select! {
                _ = self.handle.shutdown_recv() => return,
                result = self.do_work() => {
                    match result {
                        Ok(()) => self.handle.report_healthy(),
                        Err(reason) => {
                            self.handle.signal_failure(reason);
                            return; // guard dropped → manager notified
                        }
                    }
                }
            }
        }
    }

    // Single select! — checks for cancellation alongside real work.
    // Returning does NOT trigger the guard; only process() returning does.
    async fn do_work(&self) -> Result<(), String> {
        tokio::select! {
            _ = self.handle.shutdown_recv() => Ok(()),
            result = self.fetch_and_process() => result,
        }
    }

    async fn fetch_and_process(&self) -> Result<(), String> { /* ... */ Ok(()) }
}
```
After process() returns, the struct can be dropped later without sending a duplicate event — the guard already signalled once. (see test process_scope_prevents_double_signal_from_struct)
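As a usage sketch (assuming the MyConsumer type above; the wiring in main is illustrative), the struct can outlive process() without side effects:

```rust
// Illustrative wiring (assumes `handle` came from manager.register(...)):
let consumer = std::sync::Arc::new(MyConsumer { handle });
let task = tokio::spawn({
    let consumer = consumer.clone();
    async move { consumer.process().await }
});
manager.monitor().await?; // shutdown_recv() resolves, process() returns, guard fires
task.await?;
drop(consumer); // dropping the struct later does not send a second event
```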
**process_scope() (handle moved into task).** Use when you tokio::spawn and move the handle into the async block. The task IS the scope — when it returns, the last handle clone is dropped, and the drop guard notifies the manager. (see test direct_handle_drop_during_shutdown_is_completion)
```rust
async fn consumer_loop(handle: lifecycle::Handle) {
    loop {
        tokio::select! {
            _ = handle.shutdown_recv() => return, // drop during shutdown = completion
            msg = recv_message() => {
                if let Err(e) = process(msg).await {
                    handle.signal_failure(e.to_string());
                    return; // signal_failure triggers shutdown; drop is fine
                }
                handle.report_healthy();
            }
        }
    }
}
```
After signal_failure(), just return — the manager records the failure immediately and the subsequent handle drop is harmlessly ignored. For normal shutdown or request_shutdown(), just return too — drop during shutdown is treated as completion. For one-shot/finite work that completes during normal operation, call work_completed() to prevent the drop from signaling "died". (see test direct_work_completed_prevents_died_on_drop)
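For the one-shot case, a sketch (the "migrations" component name and the run_migrations function are hypothetical):

```rust
// Finite work that finishes during normal operation: mark it completed so the
// later handle drop is not recorded as "died".
let migration_handle = manager.register("migrations", ComponentOptions::new());
tokio::spawn(async move {
    match run_migrations().await {
        Ok(()) => migration_handle.work_completed(), // clean finish; the later drop is ignored
        Err(e) => migration_handle.signal_failure(e.to_string()), // fatal: triggers global shutdown
    }
});
```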
**Polling worker (pull-based).** Use when your component polls for work (e.g. claim a job from a queue, process it, loop) rather than receiving pushed messages.
The key difference from the push-based patterns above: there's no inner tokio::select! on the work itself — instead, check is_shutting_down() at logical boundaries between units of work.
Active health reporting (with_liveness_deadline) is typically not needed here — individual network clients (S3, Kafka, PG) should have their own timeouts as the defense against hangs.
```rust
let job_handle = manager.register(
    "job-loop",
    ComponentOptions::new()
        .with_graceful_shutdown(Duration::from_secs(30)),
);
let monitor = manager.monitor_background();

tokio::spawn(async move {
    let _guard = job_handle.process_scope();
    while !job_handle.is_shutting_down() {
        let Some(job) = claim_next_job().await else {
            // No work available — sleep, but wake immediately on shutdown
            tokio::select! {
                _ = tokio::time::sleep(Duration::from_secs(5)) => {},
                _ = job_handle.shutdown_recv() => break,
            }
            continue;
        };
        // Process the job; check shutdown at logical boundaries
        if let Err(e) = job.process().await {
            tracing::error!("job failed: {e}");
        }
    }
});

monitor.wait().await?;
```
Key points:
- process_scope() in the spawned task — the manager is notified when the task returns.
- is_shutting_down() as the while-loop condition — sync check, no allocation.
- tokio::select! on the idle sleep + shutdown_recv() — responsive wakeup instead of sleeping through a shutdown signal.
- No with_liveness_deadline — the worker is pull-based and may legitimately idle. Client-level timeouts (S3, PG, Kafka) prevent indefinite hangs in network calls.

Handle method reference:

| Method | Use when |
|---|---|
| shutdown_recv() | In tokio::select! to react to shutdown. Returns a borrowed future — zero allocation per call. (see test component_a_clean_shutdown) |
| shutdown_signal() | Owned 'static future for passing to axum::serve(...).with_graceful_shutdown(handle.shutdown_signal()). Clones the internal token once. (see test handle_shutdown_signal_method) |
| is_shutting_down() | Sync check (e.g. in a hot loop) to bail out. |
| signal_failure(reason) | Fatal error; triggers global shutdown. Just return after calling it. |
| request_shutdown() | Request clean shutdown (non-fatal). Then return (drop is enough). |
| work_completed() | One-shot/finite work that completes during normal operation (prevents the handle drop from signaling "died"). Not needed for long-running components — drop during shutdown is treated as completion. |
| process_scope() | Returns a ProcessScopeGuard. Ties lifecycle signaling to a method scope instead of handle drop. Use when your struct owns the handle. |
| report_healthy() | Liveness heartbeat. Must be called more often than liveness_deadline. Missed deadlines increment a stall counter; after stall_threshold consecutive stalled checks, global shutdown is triggered. |
| report_unhealthy() | Mark this component unhealthy. Treated the same as a stalled heartbeat by the health monitor. For immediate shutdown, use signal_failure() instead. |
| is_healthy() | With liveness_deadline: returns the health poll task's latest result (~1ns AtomicBool load). Without liveness_deadline: returns !is_shutting_down(). Safe for hot paths. (see test advisory_handle_is_healthy) |
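To illustrate request_shutdown(), a sketch of a component that finishes its work and asks the whole app to wind down cleanly. The "batch-export" name and the export_all_batches function are hypothetical:

```rust
let export_handle = manager.register("batch-export", ComponentOptions::new());
tokio::spawn(async move {
    let _guard = export_handle.process_scope();
    export_all_batches().await;        // hypothetical finite workload
    export_handle.request_shutdown();  // non-fatal: start coordinated app shutdown
    // returning drops the guard; drop during shutdown is treated as completion
});
```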
Notes:

- A panic in a task that holds a process_scope() guard is recorded as died. (see test panic_in_task_with_process_scope_signals_died)
- Register all components before calling monitor() or monitor_background(). The manager is consumed by those calls.
- If you set with_liveness_deadline on any component, you must call report_healthy() more frequently than liveness_deadline, or the health monitor triggers global shutdown after stall_threshold consecutive stalled checks. Components that haven't called report_healthy() yet (Starting state) are skipped. Use with_health_poll_interval on the builder to tune poll frequency (default 5s).
- If your struct owns the handle and process() is the run method, use process_scope(). Otherwise the manager is only notified when the struct is dropped, not when process() returns.

Observability handles (is_observability(true)) implement two-phase shutdown: they stay alive while standard components drain, so metrics/health endpoints remain available throughout graceful shutdown.
- Phase 1: Normal operation — all components running
- Phase 2: Standard drain — standard shutdown_token cancelled, standard components drain
- Phase 3: Observability drain — observability shutdown_token cancelled, obs components drain (default 1s timeout)
Use is_observability(true) for components that serve infrastructure endpoints (metrics, health, pprof) and should outlive the app's business logic during shutdown. The canonical example is a metrics HTTP server.
- CancellationToken: Observability handles receive a different token than standard handles. handle.shutdown_recv() and handle.shutdown_signal() resolve at the right time for each tier automatically.
- Observability handles cannot set with_liveness_deadline (panics at registration). They're lightweight infrastructure; active heartbeating would add complexity for no benefit.
- Outcomes for observability components are reported via tracing logs. Since the metrics server itself may be draining, metric emission would be unreliable.
- Observability handles without with_graceful_shutdown get a 1-second timeout. Override with .with_graceful_shutdown(duration) if your metrics server needs longer.
- signal_failure() during Phase 1 triggers global shutdown for all components, same as a standard handle. (see test observability_failure_during_normal_operation_triggers_shutdown)

A combined example, with a consumer under liveness monitoring and a metrics server registered as an observability handle:

```rust
let mut manager = Manager::builder("my-service").build();
let consumer_handle = manager.register("consumer", ComponentOptions::new()
    .with_graceful_shutdown(Duration::from_secs(10))
    .with_liveness_deadline(Duration::from_secs(30)));
let metrics_handle = manager.register("metrics", ComponentOptions::new()
    .is_observability(true));
let readiness = manager.readiness_handler();
let liveness = manager.liveness_handler();
let guard = manager.monitor_background();

// Consumer task — uses standard shutdown token
tokio::spawn(async move {
    let _scope = consumer_handle.process_scope();
    loop {
        tokio::select! {
            _ = consumer_handle.shutdown_recv() => return,
            msg = recv_message() => {
                process(msg).await;
                consumer_handle.report_healthy();
            }
        }
    }
});

// Metrics server — uses observability shutdown token, stays alive during consumer drain
let app = Router::new()
    .route("/_readiness", get(move || async move { readiness.check().await }))
    .route("/_liveness", get(move || async move { liveness.check().into_response() }));
let listener = TcpListener::bind("0.0.0.0:9090").await?;
axum::serve(listener, app)
    .with_graceful_shutdown(metrics_handle.shutdown_signal())
    .await?;
metrics_handle.work_completed();
guard.wait().await?;
```
Advisory handles (is_advisory(true)) participate in health monitoring — the health poll task updates their lifecycle_component_healthy gauge and their is_healthy() flag — but stalls do not trigger global shutdown.
The monitor does not wait for advisory handles during shutdown.
Use is_advisory(true) when one component needs to observe another component's health without coupling it to the app's shutdown decision.
The canonical example is FallbackSink: it needs to know if the primary Kafka sink is healthy to decide whether to route traffic to S3, but a Kafka stall shouldn't kill the app when a fallback is available.
- The lifecycle_component_healthy gauge is emitted for advisory handles — dashboards see the same metrics as standard handles.
- How is_healthy() works: it returns the poll task's latest health assessment via a shared AtomicBool (~1ns read). Starts true (Starting state is healthy). Flips to false after stall_threshold consecutive stalled/unhealthy polls. Flips back on recovery. Lags behind report_healthy() by up to health_poll_interval.
- stall_threshold gates when is_healthy() flips to false. The only difference is that advisory handles never send ComponentEvent::Failure, so they never trigger global shutdown.
- Advisory handles still receive a shutdown_token, so shutdown_recv() and is_shutting_down() work normally for cooperative cleanup.
- Requires with_liveness_deadline: the health poll task needs a deadline to evaluate. Registration panics without it.
- Cannot combine with is_observability: advisory and observability are mutually exclusive.
- Cannot combine with with_graceful_shutdown: advisory handles are not in the component maps, so graceful shutdown timeouts would be silently ignored. Registration panics if both are set.

Example, with a Kafka sink as the primary and S3 as the fallback:

```rust
let mut manager = Manager::builder("capture").build();
// Primary sink — standard handle with liveness monitoring
let kafka_handle = manager.register("sink", ComponentOptions::new()
    .with_liveness_deadline(Duration::from_secs(45))
    .with_stall_threshold(4));

// Advisory handle for FallbackSink to observe Kafka health without
// triggering app shutdown when Kafka stalls
let advisory_handle = manager.register("sink-advisory", ComponentOptions::new()
    .is_advisory(true)
    .with_liveness_deadline(Duration::from_secs(45))
    .with_stall_threshold(4));

// Both handles are fed by the same rdkafka stats callback:
// kafka_handle.report_healthy();    // keeps the standard handle alive
// advisory_handle.report_healthy(); // keeps the advisory handle's is_healthy() current
//
// FallbackSink calls advisory_handle.is_healthy() on every send() to
// decide whether to route to the primary Kafka sink or the S3 fallback.
```
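A sketch of the consuming side. FallbackSink, KafkaSink, S3Sink, Event, and their send signatures are illustrative, not part of this crate:

```rust
// Hypothetical fallback sink: route each event based on the advisory handle's
// cached health flag (an atomic load, cheap enough to call on every send()).
struct FallbackSink {
    primary: KafkaSink,              // illustrative type
    fallback: S3Sink,                // illustrative type
    kafka_health: lifecycle::Handle, // the advisory handle registered above
}

impl FallbackSink {
    async fn send(&self, event: Event) -> Result<(), String> {
        if self.kafka_health.is_healthy() {
            self.primary.send(event).await
        } else {
            self.fallback.send(event).await
        }
    }
}
```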
The crate emits metrics via the metrics facade (no recorder installed by this crate; the parent app does that). All metrics are segmented by **service_name**: set the name in Manager::builder("name") to your app's service name (e.g. K8s service name or logical app name) so dashboards and alerts can filter by service.
| Metric | Type | Labels | When emitted |
|---|---|---|---|
| lifecycle_shutdown_initiated_total | Counter | service_name, trigger_component, trigger_reason | Once when shutdown begins |
| lifecycle_component_shutdown_duration_seconds | Histogram | service_name, component, result | Once per component at completion/timeout/death |
| lifecycle_component_shutdown_result_total | Counter | service_name, component, result | Once per component at completion/timeout/death |
| lifecycle_shutdown_completed_total | Counter | service_name, clean | Once when the monitor returns successfully |
| lifecycle_component_healthy | Gauge | service_name, component | Continuously during normal operation |
Label values: trigger_reason = signal, failure, requested, died; result = completed, timeout, died; clean = true / false.
lifecycle_shutdown_completed_total is not emitted on global timeout or if the process is killed; that asymmetry with lifecycle_shutdown_initiated_total is how incomplete shutdowns (e.g. SIGKILL) are detected.
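How the recorder gets installed is up to the app. As one hedged sketch, using the metrics-exporter-prometheus crate (an assumption here — any recorder for the metrics facade works):

```rust
use metrics_exporter_prometheus::PrometheusBuilder;

// Install a Prometheus recorder; the app then serves the rendered exposition
// text from its metrics route (e.g. on the observability HTTP server shown earlier).
let prometheus_handle = PrometheusBuilder::new().install_recorder()?;
let exposition_text = prometheus_handle.render(); // Prometheus text format
```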
Manager::builder("kafka-deduplicator"). This value is emitted as the service_name label on every lifecycle metric.service_name) of type Query that lists values:
label_values(lifecycle_shutdown_initiated_total, service_name)
so users can filter panels by service./metrics); the lifecycle crate only emits to the metrics facade; the app wires the recorder/exporter.service_name)increase(lifecycle_shutdown_initiated_total{service_name="$service_name"}[$__range])
by trigger_component, trigger_reason.histogram_quantile(0.95, rate(lifecycle_component_shutdown_duration_seconds_bucket{service_name="$service_name"}[5m]))
by component, result.sum by (component, result) increase(lifecycle_component_shutdown_result_total{service_name="$service_name"}[$__range]).increase(lifecycle_shutdown_initiated_total{service_name="$service_name"}[1h]) - increase(lifecycle_shutdown_completed_total{service_name="$service_name"}[1h]).lifecycle_component_healthy{service_name="$service_name"}
(one row per component).service_name)increase(lifecycle_component_shutdown_result_total{service_name="$service_name", result="timeout"}[5m]) > 0increase(lifecycle_shutdown_initiated_total{service_name="$service_name"}[1h]) - increase(lifecycle_shutdown_completed_total{service_name="$service_name"}[1h]) > 0lifecycle_component_healthy{service_name="$service_name"} is 0 for a given component for 2 scrape intervals.For deterministic shutdown in tests, use with_shutdown_token on the builder. The test holds the token and calls token.cancel() to trigger shutdown:
```rust
use tokio_util::sync::CancellationToken;

let shutdown_token = CancellationToken::new();
let mut manager = Manager::builder("test-service")
    .with_trap_signals(false)
    .with_prestop_check(false)
    .with_shutdown_token(shutdown_token.clone())
    .build();
// ... register components, keep the guard from monitor_background(), spawn component tasks ...
shutdown_token.cancel(); // test triggers shutdown
guard.wait().await?;
```
SIGKILL cannot be trapped. Detection is by absence:
{app="my-worker"} |= "Lifecycle: shutdown initiated" and absence of |= "Lifecycle: shutdown complete".increase(lifecycle_shutdown_initiated_total[1h]) - increase(lifecycle_shutdown_completed_total[1h]) > 0 means some shutdowns did not complete.