scientific-skills/simpy/references/process-interaction.md
This guide covers the mechanisms for processes to interact and synchronize in SimPy simulations.
SimPy provides three primary ways for processes to interact:
Processes can share events to coordinate their execution.
import simpy
def controller(env, signal_event):
print(f'Controller: Preparing at {env.now}')
yield env.timeout(5)
print(f'Controller: Sending signal at {env.now}')
signal_event.succeed()
def worker(env, signal_event):
print(f'Worker: Waiting for signal at {env.now}')
yield signal_event
print(f'Worker: Received signal, starting work at {env.now}')
yield env.timeout(3)
print(f'Worker: Work complete at {env.now}')
env = simpy.Environment()
signal = env.event()
env.process(controller(env, signal))
env.process(worker(env, signal))
env.run()
Use cases:
Multiple processes can wait for the same signal event.
import simpy
def broadcaster(env, signal):
yield env.timeout(5)
print(f'Broadcasting signal at {env.now}')
signal.succeed(value='Go!')
def listener(env, name, signal):
print(f'{name}: Waiting at {env.now}')
msg = yield signal
print(f'{name}: Received "{msg}" at {env.now}')
yield env.timeout(2)
print(f'{name}: Done at {env.now}')
env = simpy.Environment()
broadcast_signal = env.event()
env.process(broadcaster(env, broadcast_signal))
for i in range(3):
env.process(listener(env, f'Listener {i+1}', broadcast_signal))
env.run()
import simpy
class Barrier:
def __init__(self, env, n):
self.env = env
self.n = n
self.count = 0
self.event = env.event()
def wait(self):
self.count += 1
if self.count >= self.n:
self.event.succeed()
return self.event
def worker(env, barrier, name, work_time):
print(f'{name}: Working at {env.now}')
yield env.timeout(work_time)
print(f'{name}: Reached barrier at {env.now}')
yield barrier.wait()
print(f'{name}: Passed barrier at {env.now}')
env = simpy.Environment()
barrier = Barrier(env, 3)
env.process(worker(env, barrier, 'Worker A', 3))
env.process(worker(env, barrier, 'Worker B', 5))
env.process(worker(env, barrier, 'Worker C', 7))
env.run()
Processes are events themselves, so you can yield them to wait for completion.
import simpy
def task(env, name, duration):
print(f'{name}: Starting at {env.now}')
yield env.timeout(duration)
print(f'{name}: Completed at {env.now}')
return f'{name} result'
def sequential_coordinator(env):
# Execute tasks sequentially
result1 = yield env.process(task(env, 'Task 1', 5))
print(f'Coordinator: {result1}')
result2 = yield env.process(task(env, 'Task 2', 3))
print(f'Coordinator: {result2}')
result3 = yield env.process(task(env, 'Task 3', 4))
print(f'Coordinator: {result3}')
env = simpy.Environment()
env.process(sequential_coordinator(env))
env.run()
import simpy
def task(env, name, duration):
print(f'{name}: Starting at {env.now}')
yield env.timeout(duration)
print(f'{name}: Completed at {env.now}')
return f'{name} result'
def parallel_coordinator(env):
# Start all tasks
task1 = env.process(task(env, 'Task 1', 5))
task2 = env.process(task(env, 'Task 2', 3))
task3 = env.process(task(env, 'Task 3', 4))
# Wait for all to complete
results = yield task1 & task2 & task3
print(f'All tasks completed at {env.now}')
print(f'Task 1 result: {task1.value}')
print(f'Task 2 result: {task2.value}')
print(f'Task 3 result: {task3.value}')
env = simpy.Environment()
env.process(parallel_coordinator(env))
env.run()
import simpy
def server(env, name, processing_time):
print(f'{name}: Starting request at {env.now}')
yield env.timeout(processing_time)
print(f'{name}: Completed at {env.now}')
return name
def load_balancer(env):
# Send request to multiple servers
server1 = env.process(server(env, 'Server 1', 5))
server2 = env.process(server(env, 'Server 2', 3))
server3 = env.process(server(env, 'Server 3', 7))
# Wait for first to respond
result = yield server1 | server2 | server3
# Get the winner
winner = list(result.values())[0]
print(f'Load balancer: {winner} responded first at {env.now}')
env = simpy.Environment()
env.process(load_balancer(env))
env.run()
Processes can be interrupted using process.interrupt(), which throws an Interrupt exception.
import simpy
def worker(env):
try:
print(f'Worker: Starting long task at {env.now}')
yield env.timeout(10)
print(f'Worker: Task completed at {env.now}')
except simpy.Interrupt as interrupt:
print(f'Worker: Interrupted at {env.now}')
print(f'Interrupt cause: {interrupt.cause}')
def interrupter(env, target_process):
yield env.timeout(5)
print(f'Interrupter: Interrupting worker at {env.now}')
target_process.interrupt(cause='Higher priority task')
env = simpy.Environment()
worker_process = env.process(worker(env))
env.process(interrupter(env, worker_process))
env.run()
Process can re-yield the same event after interruption to continue waiting.
import simpy
def resumable_worker(env):
work_left = 10
while work_left > 0:
try:
print(f'Worker: Working ({work_left} units left) at {env.now}')
start = env.now
yield env.timeout(work_left)
work_left = 0
print(f'Worker: Completed at {env.now}')
except simpy.Interrupt:
work_left -= (env.now - start)
print(f'Worker: Interrupted! {work_left} units left at {env.now}')
def interrupter(env, worker_proc):
yield env.timeout(3)
worker_proc.interrupt()
yield env.timeout(2)
worker_proc.interrupt()
env = simpy.Environment()
worker_proc = env.process(resumable_worker(env))
env.process(interrupter(env, worker_proc))
env.run()
import simpy
def machine(env, name):
while True:
try:
print(f'{name}: Operating at {env.now}')
yield env.timeout(5)
except simpy.Interrupt as interrupt:
if interrupt.cause == 'maintenance':
print(f'{name}: Maintenance required at {env.now}')
yield env.timeout(2)
print(f'{name}: Maintenance complete at {env.now}')
elif interrupt.cause == 'emergency':
print(f'{name}: Emergency stop at {env.now}')
break
def maintenance_scheduler(env, machine_proc):
yield env.timeout(7)
machine_proc.interrupt(cause='maintenance')
yield env.timeout(10)
machine_proc.interrupt(cause='emergency')
env = simpy.Environment()
machine_proc = env.process(machine(env, 'Machine 1'))
env.process(maintenance_scheduler(env, machine_proc))
env.run()
import simpy
def user(env, name, resource, priority, duration):
with resource.request(priority=priority) as req:
try:
yield req
print(f'{name} (priority {priority}): Got resource at {env.now}')
yield env.timeout(duration)
print(f'{name}: Done at {env.now}')
except simpy.Interrupt:
print(f'{name}: Preempted at {env.now}')
env = simpy.Environment()
resource = simpy.PreemptiveResource(env, capacity=1)
env.process(user(env, 'Low priority user', resource, priority=10, duration=10))
env.process(user(env, 'High priority user', resource, priority=1, duration=5))
env.run()
import simpy
class Buffer:
def __init__(self, env, capacity):
self.env = env
self.capacity = capacity
self.items = []
self.item_available = env.event()
def put(self, item):
if len(self.items) < self.capacity:
self.items.append(item)
if not self.item_available.triggered:
self.item_available.succeed()
return True
return False
def get(self):
if self.items:
return self.items.pop(0)
return None
def producer(env, buffer):
item_id = 0
while True:
yield env.timeout(2)
item = f'Item {item_id}'
if buffer.put(item):
print(f'Producer: Added {item} at {env.now}')
item_id += 1
def consumer(env, buffer):
while True:
if buffer.items:
item = buffer.get()
print(f'Consumer: Retrieved {item} at {env.now}')
yield env.timeout(3)
else:
print(f'Consumer: Waiting for items at {env.now}')
yield buffer.item_available
buffer.item_available = env.event()
env = simpy.Environment()
buffer = Buffer(env, capacity=5)
env.process(producer(env, buffer))
env.process(consumer(env, buffer))
env.run(until=20)
import simpy
def sender(env, request_event, acknowledge_event):
for i in range(3):
print(f'Sender: Sending request {i} at {env.now}')
request_event.succeed(value=f'Request {i}')
yield acknowledge_event
print(f'Sender: Received acknowledgment at {env.now}')
# Reset events for next iteration
request_event = env.event()
acknowledge_event = env.event()
yield env.timeout(1)
def receiver(env, request_event, acknowledge_event):
for i in range(3):
request = yield request_event
print(f'Receiver: Got {request} at {env.now}')
yield env.timeout(2) # Process request
acknowledge_event.succeed()
print(f'Receiver: Sent acknowledgment at {env.now}')
# Reset for next iteration
request_event = env.event()
acknowledge_event = env.event()
env = simpy.Environment()
request = env.event()
ack = env.event()
env.process(sender(env, request, ack))
env.process(receiver(env, request, ack))
env.run()
Choose the right mechanism:
Exception handling: Always wrap interrupt-prone code in try-except blocks
Event lifecycle: Remember that events can only be triggered once; create new events for repeated signaling
Process references: Store process objects if you need to interrupt them later
Cause information: Use interrupt causes to communicate why interruption occurred
Resumable patterns: Track progress to enable resumption after interruption
Avoid deadlocks: Ensure at least one process can make progress at any time