crates/coglet/README.md
Core Rust library for the coglet prediction server. Pure Rust with no Python
dependencies - the Python bindings live in coglet-python.
coglet
┌─────────────────────────────────────────────────────────────────┐
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ transport/http │ │
│ │ ┌──────────────┐ ┌─────────────────────────────────┐ │ │
│ │ │ server.rs │ │ routes.rs │ │ │
│ │ │ Axum setup │ │ /health, /predictions, /cancel │ │ │
│ │ └──────────────┘ └─────────────────────────────────┘ │ │
│ └───────────────────────────────┬─────────────────────────┘ │
│ │ │
│ ┌───────────────────────────────▼─────────────────────────┐ │
│ │ service.rs │ │
│ │ PredictionService: health, permits, state, webhooks │ │
│ └───────────────────────────────┬─────────────────────────┘ │
│ │ │
│ ┌────────────────────────┼────────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────┐ ┌────────────────────┐ ┌──────────┐ │
│ │ permit/ │ │ orchestrator.rs │ │webhook.rs│ │
│ │ PermitPool │ │ Parent-side: │ │ Sender │ │
│ │ Slot alloc │ │ spawn, route │ │ Retry │ │
│ └─────────────┘ └─────────┬──────────┘ └──────────┘ │
│ │ │
│ ┌────────────────────────────▼────────────────────────────┐ │
│ │ bridge/ │ │
│ │ ┌──────────────┐ ┌─────────────┐ ┌────────────────┐ │ │
│ │ │ protocol.rs │ │ codec.rs │ │ transport.rs │ │ │
│ │ │ Message types│ │ JSON lines │ │ Unix sockets │ │ │
│ │ └──────────────┘ └─────────────┘ └────────────────┘ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ worker.rs │ │
│ │ Child-side: PredictHandler trait, run_worker loop │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
coglet/
└── src/
├── lib.rs # Public API exports
│
│ # Core Types
├── health.rs # Health, SetupStatus, SetupResult
├── prediction.rs # Prediction state machine
├── predictor.rs # PredictionResult, PredictionError, PredictionOutput
├── version.rs # VersionInfo
│
│ # Service Layer
├── service.rs # PredictionService - lifecycle, state, webhooks
├── webhook.rs # WebhookSender, webhook types
│
│ # Orchestrator (Parent Process)
├── orchestrator.rs # spawn_worker, OrchestratorHandle, event loop
│
│ # Worker (Child Process)
├── worker.rs # run_worker, PredictHandler trait, SetupError
│
│ # Concurrency Control
├── permit/
│ ├── mod.rs
│ ├── pool.rs # PermitPool - slot permit management
│ └── slot.rs # PredictionSlot - permit + prediction binding
│
│ # IPC Bridge
├── bridge/
│ ├── mod.rs
│ ├── protocol.rs # ControlRequest, ControlResponse, SlotRequest, SlotResponse
│ ├── codec.rs # JsonCodec - newline-delimited JSON
│ └── transport.rs # Unix socket transport, ChildTransportInfo
│
│ # HTTP Transport
└── transport/
├── mod.rs
└── http/
├── mod.rs
├── server.rs # ServerConfig, serve()
└── routes.rs # Route handlers, request/response types
service.rs: Single owner of prediction state. Manages health, permits, prediction state, and webhooks.
let service = PredictionService::new_no_pool()
.with_health(Health::Starting)
.with_version(version);
// Later, after worker is ready:
service.set_orchestrator(pool, handle).await;
service.set_health(Health::Ready).await;
orchestrator.rs: Parent-side worker lifecycle management.
spawn_worker(config)
│
├─▶ Create Unix socket transport (N slots)
├─▶ Spawn: python -c "import coglet; coglet.server._run_worker()"
├─▶ Send Init message via stdin
├─▶ Wait for worker to connect sockets
├─▶ Wait for Ready message (with timeout)
├─▶ Populate PermitPool with slot writers
├─▶ Spawn event loop task
└─▶ Return OrchestratorReady {pool, schema, handle}
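The "Wait for Ready message (with timeout)" step can be sketched with a standard-library channel standing in for the control channel. This is a minimal illustration, not the crate's implementation: `StartupMessage`, `StartupError`, and `wait_for_ready` are hypothetical names, and the real orchestrator reads framed messages from the worker process rather than from an in-process channel.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::{Duration, Instant};

/// Hypothetical subset of the messages a worker may send while starting up.
#[derive(Debug, PartialEq)]
enum StartupMessage {
    Log(String),
    Ready { slots: usize },
    Failed(String),
}

/// Hypothetical error type for the startup wait.
#[derive(Debug, PartialEq)]
enum StartupError {
    Timeout,
    WorkerExited,
    SetupFailed(String),
}

/// Wait until the worker reports Ready, collecting setup logs along the way.
/// The deadline covers the whole wait, not each individual message.
fn wait_for_ready(
    rx: &Receiver<StartupMessage>,
    timeout: Duration,
    logs: &mut Vec<String>,
) -> Result<usize, StartupError> {
    let deadline = Instant::now() + timeout;
    loop {
        // Time left until the overall deadline (zero once it has passed).
        let remaining = deadline.saturating_duration_since(Instant::now());
        match rx.recv_timeout(remaining) {
            Ok(StartupMessage::Log(line)) => logs.push(line),
            Ok(StartupMessage::Ready { slots }) => return Ok(slots),
            Ok(StartupMessage::Failed(reason)) => {
                return Err(StartupError::SetupFailed(reason))
            }
            Err(RecvTimeoutError::Timeout) => return Err(StartupError::Timeout),
            // All senders dropped: the worker went away before becoming ready.
            Err(RecvTimeoutError::Disconnected) => return Err(StartupError::WorkerExited),
        }
    }
}
```

Computing a single deadline up front keeps a chatty worker from extending the wait indefinitely by emitting log lines.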
Event loop handles:
- ControlResponse::Idle - Slot ready for next prediction
- ControlResponse::Failed - Slot poisoned, mark unavailable
- SlotResponse::Log/Output/Done/Failed - Route to prediction

worker.rs: Child-side event loop. Implements PredictHandler trait.
run_worker(handler, config)
│
├─▶ Connect to slot sockets (from env)
├─▶ Setup control channel (stdin/stdout)
├─▶ Run handler.setup() with log routing
├─▶ Send Ready {slots, schema}
├─▶ Enter event loop:
│ - ControlRequest::Cancel → handler.cancel(slot)
│ - ControlRequest::Shutdown → exit
│ - SlotRequest::Predict → spawn prediction task
└─▶ Exit on shutdown or all slots poisoned
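The worker flow above can be approximated with a small synchronous loop. This is a sketch under simplifying assumptions: the trait below is a stand-in for PredictHandler whose method signatures are guesses, `Request` merges the control and slot channels into one hypothetical enum, and predictions run inline instead of being spawned as tasks.

```rust
use std::sync::mpsc::Receiver;

type SlotId = usize;

/// Hypothetical merged request type; the real protocol uses separate
/// control and slot channels.
enum Request {
    Predict { slot: SlotId, input: String },
    Cancel { slot: SlotId },
    Shutdown,
}

/// Simplified, synchronous stand-in for the PredictHandler trait.
trait PredictHandler {
    fn setup(&mut self) -> Result<(), String>;
    fn predict(&mut self, slot: SlotId, input: &str) -> String;
    fn cancel(&mut self, slot: SlotId);
}

/// Run setup, then dispatch requests until Shutdown arrives or the channel
/// closes. Outputs are returned in order so the sketch is easy to check.
fn run_worker_sketch<H: PredictHandler>(
    handler: &mut H,
    requests: Receiver<Request>,
) -> Result<Vec<String>, String> {
    handler.setup()?;
    let mut outputs = Vec::new();
    for request in requests {
        match request {
            Request::Predict { slot, input } => outputs.push(handler.predict(slot, &input)),
            Request::Cancel { slot } => handler.cancel(slot),
            Request::Shutdown => break,
        }
    }
    Ok(outputs)
}

/// Toy handler: upper-cases its input and records which slots were cancelled.
struct Echo {
    cancelled: Vec<SlotId>,
}

impl PredictHandler for Echo {
    fn setup(&mut self) -> Result<(), String> {
        Ok(())
    }
    fn predict(&mut self, _slot: SlotId, input: &str) -> String {
        input.to_uppercase()
    }
    fn cancel(&mut self, slot: SlotId) {
        self.cancelled.push(slot);
    }
}
```

Requests queued after `Shutdown` are never dispatched, which mirrors the "Shutdown → exit" branch of the flow.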
permit/pool.rs: Slot-based concurrency control.
let pool = PermitPool::new(max_concurrency);
// Add slot with its socket writer
pool.add_permit(slot_id, writer);
// Acquire permit (returns None if at capacity)
let permit = pool.try_acquire()?;
// Send prediction request
permit.send(SlotRequest::Predict { id, input }).await?;
// Return permit when done
drop(permit);
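The acquire-and-drop pattern shown above can be illustrated with a small RAII sketch. `PoolSketch` and `PermitSketch` are hypothetical stand-ins, not the crate's types: they track only slot ids, omit the socket writer, and return a slot to the pool as soon as the permit is dropped, whereas the real pool may wait for the worker to report the slot idle.

```rust
use std::sync::{Arc, Mutex};

type SlotId = usize;

/// Shared list of idle slot ids (hypothetical stand-in for PermitPool).
#[derive(Clone, Default)]
struct PoolSketch {
    idle: Arc<Mutex<Vec<SlotId>>>,
}

/// Holding a permit means holding exclusive use of one slot.
struct PermitSketch {
    slot: SlotId,
    pool: PoolSketch,
}

impl PoolSketch {
    /// Register a slot as available.
    fn add_permit(&self, slot: SlotId) {
        self.idle.lock().unwrap().push(slot);
    }

    /// Non-blocking acquire: returns None when every slot is in use.
    fn try_acquire(&self) -> Option<PermitSketch> {
        let slot = self.idle.lock().unwrap().pop()?;
        Some(PermitSketch { slot, pool: self.clone() })
    }

    /// Number of slots currently idle.
    fn available(&self) -> usize {
        self.idle.lock().unwrap().len()
    }
}

impl Drop for PermitSketch {
    /// Dropping the permit hands the slot back to the pool.
    fn drop(&mut self) {
        self.pool.idle.lock().unwrap().push(self.slot);
    }
}
```

Tying the release to `Drop` means a slot is returned on every exit path, including early returns and panics that unwind.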
bridge/protocol.rs: Message types for parent-worker communication.
Control Channel:
- ControlRequest: Init, Cancel, Shutdown
- ControlResponse: Ready, Log, Idle, Failed, Cancelled, ShuttingDown

Slot Channel:
- SlotRequest: Predict
- SlotResponse: Log, Output, Done, Failed, Cancelled

All messages are JSON with {"type": "..."} discriminator.
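To make the wire format concrete, here is a dependency-free sketch of newline-delimited JSON with a "type" discriminator. The lowercase tag values ("cancel", "shutdown") and the `slot` field name on the wire are assumptions, and `ControlRequestSketch`, `encode_line`, and `split_frames` are hypothetical helpers; the sketch covers only variants without free-form string payloads, so no JSON escaping is needed.

```rust
/// Hypothetical mirror of the ControlRequest variants that carry no text payload.
enum ControlRequestSketch {
    Cancel { slot: usize },
    Shutdown,
}

/// Encode one message as a single JSON line terminated by '\n'.
/// The tag spellings are assumed, not taken from the protocol definition.
fn encode_line(msg: &ControlRequestSketch) -> String {
    match msg {
        ControlRequestSketch::Cancel { slot } => {
            format!("{{\"type\":\"cancel\",\"slot\":{}}}\n", slot)
        }
        ControlRequestSketch::Shutdown => "{\"type\":\"shutdown\"}\n".to_string(),
    }
}

/// Split a receive buffer into complete frames plus the trailing partial line,
/// which the caller keeps until more bytes arrive.
fn split_frames(buffer: &str) -> (Vec<&str>, &str) {
    match buffer.rfind('\n') {
        Some(end) => (buffer[..end].split('\n').collect(), &buffer[end + 1..]),
        None => (Vec::new(), buffer),
    }
}
```

Newline framing keeps the reader simple: a message is complete exactly when its terminating newline has arrived, so a partial read never has to be parsed.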
Unknown ──▶ Starting ──┬──▶ Ready ◀──▶ Busy
│
└──▶ SetupFailed ──▶ Defunct
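The health diagram above can be read as a transition table. The sketch below encodes exactly the arrows drawn there; `HealthState` and `can_transition` are hypothetical names, and the crate's own Health type may carry extra data or allow transitions the diagram does not show.

```rust
/// Hypothetical mirror of the health states in the diagram.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum HealthState {
    Unknown,
    Starting,
    Ready,
    Busy,
    SetupFailed,
    Defunct,
}

/// Returns true only for the arrows drawn in the diagram.
fn can_transition(from: HealthState, to: HealthState) -> bool {
    use HealthState::*;
    matches!(
        (from, to),
        (Unknown, Starting)
            | (Starting, Ready)
            | (Starting, SetupFailed)
            | (Ready, Busy)
            | (Busy, Ready)
            | (SetupFailed, Defunct)
    )
}
```

For example, `can_transition(HealthState::Ready, HealthState::Busy)` holds in both directions, while nothing leads out of `Defunct`.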
Starting ──▶ Processing ──┬──▶ Succeeded
├──▶ Failed
└──▶ Canceled
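The prediction lifecycle above is a linear state machine with three terminal states. A minimal sketch, assuming a plain enum with no attached data (`StatusSketch` and `advance` are hypothetical names, not the crate's API):

```rust
/// Hypothetical mirror of the prediction states in the diagram.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StatusSketch {
    Starting,
    Processing,
    Succeeded,
    Failed,
    Canceled,
}

impl StatusSketch {
    /// Terminal states have no outgoing arrows.
    fn is_terminal(self) -> bool {
        matches!(
            self,
            StatusSketch::Succeeded | StatusSketch::Failed | StatusSketch::Canceled
        )
    }

    /// Move to `next` if the diagram allows it, otherwise report the bad transition.
    fn advance(self, next: StatusSketch) -> Result<StatusSketch, String> {
        let allowed = match self {
            StatusSketch::Starting => next == StatusSketch::Processing,
            StatusSketch::Processing => next.is_terminal(),
            _ => false,
        };
        if allowed {
            Ok(next)
        } else {
            Err(format!("invalid transition {:?} -> {:?}", self, next))
        }
    }
}
```

Rejecting transitions out of a terminal state guards against a late message, such as a cancel arriving after completion, overwriting a finished prediction.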
- ControlRequest::Cancel { slot }
- handler.cancel(slot)
- future.cancel() on the asyncio task
- SlotResponse::Cancelled

Graceful (SIGTERM with await_explicit_shutdown):
- ControlRequest::Shutdown
- ShuttingDown, exits

Immediate (SIGTERM without flag):
- ControlRequest::Shutdown

Worker crash:
If a slot socket hits an error (for example, a write fails), the slot is marked poisoned and receives no new predictions. If all slots are poisoned, the worker exits.
enum SlotOutcome {
Idle(SlotId), // Ready for next prediction
Poisoned { slot, error }, // Slot is dead
}
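One way to act on these outcomes is to track poisoned slots and stop once none remain usable. The sketch below is illustrative only: the field types on `SlotOutcome` (`usize` ids, `String` errors) are assumptions, and `SlotTracker` is a hypothetical helper rather than part of the crate.

```rust
use std::collections::HashSet;

type SlotId = usize;

/// Same shape as the enum above, with assumed field types.
enum SlotOutcome {
    Idle(SlotId),
    Poisoned { slot: SlotId, error: String },
}

/// Hypothetical bookkeeping for which slots can still take predictions.
struct SlotTracker {
    total: usize,
    poisoned: HashSet<SlotId>,
    idle: Vec<SlotId>,
}

impl SlotTracker {
    fn new(total: usize) -> Self {
        SlotTracker { total, poisoned: HashSet::new(), idle: Vec::new() }
    }

    /// Record an outcome. Returns true while at least one slot is still
    /// usable, and false once every slot is poisoned (the worker should exit).
    fn record(&mut self, outcome: SlotOutcome) -> bool {
        match outcome {
            SlotOutcome::Idle(slot) => {
                // A poisoned slot never becomes idle again.
                if !self.poisoned.contains(&slot) {
                    self.idle.push(slot);
                }
            }
            SlotOutcome::Poisoned { slot, error } => {
                eprintln!("slot {} poisoned: {}", slot, error);
                self.poisoned.insert(slot);
                self.idle.retain(|s| *s != slot);
            }
        }
        self.poisoned.len() < self.total
    }
}
```

Using a set for poisoned slots makes a repeated `Poisoned` report for the same slot harmless, so the exit condition only triggers when every distinct slot has failed.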