lifecycle

Unified app lifecycle management for K8s services. All features are opt-in — use only what your app needs.

Core (always active): signal trapping, component registration with RAII drop guards, coordinated global shutdown, readiness probe.

Opt-in: active health reporting with stall detection (with_liveness_deadline), graceful shutdown with per-component timeouts, liveness probe, pre-stop file polling.

Quick Start

A summary of how the library works and how to integrate it into your Rust services. All referenced features, APIs, and config details are documented below this section.

  • Create a lifecycle::Manager and configure:
    • Optional k8s preStop handling
    • Override global graceful shutdown max timeout
  • Register each app component on Manager and configure:
    • Optional active health reporting (usually not needed)
    • Optional graceful shutdown timeout (overrides global)
    • If the handle is for a metrics server (or similar), set builder.is_observability(true)
    • Returns a Handle
  • Pass or clone the health handle to the component prior to blocking in main:
    • If the component is a function, clone the handle into its scope
    • If component is a struct, store handle as a field
    • If the component runs for the full app lifecycle:
      • Create a drop guard at the top of the processing loop
      • Example: let _completed = handle.process_scope();
      • Optional: call handle.work_completed() prior to a clean exit
      • Return an error or panic to signal completion and app shutdown
    • If using active health reporting:
      • Call handle.report_healthy(); regularly
      • Call handle.report_unhealthy(); before unclean exit
      • Note: these can be called from sync or async contexts; they are fast and non-blocking
      • Ensure you report health within the configured deadline, allowing for the configured number of stalled polls
  • Before main returns, call manager.monitor() or manager.monitor_background()
  • Shutdown coordination:
    • Component checks health handle for app shutdown at logical work boundaries
    • If shutdown signaled, perform cleanup as needed and return
    • Handle has APIs for async and sync shutdown checks
    • The handle can be cloned or referenced as needed for checks in nested calls
  • Processing loop should break on shutdown signal:
    • Graceful: perform cleanup prior to returning from loop function

Manager setup

Create a manager via Manager::builder("name"), register components, then run the monitor. Register all components before starting the monitor.

Minimal (shutdown coordination only)

rust
use lifecycle::{ComponentOptions, Manager};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut manager = Manager::builder("my-service").build();

    let handle = manager.register("consumer", ComponentOptions::new());

    // ... spawn tasks using the handle ...
    manager.monitor().await?;
    Ok(())
}
Full configuration (per-component timeouts and health monitoring)

rust
use std::time::Duration;
use lifecycle::{ComponentOptions, Manager};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut manager = Manager::builder("my-service")
        .with_global_shutdown_timeout(Duration::from_secs(30))
        .build();

    let handle = manager.register(
        "consumer",
        ComponentOptions::new()
            .with_graceful_shutdown(Duration::from_secs(10))  // per-component shutdown budget
            .with_liveness_deadline(Duration::from_secs(30))  // must call report_healthy() within this
            .with_stall_threshold(2),                         // 2 consecutive stalled checks before shutdown
    );

    // ... spawn component tasks, wire up HTTP routes ...

    // Option A: blocking — monitor runs until all components finish or time out
    manager.monitor().await?;

    // Option B: background — returns a guard; await it after your HTTP server exits
    // let guard = manager.monitor_background();
    // axum::serve(...).with_graceful_shutdown(shutdown).await?;
    // guard.wait().await?;

    Ok(())
}

Manager::builder()

All options except name have sensible defaults.

| Method | Effect | Default |
| --- | --- | --- |
| Manager::builder(name) | Start building a manager. name is emitted as the service_name label on all metrics. | — (required) |
| .with_global_shutdown_timeout(duration) | Hard ceiling on total shutdown. Monitor returns ShutdownTimeout if exceeded. Prevents indefinite hangs if a component doesn't check for cancellation. (see test global_timeout_fires_when_component_hangs) | 60s |
| .with_trap_signals(bool) | Install SIGINT/SIGTERM handlers. Set false in tests. | true |
| .with_prestop_check(bool) | Poll for the pre-stop shutdown file (K8s pre-stop hook pattern). | true |
| .with_prestop_path(path) | Override the pre-stop file path. | /tmp/shutdown |
| .with_health_poll_interval(duration) | Override the health monitor poll frequency. The health monitor is automatically active when any component has with_liveness_deadline. (see test stall_triggers_shutdown) | 5s |
| .with_shutdown_token(token) | Supply an external CancellationToken. Use in tests to trigger deterministic, app-global shutdown (simulating SIGTERM from K8s, etc.). | None |
| .build() | Consume the builder and produce a Manager. | — |
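
A sketch combining the remaining builder options; the argument type of with_prestop_path is an assumption here (shown as a string-like path):

rust
use std::time::Duration;
use lifecycle::Manager;

let mut manager = Manager::builder("my-service")
    .with_trap_signals(true)                           // default; set false in tests
    .with_prestop_check(true)                          // poll for the K8s pre-stop file
    .with_prestop_path("/tmp/shutdown")                // path type assumed string-like
    .with_health_poll_interval(Duration::from_secs(2)) // poll health every 2s instead of 5s
    .build();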

register() / ComponentOptions

register(tag, options) — tag is a &str identifier for the component (used in metrics/logs). options is built with the ComponentOptions builder:

| Method | Effect | Default |
| --- | --- | --- |
| ComponentOptions::new() | Base options with defaults for all fields. | — |
| .is_observability(bool) | Mark as an observability handle (e.g. a metrics server). Shut down after all standard components finish. Cannot combine with with_liveness_deadline. (see test observability_handle_shuts_down_after_standard_handles) | false |
| .is_advisory(bool) | Mark as advisory. Participates in health monitoring (gauge, is_healthy()) but stalls do NOT trigger shutdown. The monitor does not wait for advisory handles during shutdown. Requires with_liveness_deadline. Cannot combine with is_observability. (see test advisory_handle_stall_does_not_trigger_shutdown) | false |
| .with_graceful_shutdown(duration) | Max time for this component to clean up after shutdown begins. Exceeded = marked timed out. (see test component_timeout_then_late_drop_preserves_timeout) | None — waits indefinitely (bounded by global_shutdown_timeout); observability handles default to 1s if unset |
| .with_liveness_deadline(duration) | The component must call report_healthy() within this interval or the health monitor considers it stalled. After stall_threshold consecutive stalled checks, the manager triggers global shutdown. (see test stall_triggers_shutdown) | None — no health monitoring |
| .with_stall_threshold(n) | Number of consecutive stalled health checks before the manager triggers global shutdown. Set higher to tolerate transient hiccups. Only meaningful with with_liveness_deadline. (see test stall_threshold_allows_recovery) | 1 — immediate shutdown on first stall |
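
A short sketch registering the three handle flavors, using only options from the table above:

rust
use std::time::Duration;
use lifecycle::ComponentOptions;

// Standard worker: bounded cleanup, no active health reporting.
let worker = manager.register("worker", ComponentOptions::new()
    .with_graceful_shutdown(Duration::from_secs(10)));

// Monitored worker: must report_healthy() within 30s; tolerates 2 stalled checks.
let pipeline = manager.register("pipeline", ComponentOptions::new()
    .with_liveness_deadline(Duration::from_secs(30))
    .with_stall_threshold(2));

// Observability handle: drained after standard components (default 1s budget).
let metrics = manager.register("metrics", ComponentOptions::new()
    .is_observability(true));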

K8s readiness and liveness

Readiness (/_readiness) returns 200 until shutdown begins, then 503. K8s uses this to stop routing traffic to the pod. No per-component logic — it's purely "is the app accepting work?" (see test readiness_200_until_shutdown_then_503)

Liveness (/_liveness) always returns 200 — it means "the process is reachable." Health monitoring is handled internally by the manager's health poll task, not by K8s. When a component's heartbeat deadline expires, the health monitor increments a stall counter. After stall_threshold consecutive stalled checks, the manager triggers global shutdown via the same ComponentEvent::Failure path as signal_failure(). This ensures the app always gets coordinated graceful shutdown instead of K8s surprise-killing the pod. (see tests stall_triggers_shutdown, stall_threshold_exceeded_triggers_shutdown, stall_threshold_allows_recovery)

Components in Starting state (never called report_healthy()) are skipped by the health monitor — they won't trigger stall detection until they've reported healthy at least once. (see test starting_component_does_not_trigger_stall)

Axum route setup

rust
use axum::{response::IntoResponse, routing::get, Router};
use tokio::net::TcpListener;

let metrics_handle = manager.register(
    "metrics",
    ComponentOptions::new().is_observability(true),
);
let readiness = manager.readiness_handler();
let liveness = manager.liveness_handler();

let app = Router::new()
    .route("/_readiness", get({
        let r = readiness.clone();
        move || async move { r.check().await }
    }))
    .route("/_liveness", get({
        let l = liveness.clone();
        move || async move { l.check().into_response() }
    }));

let guard = manager.monitor_background();

let listener = TcpListener::bind("0.0.0.0:8080").await?;
axum::serve(listener, app)
    .with_graceful_shutdown(metrics_handle.shutdown_signal())
    .await?;
metrics_handle.work_completed();

guard.wait().await?;

Using the handle

With process_scope() (struct-held handle)

Use when your component is a struct that owns a Handle and has a blocking/looping process() method. Call process_scope() at the top of process() — when the guard is dropped (process returns), the manager is notified once. (see tests component_a_clean_shutdown, component_b_clean_shutdown_with_do_work)

The handle can be freely passed by reference or clone into child methods. Child methods can call report_healthy(), report_unhealthy(), signal_failure(), or return errors that cause process() to return (which drops the guard). None of this interferes with the guard. (see test component_b_do_work_signals_failure for error propagation from a child method)

rust
struct MyConsumer {
    handle: lifecycle::Handle,
}

impl MyConsumer {
    async fn process(&self) {
        let _guard = self.handle.process_scope();
        loop {
            tokio::select! {
                _ = self.handle.shutdown_recv() => return,
                result = self.do_work() => {
                    match result {
                        Ok(()) => self.handle.report_healthy(),
                        Err(reason) => {
                            self.handle.signal_failure(reason);
                            return; // guard dropped → manager notified
                        }
                    }
                }
            }
        }
    }

    // Single select! — checks for cancellation alongside real work.
    // Returning does NOT trigger the guard; only process() returning does.
    async fn do_work(&self) -> Result<(), String> {
        tokio::select! {
            _ = self.handle.shutdown_recv() => Ok(()),
            result = self.fetch_and_process() => result,
        }
    }

    async fn fetch_and_process(&self) -> Result<(), String> { /* ... */ Ok(()) }
}

After process() returns, the struct can be dropped later without sending a duplicate event — the guard already signalled once. (see test process_scope_prevents_double_signal_from_struct)
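
A compact illustration of that guarantee, using MyConsumer from above:

rust
let consumer = MyConsumer { handle };
consumer.process().await; // guard dropped here → manager notified once
drop(consumer);           // later struct drop sends no duplicate event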

Without process_scope() (handle moved into task)

Use when you tokio::spawn and move the handle into the async block. The task IS the scope — when it returns, the last handle clone is dropped, and the drop guard notifies the manager. (see test direct_handle_drop_during_shutdown_is_completion)

rust
async fn consumer_loop(handle: lifecycle::Handle) {
    loop {
        tokio::select! {
            _ = handle.shutdown_recv() => return, // drop during shutdown = completion
            msg = recv_message() => {
                if let Err(e) = process(msg).await {
                    handle.signal_failure(e.to_string());
                    return; // signal_failure triggers shutdown; drop is fine
                }
                handle.report_healthy();
            }
        }
    }
}

After signal_failure(), just return — the manager records the failure immediately and the subsequent handle drop is harmlessly ignored. For normal shutdown or request_shutdown(), just return too — drop during shutdown is treated as completion. For one-shot/finite work that completes during normal operation, call work_completed() to prevent the drop from signaling "died". (see test direct_work_completed_prevents_died_on_drop)
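
A sketch of the one-shot case; run_migration and migrate are hypothetical names:

rust
async fn run_migration(handle: lifecycle::Handle) {
    match migrate().await {
        // Finite work finished during normal operation: mark completion
        // so the upcoming handle drop isn't read as "died".
        Ok(()) => handle.work_completed(),
        // Fatal: triggers global shutdown; the later drop is ignored.
        Err(e) => handle.signal_failure(e),
    }
}

async fn migrate() -> Result<(), String> { /* ... */ Ok(()) }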

Pull-based worker pattern (poll-for-work loop)

Use when your component polls for work (e.g. claim a job from a queue, process it, loop) rather than receiving pushed messages. The key difference from the push-based patterns above: there's no inner tokio::select! on the work itself — instead, check is_shutting_down() at logical boundaries between units of work. Active health reporting (with_liveness_deadline) is typically not needed here — individual network clients (S3, Kafka, PG) should have their own timeouts as the defense against hangs.

rust
let job_handle = manager.register(
    "job-loop",
    ComponentOptions::new()
        .with_graceful_shutdown(Duration::from_secs(30)),
);
let monitor = manager.monitor_background();

tokio::spawn(async move {
    let _guard = job_handle.process_scope();

    while !job_handle.is_shutting_down() {
        let Some(job) = claim_next_job().await else {
            // No work available — sleep, but wake immediately on shutdown
            tokio::select! {
                _ = tokio::time::sleep(Duration::from_secs(5)) => {},
                _ = job_handle.shutdown_recv() => break,
            }
            continue;
        };

        // Process the job; check shutdown at logical boundaries
        if let Err(e) = job.process().await {
            tracing::error!("job failed: {e}");
        }
    }
});

monitor.wait().await?;

Key points:

  • process_scope() in the spawned task — the manager is notified when the task returns.
  • is_shutting_down() as the while-loop condition — sync check, no allocation.
  • tokio::select! on idle sleep + shutdown_recv() — responsive wakeup instead of sleeping through a shutdown signal.
  • No with_liveness_deadline — the worker is pull-based and may legitimately idle. Client-level timeouts (S3, PG, Kafka) prevent indefinite hangs in network calls.

Handle API summary

| Method | Use when |
| --- | --- |
| shutdown_recv() | In tokio::select! to react to shutdown. Returns a borrowed future — zero allocation per call. (see test component_a_clean_shutdown) |
| shutdown_signal() | Owned 'static future for passing to axum::serve(...).with_graceful_shutdown(handle.shutdown_signal()). Clones the internal token once. (see test handle_shutdown_signal_method) |
| is_shutting_down() | Sync check (e.g. in a hot loop) to bail out. |
| signal_failure(reason) | Fatal error; triggers global shutdown. Just return after calling it. |
| request_shutdown() | Request clean shutdown (non-fatal). Then return (drop is enough). |
| work_completed() | One-shot/finite work that completes during normal operation (prevents the handle drop from signaling "died"). Not needed for long-running components — drop during shutdown is treated as completion. |
| process_scope() | Returns a ProcessScopeGuard. Ties lifecycle signaling to a method scope instead of handle drop. Use when your struct owns the handle. |
| report_healthy() | Liveness heartbeat. Must be called more often than liveness_deadline. Missed deadlines increment a stall counter; after stall_threshold consecutive stalled checks, global shutdown is triggered. |
| report_unhealthy() | Mark this component unhealthy. Treated the same as a stalled heartbeat by the health monitor. For immediate shutdown, use signal_failure() instead. |
| is_healthy() | With liveness_deadline: returns the health poll task's latest result (~1ns AtomicBool load). Without liveness_deadline: returns !is_shutting_down(). Safe for hot paths. (see test advisory_handle_is_healthy) |
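
For instance, choosing between the two exit APIs inside a worker loop (both predicates here are hypothetical):

rust
// Non-fatal: coordinated clean shutdown, then return (drop is enough).
if maintenance_restart_requested() {
    handle.request_shutdown();
    return;
}
// Fatal: triggers global shutdown; just return after calling it.
if let Err(e) = check_critical_resource().await {
    handle.signal_failure(e);
    return;
}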

Common pitfalls

  1. Drop during normal operation — If the last handle (or process scope guard) is dropped while shutdown is not in progress, the manager treats it as "component died" and triggers shutdown. This catches panics and early returns. Dropping after shutdown begins is treated as normal completion. (see test panic_in_task_with_process_scope_signals_died)
  2. Register order — Register all components before calling monitor() or monitor_background(). The manager is consumed by those calls.
  3. Health monitoring — Activated by with_liveness_deadline on any component. You must call report_healthy() more frequently than liveness_deadline, or the health monitor triggers global shutdown after stall_threshold consecutive stalled checks. Components that haven't called report_healthy() yet (Starting state) are skipped. Use with_health_poll_interval on the builder to tune poll frequency (default 5s).
  4. Struct-held handles — If your struct owns the handle and process() is the run method, use process_scope(). Otherwise the manager is only notified when the struct is dropped, not when process() returns.
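
Pitfall 1 in code — a sketch where init is a hypothetical fallible setup step:

rust
tokio::spawn(async move {
    let _guard = handle.process_scope();
    if init().await.is_err() {
        // Guard dropped while shutdown is NOT in progress: the manager
        // records "component died" and triggers global shutdown.
        return;
    }
    // ... main loop ...
});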

Observability handles

Observability handles (is_observability(true)) implement two-phase shutdown: they stay alive while standard components drain, so metrics/health endpoints remain available throughout graceful shutdown.

text
Phase 1: Normal operation — all components running
Phase 2: Standard drain — standard shutdown_token cancelled, standard components drain
Phase 3: Observability drain — observability shutdown_token cancelled, obs components drain (default 1s timeout)

When to use

Use is_observability(true) for components that serve infrastructure endpoints (metrics, health, pprof) and should outlive the app's business logic during shutdown. The canonical example is a metrics HTTP server.

Behavior

  • Separate CancellationToken: Observability handles receive a different token than standard handles. handle.shutdown_recv() and handle.shutdown_signal() resolve at the right time for each tier automatically.
  • No health monitoring: Observability handles cannot use with_liveness_deadline (panics at registration). They're lightweight infrastructure; active heartbeating would add complexity for no benefit.
  • No metrics during Phase 3: The manager only emits tracing logs for observability component outcomes. Since the metrics server itself may be draining, metric emission would be unreliable.
  • Default 1s timeout: Observability handles without explicit with_graceful_shutdown get a 1-second timeout. Override with .with_graceful_shutdown(duration) if your metrics server needs longer.
  • Failures during normal operation: An observability handle calling signal_failure() during Phase 1 triggers global shutdown for all components, same as a standard handle. (see test observability_failure_during_normal_operation_triggers_shutdown)

Example: metrics server as observability handle

rust
let mut manager = Manager::builder("my-service").build();

let consumer_handle = manager.register("consumer", ComponentOptions::new()
    .with_graceful_shutdown(Duration::from_secs(10))
    .with_liveness_deadline(Duration::from_secs(30)));

let metrics_handle = manager.register("metrics", ComponentOptions::new()
    .is_observability(true));

let readiness = manager.readiness_handler();
let liveness = manager.liveness_handler();
let guard = manager.monitor_background();

// Consumer task — uses standard shutdown token
tokio::spawn(async move {
    let _scope = consumer_handle.process_scope();
    loop {
        tokio::select! {
            _ = consumer_handle.shutdown_recv() => return,
            msg = recv_message() => {
                process(msg).await;
                consumer_handle.report_healthy();
            }
        }
    }
});

// Metrics server — uses observability shutdown token, stays alive during consumer drain
let app = Router::new()
    .route("/_readiness", get(move || async move { readiness.check().await }))
    .route("/_liveness", get(move || async move { liveness.check().into_response() }));

let listener = TcpListener::bind("0.0.0.0:9090").await?;
axum::serve(listener, app)
    .with_graceful_shutdown(metrics_handle.shutdown_signal())
    .await?;
metrics_handle.work_completed();

guard.wait().await?;

Advisory handles

Advisory handles (is_advisory(true)) participate in health monitoring — the health poll task updates their lifecycle_component_healthy gauge and their is_healthy() flag — but stalls do not trigger global shutdown. The monitor does not wait for advisory handles during shutdown.

When to use

Use is_advisory(true) when one component needs to observe another component's health without coupling it to the app's shutdown decision. The canonical example is FallbackSink: it needs to know if the primary Kafka sink is healthy to decide whether to route traffic to S3, but a Kafka stall shouldn't kill the app when a fallback is available.

Behavior

  • Health gauge updates: The poll task still writes lifecycle_component_healthy for advisory handles — dashboards see the same metrics as standard handles.
  • is_healthy() works: Returns the poll task's latest health assessment via a shared AtomicBool (~1ns read). Starts true (Starting state is healthy). Flips to false after stall_threshold consecutive stalled/unhealthy polls. Flips back on recovery. Lags behind report_healthy() by up to health_poll_interval.
  • Stall counting without shutdown: Stall counts are tracked the same as standard handles — stall_threshold gates when is_healthy() flips to false. The only difference is that advisory handles never send ComponentEvent::Failure, so they never trigger global shutdown.
  • Not waited on during shutdown: Advisory handles are not in the component maps, so the monitor's drain phases ignore them. The app shuts down as soon as all standard (and observability) components finish.
  • Standard shutdown token: Advisory handles receive the standard shutdown_token, so shutdown_recv() and is_shutting_down() work normally for cooperative cleanup.
  • Drop guard: Events from advisory handle drops are sent to the monitor channel but harmlessly ignored (no entry in component maps).
  • Requires with_liveness_deadline: The health poll task needs a deadline to evaluate. Registration panics without it.
  • Cannot combine with is_observability: Advisory and observability are mutually exclusive.
  • Cannot use with_graceful_shutdown: Advisory handles are not in the component maps, so graceful shutdown timeouts would be silently ignored. Registration panics if both are set.

Example: FallbackSink health observation

rust
let mut manager = Manager::builder("capture").build();

// Primary sink — standard handle with liveness monitoring
let kafka_handle = manager.register("sink", ComponentOptions::new()
    .with_liveness_deadline(Duration::from_secs(45))
    .with_stall_threshold(4));

// Advisory handle for FallbackSink to observe Kafka health without
// triggering app shutdown when Kafka stalls
let advisory_handle = manager.register("sink-advisory", ComponentOptions::new()
    .is_advisory(true)
    .with_liveness_deadline(Duration::from_secs(45))
    .with_stall_threshold(4));

// Both handles are fed by the same rdkafka stats callback:
//   kafka_handle.report_healthy();    // keeps the standard handle alive
//   advisory_handle.report_healthy(); // keeps the advisory handle's is_healthy() current
//
// FallbackSink calls advisory_handle.is_healthy() on every send() to
// decide whether to route to the primary Kafka sink or the S3 fallback.
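
One way to wire that callback — a sketch assuming rdkafka's ClientContext trait and its stats() hook, which fires at the configured statistics.interval.ms:

rust
use rdkafka::client::ClientContext;
use rdkafka::statistics::Statistics;

struct LivenessContext {
    kafka_handle: lifecycle::Handle,
    advisory_handle: lifecycle::Handle,
}

impl ClientContext for LivenessContext {
    // Each librdkafka stats emission doubles as a liveness heartbeat.
    fn stats(&self, _stats: Statistics) {
        self.kafka_handle.report_healthy();
        self.advisory_handle.report_healthy();
    }
}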

Metrics

The crate emits metrics via the metrics facade (no recorder installed by this crate; the parent app does that). All metrics are segmented by **service_name**: set the name in Manager::builder("name") to your app's service name (e.g. K8s service name or logical app name) so dashboards and alerts can filter by service.

| Metric | Type | Labels | When emitted |
| --- | --- | --- | --- |
| lifecycle_shutdown_initiated_total | Counter | service_name, trigger_component, trigger_reason | Once when shutdown begins |
| lifecycle_component_shutdown_duration_seconds | Histogram | service_name, component, result | Once per component at completion/timeout/death |
| lifecycle_component_shutdown_result_total | Counter | service_name, component, result | Once per component at completion/timeout/death |
| lifecycle_shutdown_completed_total | Counter | service_name, clean | Once when monitor returns successfully |
| lifecycle_component_healthy | Gauge | service_name, component | Continuously during normal operation |

Label values: trigger_reason = signal, failure, requested, died; result = completed, timeout, died; clean = true / false.

lifecycle_shutdown_completed_total is not emitted on global timeout or if the process is killed; that asymmetry with lifecycle_shutdown_initiated_total is how incomplete shutdowns (e.g. SIGKILL) are detected.

Grafana / Prometheus setup

  1. Service name — When creating the manager, set the name in Manager::builder("kafka-deduplicator"). This value is emitted as the service_name label on every lifecycle metric.
  2. Grafana variable — In dashboards, define a variable (e.g. service_name) of type Query that lists values: label_values(lifecycle_shutdown_initiated_total, service_name) so users can filter panels by service.
  3. Prometheus scrape — Ensure your app exposes the metrics endpoint Prometheus scrapes (e.g. /metrics). The lifecycle crate only emits to the metrics facade; the app wires the recorder/exporter (see the sketch after this list).

Suggested panels:

  • What triggered shutdown? (table): sum by (trigger_component, trigger_reason) (increase(lifecycle_shutdown_initiated_total{service_name="$service_name"}[$__range]))
  • Component shutdown duration (heatmap): histogram_quantile(0.95, sum by (le, component, result) (rate(lifecycle_component_shutdown_duration_seconds_bucket{service_name="$service_name"}[5m])))
  • Shutdown result breakdown (stacked bar): sum by (component, result) (increase(lifecycle_component_shutdown_result_total{service_name="$service_name"}[$__range]))
  • Incomplete shutdowns (SIGKILL detection) (stat; alert if > 0): increase(lifecycle_shutdown_initiated_total{service_name="$service_name"}[1h]) - increase(lifecycle_shutdown_completed_total{service_name="$service_name"}[1h])
  • Component liveness (table): lifecycle_component_healthy{service_name="$service_name"} (one row per component)
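
A sketch of the recorder wiring using the metrics-exporter-prometheus crate (one exporter option among several; your app may already wire a different one):

rust
use metrics_exporter_prometheus::PrometheusBuilder;

// Install a global recorder for the metrics facade; lifecycle metrics
// flow through it once installed.
let prometheus = PrometheusBuilder::new()
    .install_recorder()
    .expect("failed to install Prometheus recorder");

// Serve prometheus.render() from the /metrics route your scrape job targets.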

Alert rules (segment by service_name)

  • Component timeout rate > 0 over 5m: increase(lifecycle_component_shutdown_result_total{service_name="$service_name", result="timeout"}[5m]) > 0
  • Incomplete shutdown count > 0 over 1h: increase(lifecycle_shutdown_initiated_total{service_name="$service_name"}[1h]) - increase(lifecycle_shutdown_completed_total{service_name="$service_name"}[1h]) > 0
  • Component unhealthy for > 2 consecutive scrapes: e.g. alert when lifecycle_component_healthy{service_name="$service_name"} is 0 for a given component for 2 scrape intervals.

Testing

For deterministic shutdown in tests, use with_shutdown_token on the builder. The test holds the token and calls token.cancel() to trigger shutdown:

rust
use tokio_util::sync::CancellationToken;

let shutdown_token = CancellationToken::new();
let mut manager = Manager::builder("test-service")
    .with_trap_signals(false)
    .with_prestop_check(false)
    .with_shutdown_token(shutdown_token.clone())
    .build();

// ... register components and spawn component tasks ...
let guard = manager.monitor_background();

shutdown_token.cancel(); // test triggers shutdown
guard.wait().await?;

SIGKILL detection

SIGKILL cannot be trapped. Detection is by absence:

  • Logs: "Lifecycle: shutdown initiated" present but "Lifecycle: shutdown complete" absent within the expected window → process was killed mid-shutdown. In Loki: {app="my-worker"} |= "Lifecycle: shutdown initiated" and absence of |= "Lifecycle: shutdown complete".
  • Metrics: increase(lifecycle_shutdown_initiated_total[1h]) - increase(lifecycle_shutdown_completed_total[1h]) > 0 means some shutdowns did not complete.
  • K8s: Pod exit code 137 is the authoritative SIGKILL signal (infrastructure-level, not emitted by this crate).