crates/coglet-python/README.md
PyO3 bindings that bridge the Rust coglet library to Python. This crate implements
the PredictHandler trait by wrapping Python predictor classes.
┌─────────────────────────────────────────────────────────────────────────────┐
│ coglet-python │
│ │
│ ┌──────────────────────────────────────────────────────────────────────┐ │
│ │ lib.rs │ │
│ │ Python module: serve(), active(), _run_worker(), _is_cancelable() │ │
│ └──────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌───────────────────────┼───────────────────────┐ │
│ ▼ ▼ ▼ │
│ ┌─────────────────────┐ ┌─────────────────────┐ ┌──────────────────┐ │
│ │ worker_bridge.rs │ │ predictor.rs │ │ log_writer.rs │ │
│ │ PredictHandler │ │ PythonPredictor │ │ SlotLogWriter │ │
│ │ impl for Python │ │ load/setup/predict │ │ ContextVar │ │
│ └─────────────────────┘ └─────────────────────┘ └──────────────────┘ │
│ │ │ │ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────────────┐ ┌─────────────────────┐ ┌──────────────────┐ │
│ │ input.rs │ │ output.rs │ │ audit.rs │ │
│ │ Pydantic/ADT │ │ JSON serialization │ │ TeeWriter │ │
│ │ input processing │ │ make_encodeable │ │ stream protect │ │
│ └─────────────────────┘ └─────────────────────┘ └──────────────────┘ │
│ │
│ ┌──────────────────────────────────────────────────────────────────────┐ │
│ │ cancel.rs │ │
│ │ SIGUSR1 handling, CancelableGuard, KeyboardInterrupt injection │ │
│ └──────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
coglet-python/
├── Cargo.toml
├── coglet.pyi # Type stubs for Python IDE support
└── src/
├── lib.rs # Python module definition, serve/active/_run_worker
├── predictor.rs # PythonPredictor: wraps Python Predictor class
├── worker_bridge.rs # PythonPredictHandler: implements PredictHandler
├── input.rs # Input processing (Pydantic validation, ADT)
├── output.rs # Output processing (make_encodeable, upload_files)
├── log_writer.rs # SlotLogWriter, ContextVar routing, SetupLogSender
├── audit.rs # Audit hook, TeeWriter for stream protection
└── cancel.rs # Cancellation: SIGUSR1, CancelableGuard
active() Flagimport coglet
if coglet.server.active:
# Running inside worker subprocess
# stdout/stderr are captured, print goes to slot routing
else:
# Running in parent or standalone
# Normal stdout/stderr behavior
Set to True at the start of _run_worker(). Used by user code and cog internals
to detect worker context.
Async predictors run on a single Python asyncio event loop, created at worker startup. All slots share this loop.
Worker Subprocess
┌─────────────────────────────────────────────────────────────┐
│ Tokio Runtime (Rust) │
│ └─ run_worker event loop │
│ └─ For each SlotRequest::Predict: │
│ └─ tokio::spawn prediction task │
│ └─ Python::attach (acquire GIL) │
│ └─ asyncio.run_coroutine_threadsafe() │
│ └─ Predictor.predict() coroutine │
└─────────────────────────────────────────────────────────────┘
asyncio event loop (Python)
┌─────────────────────────────────────────────────────────────┐
│ Single event loop, started once at worker init │
│ - concurrent.futures.Future per async prediction │
│ - ContextVar propagates prediction_id to spawned tasks │
│ - Cancellation via future.cancel() │
└─────────────────────────────────────────────────────────────┘
Why single loop?
run_coroutine_threadsafe to submit from Rust/Tokiomax_concurrency)Sync Predictors:
SlotRequest::Predict arrives
│
├─▶ Python::attach (acquire GIL)
├─▶ set_sync_prediction_id(id) # For log routing
├─▶ predictor.predict(input) # Blocking call
├─▶ set_sync_prediction_id(None)
└─▶ Return PredictResult
Async Predictors:
SlotRequest::Predict arrives
│
├─▶ Python::attach (acquire GIL)
├─▶ Create wrapped coroutine:
│ async def _ctx_wrapper(coro, prediction_id, contextvar):
│ contextvar.set(prediction_id) # Set in this task's context
│ return await coro
│
├─▶ asyncio.run_coroutine_threadsafe(wrapper, loop)
├─▶ py.detach() (release GIL)
├─▶ future.result() (block Rust task, Python runs)
└─▶ Return PredictResult
All output from user code must be captured and routed through the slot socket.
Architecture:
sys.stdout = SlotLogWriter(stdout)
sys.stderr = SlotLogWriter(stderr)
SlotLogWriter.write(data)
│
├─▶ Get current prediction_id from:
│ 1. SYNC_PREDICTION_ID static (for sync predictors)
│ 2. ContextVar (for async predictors/spawned tasks)
│
├─▶ Look up SlotSender in PREDICTION_REGISTRY
│
└─▶ Route:
Found sender → slot_sender.send_log(source, data)
No sender → Check setup sender (during setup)
Neither → Log as orphan to stderr
Line Buffering:
SlotLogWriter buffers writes until a newline. This coalesces Python's print()
which does separate writes for content and \n.
User code might replace sys.stdout:
sys.stdout = open("mylog.txt", "w")
We can't prevent this, but we can intercept it with a Python audit hook.
Strategy: TeeWriter
User replaces sys.stdout
│
├─▶ Audit hook fires on object.__setattr__(sys, "stdout", value)
│
├─▶ Check: is value already SlotLogWriter? → Allow (it's us)
│
├─▶ Check: is value already TeeWriter? → Allow (already wrapped)
│
├─▶ Create TeeWriter(inner=SlotLogWriter, user_stream=value)
│
└─▶ Schedule: sys.stdout = tee (via Timer to avoid recursion)
TeeWriter.write(data)
│
├─▶ inner.write(data) # Our SlotLogWriter (routing works)
└─▶ user_stream.write(data) # User's stream (their code works)
Result: Both our log routing AND the user's stream receive the data.
Sync Predictors:
Parent: ControlRequest::Cancel { slot }
│
├─▶ Worker: handler.cancel(slot)
│ └─▶ Set CANCEL_REQUESTED flag for slot
│
├─▶ Worker: send SIGUSR1 to self
│
└─▶ Signal handler: raise KeyboardInterrupt (if in cancelable region)
Prediction code:
with CancelableGuard(): # Sets CANCELABLE=true
predictor.predict() # Can be interrupted
# CANCELABLE=false on exit
Async Predictors:
Parent: ControlRequest::Cancel { slot }
│
└─▶ Worker: handler.cancel(slot)
│
├─▶ Get future from slot state
└─▶ future.cancel()
│
└─▶ Python raises asyncio.CancelledError
During setup (before any prediction), logs go through the control channel:
worker_bridge.setup()
│
├─▶ register_setup_sender(tx) # Control channel sender
│
├─▶ predictor.load() + predictor.setup()
│ │
│ └─▶ print("Loading model...")
│ │
│ └─▶ SlotLogWriter.write()
│ │
│ ├─▶ No prediction_id (not in prediction)
│ └─▶ get_setup_sender() → ControlResponse::Log
│
└─▶ unregister_setup_sender()
Worker Startup:
set_active() - Mark as worker subprocessinit_tracing() - Configure logging (stderr, COG_LOG_LEVEL env)install_slot_log_writers() - Replace sys.stdout/stderrinstall_audit_hook() - Protect streamsinstall_signal_handler() - SIGUSR1 for cancellationhandler.setup() - Load and initialize predictorShutdown:
Error Handling: