bindings/python/py-bindings-sync-aio.mdx
Turso - is the SQLite compatible database written in Rust. One of the important features of the Turso - is native ability to sync database with the Cloud in both directions (push local changes and pull remote changes).
Your task is to generate ASYNC wrapper for synchronization EXTRA functionality built on top of the existing Python driver which will extend regular embedded with sync capability. Do not modify existing driver - its already implemented in the lib.py and lib_sync.py. Your task is to write extra code which will use abstractions and provide async API support in the Python on top of it in the lib_sync_aio.py file.
General rules for driver implementation you MUST follow and never go against these rules:
# ALL imports MUST be at the beginning - no imports in the middle of function
from typing import ...
from queue import SimpleQueue
from .worker import Worker
from .lib_sync import (
ConnectionSync as BlockingConnectionSync
...
)
from .lib_aio import (
Connection as NonBlockingConnection
...
)
class ConnectionSync(NonBlockingConnection):
def __init__(connector: Callable[[], BlockingConnectionSync]): ...
# internal worker MUST be stopped when connection goes out of context scope
def close(...): ...
# make ConnectionSync instance awaitable
# emit proper typings because delegation of __await__ to the base class will change the return type and make extra methods (pull/push/etc) unavailable
def __await__(...): ...
async def __aenter__(...): ...
async def __aexit__(...):
await self.close()
# returns True of new updates were pulled; False if no new updates were fetched; determine changes by inspecting .empty() method of changes
async def pull(self) -> bool: ...
async def push(self) -> None: ...
async def checkpoint(self) -> None: ...
async def stats(self) -> None: ...
# connect is not async because it returns awaitable ConnectionSync
# same signature as in the lib_sync.py
def connect_sync(...) -> ConnectionSync: ...
# create future for completion
future = asyncio.get_event_loop().create_future()
# put the work to the unbounded queue
queue.put_nowait((future, function))
For turso sync execution you MUST reuse blocking implementation:
<File path="./turso/lib_sync.py" /> <File path="./turso/__init__.py" />You must integrate with non-blocking async db driver implementation:
<File path="./turso/lib_aio.py" /> </Code> </Output>