docs/guide/state-and-deps.md
The TaskiqState is a global variable where you can keep the variables you want to use later.
For example, you want to open a database connection pool at a broker's startup.
This can be achieved by adding event handlers.
You can use one of these events:
WORKER_STARTUPCLIENT_STARTUPWORKER_SHUTDOWNCLIENT_SHUTDOWNWorker events are called when you start listening to the broker messages using taskiq.
Client events are called when you call the startup method of your broker from your code.
This is an example of code using event handlers:
::: tabs
@tab Annotated 3.10+
@tab default values
:::
::: tip Cool tip!
If you want to add handlers programmatically, you can use the broker.add_event_handler function.
:::
As you can see in this example, this worker will initialize the Redis pool at the startup. You can access the state from the context.
Using context directly is nice, but this way you won't get code-completion.
That's why we suggest you try TaskiqDependencies. The implementation is very similar to FastApi's dependencies. You can use classes, functions, and generators as dependencies.
We use the taskiq-dependencies package to provide autocompletion. You can easily integrate it in your own project.
You can use dependencies for better autocompletion and reduce the amount of code you write. Since the state is generic, we cannot guess the types of the state fields. Dependencies can be annotated with type hints and therefore provide better auto-completion.
Let's assume that you've stored a Redis connection pool in the state as in the example above.
@broker.on_event(TaskiqEvents.WORKER_STARTUP)
async def startup(state: TaskiqState) -> None:
# Here we store connection pool on startup for later use.
state.redis = ConnectionPool.from_url("redis://localhost/1")
You can access this variable by using the current execution context directly, like this:
::: tabs
@tab Annotated 3.10+
from typing import Annotated
@broker.task
async def my_task(context: Annotated[Context, TaskiqDepends()]) -> None:
async with Redis(connection_pool=context.state.redis, decode_responses=True) as redis:
await redis.set('key', 'value')
@tab default values
@broker.task
async def my_task(context: Context = TaskiqDepends()) -> None:
async with Redis(connection_pool=context.state.redis, decode_responses=True) as redis:
await redis.set('key', 'value')
:::
If you hit the TAB button after the context.state. expression, your IDE won't give you any auto-completion.
But we can create a dependency function to add auto-completion.
::: tabs
@tab Annotated 3.10+
from typing import Annotated
def redis_dep(context: Annotated[Context, TaskiqDepends()]) -> Redis:
return Redis(connection_pool=context.state.redis, decode_responses=True)
@broker.task
async def my_task(redis: Annotated[Redis, TaskiqDepends(redis_dep)]) -> None:
await redis.set('key', 'value')
@tab default values
def redis_dep(context: Context = TaskiqDepends()) -> Redis:
return Redis(connection_pool=context.state.redis, decode_responses=True)
@broker.task
async def my_task(redis: Redis = TaskiqDepends(redis_dep)) -> None:
await redis.set('key', 'value')
:::
Now, this dependency injection will be autocompleted. But, of course, state fields cannot be autocompleted, even in dependencies. But this way, you won't make any typos while writing tasks.
We build a graph of dependencies on startup. If the parameter of the function has
the default value of TaskiqDepends this parameter will be treated as a dependency.
Dependencies can also depend on something. Also dependencies are optimized to not evaluate things many times.
For example:
::: tabs
@tab Annotated 3.10+
@tab default values
:::
In this code, the dependency common_dep is going to be evaluated only once and the dep1 and the dep2 are going to receive the same value. You can control this behavior by using the use_cache=False parameter to you dependency. This parameter will force the
dependency to reevaluate all it's subdependencies.
In this example we cannot predict the result. Since the dep2 doesn't use cache for the common_dep function.
::: tabs
@tab Annotated 3.10+
@tab default values
:::
The graph for cached dependencies looks like this:
graph TD
A[common_dep]
B[dep1]
C[dep2]
D[my_task]
A --> B
A --> C
B --> D
C --> D
The dependencies graph for my_task where dep2 doesn't use cached value for common_dep looks like this:
graph TD
A[common_dep]
B[dep1]
D[my_task]
C[dep2]
subgraph without cache
A1[common_dep]
end
A --> B
A1 --> C
B --> D
C --> D
You can use classes as dependencies, and they can also use other dependencies too.
Let's see an example:
::: tabs
@tab Annotated 3.10+
@tab default values
:::
As you can see, the dependency for my_task function is declared with TaskiqDependency().
It's because you can omit the class if it's declared in type-hint for the parameter. This feature doesn't
work with dependency functions, it's only for classes.
You can pass dependencies for classes in the constructor.
Generator dependencies are used to perform startup before task execution and teardown after the task execution.
::: tabs
@tab Annotated 3.10+
@tab default values
:::
In this example, we can do something at startup before the execution and at shutdown after the task is completed.
If you want to do something asynchronously, convert this function to an asynchronous generator. Like this:
::: tabs
@tab Annotated 3.10+
@tab default values
:::
Generator dependencies can handle exceptions that happen in tasks. This feature is handy if you want your system to be more atomic.
For example, if you open a database transaction in your dependency and want to commit it only if the function you execute is completed successfully.
::: tabs
@tab Annotated 3.10+
from typing import Annotated
async def get_transaction(
db_driver: Annotated[DBDriver, TaskiqDepends(get_driver)],
) -> AsyncGenerator[Transaction, None]:
trans = db_driver.begin_transaction():
try:
# Here we give transaction to our dependant function.
yield trans
# If exception was found in dependant function,
# we rollback our transaction.
except Exception:
await trans.rollback()
return
# Here we commit if everything is fine.
await trans.commit()
@tab default values
async def get_transaction(
db_driver: DBDriver = TaskiqDepends(get_driver),
) -> AsyncGenerator[Transaction, None]:
trans = db_driver.begin_transaction():
try:
# Here we give transaction to our dependant function.
yield trans
# If exception was found in dependant function,
# we rollback our transaction.
except Exception:
await trans.rollback()
return
# Here we commit if everything is fine.
await trans.commit()
:::
If you don't want to propagate exceptions in dependencies, you can add --no-propagate-errors option to worker command.
taskiq worker my_file:broker --no-propagate-errors
In this case, no exception will ever going to be propagated to any dependency.
Taskiq supports generic dependencies. You can create a generic class that is generic over another class and takskiq will be able to resolve generics based on type annotations.
By default taskiq has only two dependencies:
taskiq.context.Contexttaskiq.state.TaskiqStateYou can expand default list of available dependencies for you application. Taskiq have an ability to add new first-level dependencies using brokers.
The AsyncBroker interface has a function called add_dependency_context and you can add
more default dependencies to the taskiq. This may be useful for libraries if you want to
add new dependencies to users.