ib-underscore-async-underscore-modules-ib-underscore-async-util.md
"""Utilities."""import asyncioimport datetime as dtimport loggingimport mathimport signalimport sysimport timefrom collections.abc importAsyncIterator,Awaitable,Callable,Iteratorfrom dataclasses importfields,is\_dataclassfrom typing import(Any,Final,TypeAlias,)from zoneinfo importZoneInfoimport eventkit as evglobalErrorEvent=ev.Event()"""Event to emit global exceptions."""EPOCH:Final=dt.datetime(1970,1,1,tzinfo=dt.timezone.utc)UNSET\_INTEGER:Final=2\*\*31-1UNSET\_DOUBLE:Final=sys.float\_info.maxTime\_t:TypeAlias=dt.time|dt.datetime
[[docs]](../../api.html#ib_async.ib.df)def df(objs, labels: list[str] | None = None):""" Create pandas DataFrame from the sequence of same-type objects. Args: labels: If supplied, retain only the given labels and drop the rest. """import pandas as pdfrom .objects import DynamicObjectif objs:objs = list(objs)obj = objs[0]if is\_dataclass(obj):df = pd.DataFrame.from\_records(dataclassAsTuple(o) for o in objs)df.columns = [field.name for field in fields(obj)]elif isinstance(obj, DynamicObject):df = pd.DataFrame.from\_records(o.\_\_dict\_\_ for o in objs)else:df = pd.DataFrame.from\_records(objs)if isinstance(obj, tuple):\_fields = getattr(obj, "\_fields", None)if \_fields:# assume it's a namedtupledf.columns = \_fieldselse:df = Noneif labels:exclude = [label for label in df if label not in labels]df = df.drop(exclude, axis=1)return df
[[docs]](../../api.html#ib_async.ib.dataclassAsDict)def dataclassAsDict(obj) -\> dict:""" Return dataclass values as ``dict``. This is a non-recursive variant of ``dataclasses.asdict``. """if not is\_dataclass(obj):raise TypeError(f"Object {obj} is not a dataclass")return {field.name: getattr(obj, field.name) for field in fields(obj)}
[[docs]](../../api.html#ib_async.ib.dataclassAsTuple)def dataclassAsTuple(obj) -\> tuple[Any, ...]:""" Return dataclass values as ``tuple``. This is a non-recursive variant of ``dataclasses.astuple``. """if not is\_dataclass(obj):raise TypeError(f"Object {obj} is not a dataclass")return tuple(getattr(obj, field.name) for field in fields(obj))
[[docs]](../../api.html#ib_async.ib.dataclassNonDefaults)def dataclassNonDefaults(obj) -\> dict[str, Any]:""" For a ``dataclass`` instance get the fields that are different from the default values and return as ``dict``. """if not is\_dataclass(obj):raise TypeError(f"Object {obj} is not a dataclass")values = [getattr(obj, field.name) for field in fields(obj)]return {field.name: valuefor field, value in zip(fields(obj), values)if value is not Noneand value != field.defaultand value == valueand not ((isinstance(value, list) and value == [])or (isinstance(value, dict) and value == {}))}
[[docs]](../../api.html#ib_async.ib.dataclassUpdate)def dataclassUpdate(obj, \*srcObjs, \*\*kwargs) -\> object:""" Update fields of the given ``dataclass`` object from zero or more ``dataclass`` source objects and/or from keyword arguments. """if not is\_dataclass(obj):raise TypeError(f"Object {obj} is not a dataclass")for srcObj in srcObjs:obj.\_\_dict\_\_.update(dataclassAsDict(srcObj))# type: ignoreobj.\_\_dict\_\_.update(\*\*kwargs)# type: ignorereturn obj
[[docs]](../../api.html#ib_async.ib.dataclassRepr)def dataclassRepr(obj) -\> str:""" Provide a culled representation of the given ``dataclass`` instance, showing only the fields with a non-default value. """attrs = dataclassNonDefaults(obj)clsName = obj.\_\_class\_\_.\_\_qualname\_\_kwargs = ", ".join(f"{k}={v!r}" for k, v in attrs.items())return f"{clsName}({kwargs})"
[[docs]](../../api.html#ib_async.ib.isnamedtupleinstance)def isnamedtupleinstance(x):"""From https://stackoverflow.com/a/2166841/6067848"""t = type(x)b = t.\_\_bases\_\_if len(b) != 1 or not isinstance(x, tuple):return Falsef = getattr(t, "\_fields", None)if not isinstance(f, tuple):return Falsereturn all(isinstance(n, str) for n in f)
[[docs]](../../api.html#ib_async.ib.tree)def tree(obj):""" Convert object to a tree of lists, dicts and simple values. The result can be serialized to JSON. """if isinstance(obj, bool | int | float | str | bytes):return objif isinstance(obj, dt.date | dt.time):return obj.isoformat()if isinstance(obj, dict):return {k: tree(v) for k, v in obj.items()}if isnamedtupleinstance(obj):return {f: tree(getattr(obj, f)) for f in obj.\_fields}if isinstance(obj, list | tuple | set):return [tree(i) for i in obj]if is\_dataclass(obj):return {obj.\_\_class\_\_.\_\_qualname\_\_: tree(dataclassNonDefaults(obj))}return str(obj)
[[docs]](../../api.html#ib_async.ib.barplot)def barplot(bars, title="", upColor="blue", downColor="red"):""" Create candlestick plot for the given bars. The bars can be given as a DataFrame or as a list of bar objects. """import matplotlib.pyplot as pltimport pandas as pdfrom matplotlib.lines import Line2Dfrom matplotlib.patches import Rectangleif isinstance(bars, pd.DataFrame):ohlcTups = [tuple(v) for v in bars[["open", "high", "low", "close"]].values]elif bars and hasattr(bars[0], "open\_"):ohlcTups = [(b.open\_, b.high, b.low, b.close) for b in bars]else:ohlcTups = [(b.open, b.high, b.low, b.close) for b in bars]fig, ax = plt.subplots()ax.set\_title(title)ax.grid(True)fig.set\_size\_inches(10, 6)for n, (open\_, high, low, close) in enumerate(ohlcTups):if close \>= open\_:color = upColorbodyHi, bodyLo = close, open\_else:color = downColorbodyHi, bodyLo = open\_, closeline = Line2D(xdata=(n, n), ydata=(low, bodyLo), color=color, linewidth=1)ax.add\_line(line)line = Line2D(xdata=(n, n), ydata=(high, bodyHi), color=color, linewidth=1)ax.add\_line(line)rect = Rectangle(xy=(n - 0.3, bodyLo),width=0.6,height=bodyHi - bodyLo,edgecolor=color,facecolor=color,alpha=0.4,antialiased=True,)ax.add\_patch(rect)ax.autoscale\_view()return fig
[[docs]](../../api.html#ib_async.ib.allowCtrlC)def allowCtrlC():"""Allow Control-C to end program."""signal.signal(signal.SIGINT, signal.SIG\_DFL)
[[docs]](../../api.html#ib_async.ib.logToFile)def logToFile(path, level=logging.INFO):"""Create a log handler that logs to the given file."""logger = logging.getLogger()if logger.handlers:logging.getLogger("ib\_async").setLevel(level)else:logger.setLevel(level)formatter = logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")handler = logging.FileHandler(path)handler.setFormatter(formatter)logger.addHandler(handler)
[[docs]](../../api.html#ib_async.ib.logToConsole)def logToConsole(level=logging.INFO):"""Create a log handler that logs to the console."""logger = logging.getLogger()stdHandlers = [hfor h in logger.handlersif type(h) is logging.StreamHandler and h.stream is sys.stderr]if stdHandlers:# if a standard stream handler already exists, use it and# set the log level for the ib\_async namespace onlylogging.getLogger("ib\_async").setLevel(level)else:# else create a new handlerlogger.setLevel(level)formatter = logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")handler = logging.StreamHandler()handler.setFormatter(formatter)logger.addHandler(handler)
[[docs]](../../api.html#ib_async.ib.isNan)def isNan(x: float) -\> bool:"""Not a number test."""return x != x
[[docs]](../../api.html#ib_async.ib.formatSI)def formatSI(n: float) -\> str:"""Format the integer or float n to 3 significant digits + SI prefix."""s = ""if n \< 0:n = -ns += "-"if isinstance(n, int) and n \< 1000:s = str(n) + " "elif n \< 1e-22:s = "0.00 "else:assert n \< 9.99e26log = int(math.floor(math.log10(n)))i, j = divmod(log, 3)for \_try in range(2):templ = f"%.{2 - j}f"val = templ % (n \* 10 \*\* (-3 \* i))if val != "1000":breaki += 1j = 0s += val + " "if i != 0:s += "yzafpnum kMGTPEZY"[i + 8]return s
[[docs]](../../api.html#ib_async.ib.timeit)class timeit:"""Context manager for timing."""def \_\_init\_\_(self, title="Run"):self.title = titledef \_\_enter\_\_(self):self.t0 = time.time()def \_\_exit\_\_(self, \*\_args):print(self.title + " took " + formatSI(time.time() - self.t0) + "s")
[[docs]](../../api.html#ib_async.ib.run)def run(\*awaitables: Awaitable, timeout: float | None = None):""" By default run the event loop forever. When awaitables (like Tasks, Futures or coroutines) are given then run the event loop until each has completed and return their results. An optional timeout (in seconds) can be given that will raise asyncio.TimeoutError if the awaitables are not ready within the timeout period. """loop = getLoop()if not awaitables:if loop.is\_running():returnloop.run\_forever()result = Noneall\_tasks = asyncio.all\_tasks(loop)# type: ignoreif all\_tasks:# cancel pending tasksf = asyncio.gather(\*all\_tasks)f.cancel()try:loop.run\_until\_complete(f)except asyncio.CancelledError:passelse:if len(awaitables) == 1:future = awaitables[0]else:future = asyncio.gather(\*awaitables)if timeout:future = asyncio.wait\_for(future, timeout)# Pass loop explicitly to avoid deprecation warnings in Python 3.10+task = asyncio.ensure\_future(future, loop=loop)def onError(\_):task.cancel()globalErrorEvent.connect(onError)try:result = loop.run\_until\_complete(task)except asyncio.CancelledError as e:raise globalErrorEvent.value() or efinally:globalErrorEvent.disconnect(onError)return result
def \_fillDate(time:Time\_t)-\>dt.datetime:# use today if date is absentifisinstance(time,dt.time):t=dt.datetime.combine(dt.date.today(),time)else:t=timereturnt
[[docs]](../../api.html#ib_async.ib.schedule)def schedule(time: Time\_t, callback: Callable, \*args):""" Schedule the callback to be run at the given time with the given arguments. This will return the Event Handle. Args: time: Time to run callback. If given as :py:class:`datetime.time` then use today as date. callback: Callable scheduled to run. args: Arguments for to call callback with. """t = \_fillDate(time)now = dt.datetime.now(t.tzinfo)delay = (t - now).total\_seconds()loop = getLoop()return loop.call\_later(delay, callback, \*args)
[[docs]](../../api.html#ib_async.ib.sleep)def sleep(secs: float = 0.02) -\> bool:""" Wait for the given amount of seconds while everything still keeps processing in the background. Never use time.sleep(). Args: secs (float): Time in seconds to wait. """run(asyncio.sleep(secs))return True
[[docs]](../../api.html#ib_async.ib.timeRange)def timeRange(start: Time\_t, end: Time\_t, step: float) -\> Iterator[dt.datetime]:""" Iterator that waits periodically until certain time points are reached while yielding those time points. Args: start: Start time, can be specified as datetime.datetime, or as datetime.time in which case today is used as the date end: End time, can be specified as datetime.datetime, or as datetime.time in which case today is used as the date step (float): The number of seconds of each period """assert step \> 0delta = dt.timedelta(seconds=step)t = \_fillDate(start)tz = dt.timezone.utc if t.tzinfo else Nonenow = dt.datetime.now(tz)while t \< now:t += deltawhile t \<= \_fillDate(end):waitUntil(t)yield tt += delta
[[docs]](../../api.html#ib_async.ib.waitUntil)def waitUntil(t: Time\_t) -\> bool:""" Wait until the given time t is reached. Args: t: The time t can be specified as datetime.datetime, or as datetime.time in which case today is used as the date. """now = dt.datetime.now(t.tzinfo)secs = (\_fillDate(t) - now).total\_seconds()run(asyncio.sleep(secs))return True
[[docs]](../../api.html#ib_async.ib.timeRangeAsync)async def timeRangeAsync(start: Time\_t, end: Time\_t, step: float) -\> AsyncIterator[dt.datetime]:"""Async version of :meth:`timeRange`."""assert step \> 0delta = dt.timedelta(seconds=step)t = \_fillDate(start)tz = dt.timezone.utc if t.tzinfo else Nonenow = dt.datetime.now(tz)while t \< now:t += deltawhile t \<= \_fillDate(end):await waitUntilAsync(t)yield tt += delta
[[docs]](../../api.html#ib_async.ib.waitUntilAsync)async def waitUntilAsync(t: Time\_t) -\> bool:"""Async version of :meth:`waitUntil`."""now = dt.datetime.now(t.tzinfo)secs = (\_fillDate(t) - now).total\_seconds()await asyncio.sleep(secs)return True
[[docs]](../../api.html#ib_async.ib.patchAsyncio)def patchAsyncio():"""Patch asyncio to allow nested event loops."""import nest\_asyncionest\_asyncio.apply()
[[docs]](../../api.html#ib_async.ib.getLoop)def getLoop():""" Get asyncio event loop with smart fallback handling. This function is designed for use in synchronous contexts or when the execution context is unknown. It will: 1. Try to get the currently running event loop (if in async context) 2. Fall back to getting the current thread's event loop via policy 3. Create a new event loop if none exists or if the existing one is closed For performance-critical async code paths, prefer using asyncio.get\_running\_loop() directly instead of this function. Note: This function does NOT cache the loop to avoid stale loop bugs when loops are closed and recreated (e.g., in testing, Jupyter notebooks). """try:# Fast path: we're in an async context (coroutine or callback)loop = asyncio.get\_running\_loop()return loopexcept RuntimeError:pass# We're in a sync context or no loop is running# Use the event loop policy to get the loop for this thread# This avoids deprecation warnings from get\_event\_loop() in Python 3.10+try:loop = asyncio.get\_event\_loop\_policy().get\_event\_loop()except RuntimeError:# No event loop exists for this thread, create oneloop = asyncio.new\_event\_loop()asyncio.set\_event\_loop(loop)return loop# Check if the loop we got is closed - if so, create a new oneif loop.is\_closed():loop = asyncio.new\_event\_loop()asyncio.set\_event\_loop(loop)return loop
[[docs]](../../api.html#ib_async.ib.startLoop)def startLoop():"""Use nested asyncio event loop for Jupyter notebooks."""patchAsyncio()
[[docs]](../../api.html#ib_async.ib.useQt)def useQt(qtLib: str = "PyQt5", period: float = 0.01):""" Run combined Qt5/asyncio event loop. Args: qtLib: Name of Qt library to use: \* PyQt5 \* PyQt6 \* PySide2 \* PySide6 period: Period in seconds to poll Qt. """def qt\_step():loop.call\_later(period, qt\_step)if not stack:qloop = qc.QEventLoop()timer = qc.QTimer()timer.timeout.connect(qloop.quit)stack.append((qloop, timer))qloop, timer = stack.pop()timer.start(0)qloop.exec() if qtLib == "PyQt6" else qloop.exec\_()timer.stop()stack.append((qloop, timer))qApp.processEvents()# type: ignoreif qtLib not in {"PyQt5", "PyQt6", "PySide2", "PySide6"}:raise RuntimeError(f"Unknown Qt library: {qtLib}")from importlib import import\_moduleqc = import\_module(qtLib + ".QtCore")qw = import\_module(qtLib + ".QtWidgets")global qAppqApp = (# type: ignoreqw.QApplication.instance() or qw.QApplication(sys.argv)# type: ignore)# type: ignoreloop = getLoop()stack: list = []qt\_step()
[[docs]](../../api.html#ib_async.ib.formatIBDatetime)def formatIBDatetime(t: dt.date | dt.datetime | str | None) -\> str:"""Format date or datetime to string that IB uses."""if not t:s = ""elif isinstance(t, dt.datetime):# convert to UTC timezonet = t.astimezone(tz=dt.timezone.utc)s = t.strftime("%Y%m%d %H:%M:%S UTC")elif isinstance(t, dt.date):t = dt.datetime(t.year, t.month, t.day, 23, 59, 59).astimezone(tz=dt.timezone.utc)s = t.strftime("%Y%m%d %H:%M:%S UTC")else:s = treturn s
[[docs]](../../api.html#ib_async.ib.parseIBDatetime)def parseIBDatetime(s: str) -\> dt.date | dt.datetime:"""Parse string in IB date or datetime format to datetime."""if len(s) == 8:# YYYYmmddy = int(s[0:4])m = int(s[4:6])d = int(s[6:8])t = dt.date(y, m, d)elif s.isdigit():t = dt.datetime.fromtimestamp(int(s), dt.timezone.utc)elif s.count(" ") \>= 2 and " " not in s:# 20221125 10:00:00 Europe/Amsterdams0, s1, s2 = s.split(" ", 2)t = dt.datetime.strptime(s0 + s1, "%Y%m%d%H:%M:%S")t = t.replace(tzinfo=ZoneInfo(s2))else:# YYYYmmdd HH:MM:SS# or# YYYY-mm-dd HH:MM:SS.0ss = s.replace(" ", "").replace("-", "")[:16]t = dt.datetime.strptime(ss, "%Y%m%d%H:%M:%S")return t