ib-underscore-async-underscore-modules-ib-underscore-async-wrapper.md
"""Wrapper to handle incoming messages."""

import asyncio
import logging
import time
from collections import defaultdict
from contextlib import suppress
from dataclasses import dataclass, field
from datetime import datetime
from typing import TYPE_CHECKING, Any, Final, TypeAlias, cast

from ib_async.contract import (
    Contract,
    ContractDescription,
    ContractDetails,
    DeltaNeutralContract,
    ScanData,
)
from ib_async.objects import (
    AccountValue,
    BarData,
    BarDataList,
    CommissionReport,
    DepthMktDataDescription,
    Dividends,
    DOMLevel,
    EfpData,
    Execution,
    FamilyCode,
    Fill,
    FundamentalRatios,
    HistogramData,
    HistoricalNews,
    HistoricalSchedule,
    HistoricalSession,
    HistoricalTick,
    HistoricalTickBidAsk,
    HistoricalTickLast,
    IBDefaults,
    MktDepthData,
    NewsArticle,
    NewsBulletin,
    NewsProvider,
    NewsTick,
    OptionChain,
    OptionComputation,
    PnL,
    PnLSingle,
    PortfolioItem,
    Position,
    PriceIncrement,
    RealTimeBar,
    RealTimeBarList,
    SoftDollarTier,
    TickAttribBidAsk,
    TickAttribLast,
    TickByTickAllLast,
    TickByTickBidAsk,
    TickByTickMidPoint,
    TickData,
    TradeLogEntry,
)
from ib_async.order import Order, OrderState, OrderStatus, Trade
from ib_async.ticker import Ticker
from ib_async.util import (
    UNSET_DOUBLE,
    UNSET_INTEGER,
    dataclassAsDict,
    dataclassUpdate,
    getLoop,
    globalErrorEvent,
    parseIBDatetime,
)

# Imported for type checking only; the Wrapper annotates its back-reference
# as the string "IB".
if TYPE_CHECKING:
    from ib_async.ib import IB

# Key type of Wrapper.trades: either a bare int or an (int, int) pair.
OrderKeyType: TypeAlias = int | tuple[int, int]

# All tick maps below share this shape: integer tick type id -> name of the
# Ticker attribute that the matching handler assigns with setattr().
TickDict: TypeAlias = dict[int, str]

# Fallback lookup used by Wrapper.priceSizeTick for tick types that are not
# handled explicitly there (bid/ask/last are handled before this map is read).
PRICE_TICK_MAP: Final[TickDict] = {
    6: "high",
    72: "high",
    7: "low",
    73: "low",
    9: "close",
    75: "close",
    14: "open",
    76: "open",
    15: "low13week",
    16: "high13week",
    17: "low26week",
    18: "high26week",
    19: "low52week",
    20: "high52week",
    35: "auctionPrice",
    37: "markPrice",
    50: "bidYield",
    103: "bidYield",
    51: "askYield",
    104: "askYield",
    52: "lastYield",
    57: "lastRthTrade",
    78: "creditmanMarkPrice",
    79: "creditmanSlowMarkPrice",
    92: "etfNavClose",
    93: "etfNavPriorClose",
    94: "etfNavBid",
    95: "etfNavAsk",
    96: "etfNavLast",
    97: "etfFrozenNavLast",
    98: "etfNavHigh",
    99: "etfNavLow",
    101: "estimatedIpoMidpoint",
    102: "finalIpoLast",
}

# Fallback lookup used by Wrapper.tickSize for tick types other than the
# bid/ask/last sizes it handles explicitly.
SIZE_TICK_MAP: Final[TickDict] = {
    8: "volume",
    74: "volume",
    63: "volumeRate3Min",
    64: "volumeRate5Min",
    65: "volumeRate10Min",
    21: "avVolume",
    22: "openInterest",
    27: "callOpenInterest",
    28: "putOpenInterest",
    29: "callVolume",
    30: "putVolume",
    34: "auctionVolume",
    36: "auctionImbalance",
    61: "regulatoryImbalance",
    86: "futuresOpenInterest",
    87: "avOptionVolume",
    89: "shortableShares",
}

# Lookup used by Wrapper.tickGeneric (which asserts the tick type is present).
GENERIC_TICK_MAP: Final[TickDict] = {
    23: "histVolatility",
    24: "impliedVolatility",
    31: "indexFuturePremium",
    46: "shortable",
    49: "halted",
    54: "tradeCount",
    55: "tradeRate",
    56: "volumeRate",
    58: "rtHistVolatility",
    60: "bondFactorMultiplier",
    90: "delayedHalted",
}

# Lookup used by Wrapper.tickOptionComputation to store an OptionComputation.
GREEKS_TICK_MAP: Final[TickDict] = {
    10: "bidGreeks",
    80: "bidGreeks",
    11: "askGreeks",
    81: "askGreeks",
    12: "lastGreeks",
    82: "lastGreeks",
    13: "modelGreeks",
    83: "modelGreeks",
    53: "custGreeks",
}

# Lookup used by Wrapper.tickEFP to store an EfpData object.
EFP_TICK_MAP: Final[TickDict] = {
    38: "bidEfp",
    39: "askEfp",
    40: "lastEfp",
    41: "openEfp",
    42: "highEfp",
    43: "lowEfp",
    44: "closeEfp",
}

# Tick types whose string value Wrapper.tickString stores unchanged.
STRING_TICK_MAP: Final[TickDict] = {
    25: "optionBidExch",
    26: "optionAskExch",
    32: "bidExchange",
    33: "askExchange",
    84: "lastExchange",
    85: "lastRegTime",
    91: "reutersMutualFunds",
    100: "socialMarketAnalytics",
}

# Tick types whose string value Wrapper.tickString converts to a datetime.
TIMESTAMP_TICK_MAP: Final[TickDict] = {
    45: "lastTimestamp",
    88: "delayedLastTimestamp",
}

# Tick types parsed by Wrapper._processRtVolumeTick; the mapped attribute
# receives the "total volume" field of the semicolon-separated value.
RT_VOLUME_TICK_MAP: Final[TickDict] = {
    48: "rtVolume",
    77: "rtTradeVolume",
}
class RequestError(Exception):
    """
    Error raised when the API reports a failure that belongs to one
    specific request.
    """

    def __init__(self, reqId: int, code: int, message: str):
        """
        Args:
            reqId: Original request ID.
            code: Original error code.
            message: Original error message.
        """
        # Keep the raw pieces available to callers alongside the formatted text.
        self.reqId, self.code, self.message = reqId, code, message
        description = f"[reqId {reqId}] API error: {code}: {message}"
        super().__init__(description)
@dataclassclass Wrapper:"""Wrapper implementation for use with the IB class."""# reference back to IB so wrapper can access API methodsib:"IB"accountValues:dict[tuple,AccountValue]=field(init=False)""" (account, tag, currency, modelCode) -\> AccountValue """acctSummary:dict[tuple,AccountValue]=field(init=False)""" (account, tag, currency) -\> AccountValue """portfolio:dict[str,dict[int,PortfolioItem]]=field(init=False)""" account -\> conId -\> PortfolioItem """positions:dict[str,dict[int,Position]]=field(init=False)""" account -\> conId -\> Position """trades:dict[OrderKeyType,Trade]=field(init=False)""" (client, orderId) or permId -\> Trade """permId2Trade:dict[int,Trade]=field(init=False)""" permId -\> Trade """fills:dict[str,Fill]=field(init=False)""" execId -\> Fill """newsTicks:list[NewsTick]=field(init=False)msgId2NewsBulletin:dict[int,NewsBulletin]=field(init=False)""" msgId -\> NewsBulletin """tickers:dict[int,Ticker]=field(init=False)""" hash(Contract) -\> Ticker """pendingTickers:set[Ticker]=field(init=False)reqId2Ticker:dict[int,Ticker]=field(init=False)""" reqId -\> Ticker """ticker2ReqId:dict[int|str,dict[Ticker,int]]=field(init=False)""" tickType -\> Ticker -\> reqId """reqId2Subscriber:dict[int,Any]=field(init=False)""" live bars or live scan data """reqId2PnL:dict[int,PnL]=field(init=False)""" reqId -\> PnL """reqId2PnlSingle:dict[int,PnLSingle]=field(init=False)""" reqId -\> PnLSingle """pnlKey2ReqId:dict[tuple,int]=field(init=False)""" (account, modelCode) -\> reqId """pnlSingleKey2ReqId:dict[tuple,int]=field(init=False)""" (account, modelCode, conId) -\> reqId """lastTime:datetime=field(init=False)""" UTC time of last network packet arrival. 
"""# Like 'lastTime' but in time.time() float format instead of a datetime object# (not to be confused with 'lastTimestamp' of Ticker objects which is the timestamp# of the last trade event)time:float=field(init=False)accounts:list[str]=field(init=False)clientId:int=field(init=False)wshMetaReqId:int=field(init=False)wshEventReqId:int=field(init=False)\_reqId2Contract:dict[int,Contract]=field(init=False)\_timeout:float=field(init=False)\_futures:dict[Any,asyncio.Future]=field(init=False)""" \_futures and \_results are linked by key. """\_results:dict[Any,Any]=field(init=False)""" \_futures and \_results are linked by key. """\_logger:logging.Logger=field(default\_factory=lambda:logging.getLogger("ib\_async.wrapper"))\_timeoutHandle:asyncio.TimerHandle|None=None# value used when a field has missing, empty, or not populated datadefaults:IBDefaults=field(default\_factory=IBDefaults)def \_\_post\_init\_\_(self):# extract values from defaults objects just to use locallyself.defaultTimezone=self.defaults.timezoneself.defaultEmptyPrice=self.defaults.emptyPriceself.defaultEmptySize=self.defaults.emptySizeself.reset()def reset(self):self.accountValues={}self.acctSummary={}self.portfolio=defaultdict(dict)self.positions=defaultdict(dict)self.trades={}self.permId2Trade={}self.fills={}self.newsTicks=[]self.msgId2NewsBulletin={}self.tickers={}self.pendingTickers=set()self.reqId2Ticker={}self.ticker2ReqId=defaultdict(dict)self.reqId2Subscriber={}self.reqId2PnL={}self.reqId2PnlSingle={}self.pnlKey2ReqId={}self.pnlSingleKey2ReqId={}self.lastTime=datetime.minself.time=-1self.accounts=[]self.clientId=-1self.wshMetaReqId=0self.wshEventReqId=0self.\_reqId2Contract={}self.\_timeout=0self.\_futures={}self.\_results={}self.setTimeout(0)def setEventsDone(self):"""Set all subscription-type events as 
done."""events=[ticker.updateEventfortickerinself.tickers.values()]events+=[sub.updateEventforsubinself.reqId2Subscriber.values()]fortradeinself.trades.values():events+=[trade.statusEvent,trade.modifyEvent,trade.fillEvent,trade.filledEvent,trade.commissionReportEvent,trade.cancelEvent,trade.cancelledEvent,]foreventinevents:event.set\_done()def connectionClosed(self):error=ConnectionError("Socket disconnect")forfutureinself.\_futures.values():ifnotfuture.done():future.set\_exception(error)globalErrorEvent.emit(error)self.reset()def startReq(self,key,contract=None,container=None):""" Start a new request and return the future that is associated with the key and container. The container is a list by default. """future:asyncio.Future=asyncio.Future()self.\_futures[key]=futureself.\_results[key]=containerifcontainerisnotNoneelse[]ifcontract:self.\_reqId2Contract[key]=contractreturnfuturedef \_endReq(self,key,result=None,success=True):""" Finish the future of corresponding key with the given result. If no result is given then it will be popped of the general results. """future=self.\_futures.pop(key,None)self.\_reqId2Contract.pop(key,None)iffuture:ifresultisNone:result=self.\_results.pop(key,[])ifnotfuture.done():ifsuccess:future.set\_result(result)else:future.set\_exception(result)def startTicker(self,reqId:int,contract:Contract,tickType:int|str):""" Start a tick request that has the reqId associated with the contract. Return the ticker. 
"""ticker=self.tickers.get(hash(contract))ifnotticker:ticker=Ticker(contract=contract,defaults=self.defaults)self.tickers[hash(contract)]=tickerself.reqId2Ticker[reqId]=tickerself.\_reqId2Contract[reqId]=contractself.ticker2ReqId[tickType][ticker]=reqIdreturntickerdef endTicker(self,ticker:Ticker,tickType:int|str):reqId=self.ticker2ReqId[tickType].pop(ticker,0)self.\_reqId2Contract.pop(reqId,None)returnreqIddef startSubscription(self,reqId,subscriber,contract=None):"""Register a live subscription."""self.\_reqId2Contract[reqId]=contractself.reqId2Subscriber[reqId]=subscriberdef endSubscription(self,subscriber):"""Unregister a live subscription."""self.\_reqId2Contract.pop(subscriber.reqId,None)self.reqId2Subscriber.pop(subscriber.reqId,None)def orderKey(self,clientId:int,orderId:int,permId:int)-\>OrderKeyType:key:OrderKeyTypeiforderId\<=0:# order is placed manually from TWSkey=permIdelse:key=(clientId,orderId)returnkeydef setTimeout(self,timeout:float):self.lastTime=datetime.now(self.defaultTimezone)ifself.\_timeoutHandle:self.\_timeoutHandle.cancel()self.\_timeoutHandle=Noneself.\_timeout=timeoutiftimeout:self.\_setTimer(timeout)def \_setTimer(self,delay:float=0):ifself.lastTime==datetime.min:returnnow=datetime.now(self.defaultTimezone)diff=(now-self.lastTime).total\_seconds()ifnotdelay:delay=self.\_timeout-diffifdelay\>0:loop=getLoop()self.\_timeoutHandle=loop.call\_later(delay,self.\_setTimer)else:self.\_logger.debug("Timeout")self.setTimeout(0)self.ib.timeoutEvent.emit(diff)# Helper methods for tick processingdef \_processTimestampTick(self,ticker:Ticker,fieldName:str,value:str):"""Convert timestamp string to datetime and set on ticker field."""timestamp=int(value)# Only populate if timestamp isn't '0' (we don't want to report "last trade: 20,000 days ago")iftimestamp:setattr(ticker,fieldName,datetime.fromtimestamp(timestamp,self.defaultTimezone),)def \_processRtVolumeTick(self,ticker:Ticker,tickType:int,value:str)-\>tuple[float,float]|None:""" Parse RT Volume 
or RT Trade Volume tick. Format: price;size;ms since epoch;total volume;VWAP;single trade Example: 701.28;1;1348075471534;67854;701.46918464;true Returns (price, size) tuple if valid, None otherwise. """priceStr,sizeStr,rtTime,volume,vwap,\_=value.split(";")ifvolume:# Set volume field based on tick typevolumeField=RT\_VOLUME\_TICK\_MAP[tickType]setattr(ticker,volumeField,float(volume))ifvwap:ticker.vwap=float(vwap)ifrtTime:ticker.rtTime=datetime.fromtimestamp(int(rtTime)/1000,self.defaultTimezone)ifpriceStr=="":returnNonereturn(float(priceStr),float(sizeStr))# wrapper methodsdef connectAck(self):passdef nextValidId(self,reqId:int):passdef managedAccounts(self,accountsList:str):self.accounts=[aforainaccountsList.split(",")ifa]def updateAccountTime(self,timestamp:str):passdef updateAccountValue(self,tag:str,val:str,currency:str,account:str):key=(account,tag,currency,"")acctVal=AccountValue(account,tag,val,currency,"")self.accountValues[key]=acctValself.ib.accountValueEvent.emit(acctVal)def accountDownloadEnd(self,\_account:str):# sent after updateAccountValue and updatePortfolio both finishedself.\_endReq("accountValues")def accountUpdateMulti(self,reqId:int,account:str,modelCode:str,tag:str,val:str,currency:str,):key=(account,tag,currency,modelCode)acctVal=AccountValue(account,tag,val,currency,modelCode)self.accountValues[key]=acctValself.ib.accountValueEvent.emit(acctVal)def accountUpdateMultiEnd(self,reqId:int):self.\_endReq(reqId)def accountSummary(self,\_reqId:int,account:str,tag:str,value:str,currency:str):key=(account,tag,currency)acctVal=AccountValue(account,tag,value,currency,"")self.acctSummary[key]=acctValself.ib.accountSummaryEvent.emit(acctVal)def accountSummaryEnd(self,reqId:int):self.\_endReq(reqId)def 
updatePortfolio(self,contract:Contract,posSize:float,marketPrice:float,marketValue:float,averageCost:float,unrealizedPNL:float,realizedPNL:float,account:str,):contract=Contract.recreate(contract)portfItem=PortfolioItem(contract,posSize,marketPrice,marketValue,averageCost,unrealizedPNL,realizedPNL,account,)portfolioItems=self.portfolio[account]ifposSize==0:portfolioItems.pop(contract.conId,None)else:portfolioItems[contract.conId]=portfItemself.\_logger.info(f"updatePortfolio: {portfItem}")self.ib.updatePortfolioEvent.emit(portfItem)def position(self,account:str,contract:Contract,posSize:float,avgCost:float):contract=Contract.recreate(contract)position=Position(account,contract,posSize,avgCost)positions=self.positions[account]# if this updates position to 0 quantity, remove the positionifposSize==0:positions.pop(contract.conId,None)else:# else, add or replace the position in-placepositions[contract.conId]=positionself.\_logger.info(f"position: {position}")results=self.\_results.get("positions")ifresultsisnotNone:results.append(position)self.ib.positionEvent.emit(position)def positionEnd(self):self.\_endReq("positions")def positionMulti(self,reqId:int,account:str,modelCode:str,contract:Contract,pos:float,avgCost:float,):passdef positionMultiEnd(self,reqId:int):passdef pnl(self,reqId:int,dailyPnL:float,unrealizedPnL:float,realizedPnL:float):pnl=self.reqId2PnL.get(reqId)ifnotpnl:returnpnl.dailyPnL=dailyPnLpnl.unrealizedPnL=unrealizedPnLpnl.realizedPnL=realizedPnLself.ib.pnlEvent.emit(pnl)def pnlSingle(self,reqId:int,pos:int,dailyPnL:float,unrealizedPnL:float,realizedPnL:float,value:float,):pnlSingle=self.reqId2PnlSingle.get(reqId)ifnotpnlSingle:returnpnlSingle.position=pospnlSingle.dailyPnL=dailyPnLpnlSingle.unrealizedPnL=unrealizedPnLpnlSingle.realizedPnL=realizedPnLpnlSingle.value=valueself.ib.pnlSingleEvent.emit(pnlSingle)def openOrder(self,orderId:int,contract:Contract,order:Order,orderState:OrderState):""" This wrapper is called to: \* feed in open orders at 
startup; \* feed in open orders or order updates from other clients and TWS if clientId=master id; \* feed in manual orders and order updates from TWS if clientId=0; \* handle openOrders and allOpenOrders responses. """iforder.whatIf:# response to whatIfOrderiffloat(orderState.initMarginChange)!=UNSET\_DOUBLE:self.\_endReq(order.orderId,orderState)else:key=self.orderKey(order.clientId,order.orderId,order.permId)trade=self.trades.get(key)iftrade:trade.order.permId=order.permIdtrade.order.totalQuantity=order.totalQuantitytrade.order.lmtPrice=order.lmtPricetrade.order.auxPrice=order.auxPricetrade.order.orderType=order.orderTypetrade.order.orderRef=order.orderRefelse:# ignore '?' values in the orderorder=Order(\*\*{k:vfork,vindataclassAsDict(order).items()ifv!="?"})contract=Contract.recreate(contract)orderStatus=OrderStatus(orderId=orderId,status=orderState.status)trade=Trade(contract,order,orderStatus,[],[])self.trades[key]=tradeself.\_logger.info(f"openOrder: {trade}")self.permId2Trade.setdefault(order.permId,trade)results=self.\_results.get("openOrders")ifresultsisNone:self.ib.openOrderEvent.emit(trade)else:# response to reqOpenOrders or reqAllOpenOrdersresults.append(trade)# make sure that the client issues order ids larger than any# order id encountered (even from other clients) to avoid# "Duplicate order id" errorself.ib.client.updateReqId(orderId+1)def openOrderEnd(self):self.\_endReq("openOrders")def completedOrder(self,contract:Contract,order:Order,orderState:OrderState):contract=Contract.recreate(contract)orderStatus=OrderStatus(orderId=order.orderId,status=orderState.status)trade=Trade(contract,order,orderStatus,[],[])self.\_results["completedOrders"].append(trade)iforder.permIdnotinself.permId2Trade:self.trades[order.permId]=tradeself.permId2Trade[order.permId]=tradedef completedOrdersEnd(self):self.\_endReq("completedOrders")def 
orderStatus(self,orderId:int,status:str,filled:float,remaining:float,avgFillPrice:float,permId:int,parentId:int,lastFillPrice:float,clientId:int,whyHeld:str,mktCapPrice:float=0.0,):key=self.orderKey(clientId,orderId,permId)trade=self.trades.get(key)iftrade:msg:str|NoneoldStatus=trade.orderStatus.statusnew=dict(status=status,filled=filled,remaining=remaining,avgFillPrice=avgFillPrice,permId=permId,parentId=parentId,lastFillPrice=lastFillPrice,clientId=clientId,whyHeld=whyHeld,mktCapPrice=mktCapPrice,)curr=dataclassAsDict(trade.orderStatus)isChanged=curr!={\*\*curr,\*\*new}ifisChanged:dataclassUpdate(trade.orderStatus,\*\*new)msg=""elif(status=="Submitted"andtrade.logandtrade.log[-1].message=="Modify"):# order modifications are acknowledgedmsg="Modified"else:msg=NoneifmsgisnotNone:logEntry=TradeLogEntry(self.lastTime,status,msg)trade.log.append(logEntry)self.\_logger.info(f"orderStatus: {trade}")self.ib.orderStatusEvent.emit(trade)trade.statusEvent.emit(trade)ifstatus!=oldStatus:ifstatus==OrderStatus.Filled:trade.filledEvent.emit(trade)elifstatus==OrderStatus.Cancelled:trade.cancelledEvent.emit(trade)else:self.\_logger.error("orderStatus: No order found for orderId %s and clientId %s",orderId,clientId,)def execDetails(self,reqId:int,contract:Contract,execution:Execution):""" This wrapper handles both live fills and responses to reqExecutions. """self.\_logger.info(f"execDetails {execution}")ifexecution.orderId==UNSET\_INTEGER:# bug in TWS: executions of manual orders have unset valueexecution.orderId=0trade=self.permId2Trade.get(execution.permId)ifnottrade:key=self.orderKey(execution.clientId,execution.orderId,execution.permId)trade=self.trades.get(key)# TODO: debug why spread contracts aren't fully detailed here. 
They have no legs in execDetails, but they do in orderStatus?iftradeandcontract==trade.contract:contract=trade.contractelse:contract=Contract.recreate(contract)execId=execution.execIdisLive=reqIdnotinself.\_futurestime=self.lastTimeifisLiveelseexecution.timefill=Fill(contract,execution,CommissionReport(),time)ifexecIdnotinself.fills:# first time we see this execution so add itself.fills[execId]=filliftrade:trade.fills.append(fill)logEntry=TradeLogEntry(time,trade.orderStatus.status,f"Fill {execution.shares}@{execution.price}",)trade.log.append(logEntry)ifisLive:self.\_logger.info(f"execDetails: {fill}")self.ib.execDetailsEvent.emit(trade,fill)trade.fillEvent(trade,fill)ifnotisLive:self.\_results[reqId].append(fill)def execDetailsEnd(self,reqId:int):self.\_endReq(reqId)def commissionReport(self,commissionReport:CommissionReport):ifcommissionReport.yield\_==UNSET\_DOUBLE:commissionReport.yield\_=0.0ifcommissionReport.realizedPNL==UNSET\_DOUBLE:commissionReport.realizedPNL=0.0fill=self.fills.get(commissionReport.execId)iffill:report=dataclassUpdate(fill.commissionReport,commissionReport)self.\_logger.info(f"commissionReport: {report}")trade=self.permId2Trade.get(fill.execution.permId)iftrade:self.ib.commissionReportEvent.emit(trade,fill,report)trade.commissionReportEvent.emit(trade,fill,report)else:# this is not a live execution and the order was filled# before this connection startedpasselse:# commission report is not for this clientpassdef orderBound(self,reqId:int,apiClientId:int,apiOrderId:int):passdef contractDetails(self,reqId:int,contractDetails:ContractDetails):self.\_results[reqId].append(contractDetails)bondContractDetails=contractDetailsdef contractDetailsEnd(self,reqId:int):self.\_endReq(reqId)def symbolSamples(self,reqId:int,contractDescriptions:list[ContractDescription]):self.\_endReq(reqId,contractDescriptions)def marketRule(self,marketRuleId:int,priceIncrements:list[PriceIncrement]):self.\_endReq(f"marketRule-{marketRuleId}",priceIncrements)def 
marketDataType(self,reqId:int,marketDataId:int):ticker=self.reqId2Ticker.get(reqId)ifticker:ticker.marketDataType=marketDataIddef realtimeBar(self,reqId:int,time:int,open\_:float,high:float,low:float,close:float,volume:float,wap:float,count:int,):dt=datetime.fromtimestamp(time,self.defaultTimezone)bar=RealTimeBar(dt,-1,open\_,high,low,close,volume,wap,count)bars=self.reqId2Subscriber.get(reqId)ifbarsisnotNone:bars.append(bar)self.ib.barUpdateEvent.emit(bars,True)bars.updateEvent.emit(bars,True)def historicalData(self,reqId:int,bar:BarData):results=self.\_results.get(reqId)ifresultsisnotNone:bar.date=parseIBDatetime(bar.date)# type: ignoreresults.append(bar)def historicalDataEnd(self,reqId,\_start:str,\_end:str):self.\_endReq(reqId)def historicalDataUpdate(self,reqId:int,bar:BarData):bars=self.reqId2Subscriber.get(reqId)ifnotbars:returnbar.date=parseIBDatetime(bar.date)# type: ignorelastDate=bars[-1].dateifbar.date\<lastDate:returnhasNewBar=len(bars)==0orbar.date\>lastDateifhasNewBar:bars.append(bar)elifbars[-1]!=bar:bars[-1]=barelse:returnself.ib.barUpdateEvent.emit(bars,hasNewBar)bars.updateEvent.emit(bars,hasNewBar)def headTimestamp(self,reqId:int,headTimestamp:str):try:dt=parseIBDatetime(headTimestamp)self.\_endReq(reqId,dt)exceptValueErrorasexc:self.\_endReq(reqId,exc,False)def historicalTicks(self,reqId:int,ticks:list[HistoricalTick],done:bool):result=self.\_results.get(reqId)ifresultisnotNone:result+=ticksifdone:self.\_endReq(reqId)def historicalTicksBidAsk(self,reqId:int,ticks:list[HistoricalTickBidAsk],done:bool):result=self.\_results.get(reqId)ifresultisnotNone:result+=ticksifdone:self.\_endReq(reqId)def historicalTicksLast(self,reqId:int,ticks:list[HistoricalTickLast],done:bool):result=self.\_results.get(reqId)ifresultisnotNone:result+=ticksifdone:self.\_endReq(reqId)# additional wrapper method provided by Clientdef 
priceSizeTick(self,reqId:int,tickType:int,price:float,size:float):ticker=self.reqId2Ticker.get(reqId)ifnotticker:self.\_logger.error(f"priceSizeTick: Unknown reqId: {reqId}")return# self.\_logger.error(f"WHAT R U DOING: {tickType=} {price=} {size=}")# Allow overwriting IBKR's default "empty price" of -1 when there is no qty/size on a side.# https://interactivebrokers.github.io/tws-api/tick\_types.htmliftickTypein{1,66}:# Note: Keep these size==0 overrides INSIDE each tickType where it is needed because# other tickTypes like open/high/low/close are values with size=0 but those# are still valid prices to receive.# Bid/Ask updates always have a Price+Size delivered at the same time, while the# other properties are mainly price-only delivery methods.ifsize==0:price=self.defaultEmptyPricesize=self.defaultEmptySizeticker.prevBid=ticker.bidticker.prevBidSize=ticker.bidSizeticker.bid=priceticker.bidSize=sizeeliftickTypein{2,67}:ifsize==0:price=self.defaultEmptyPricesize=self.defaultEmptySizeticker.prevAsk=ticker.askticker.prevAskSize=ticker.askSizeticker.ask=priceticker.askSize=sizeeliftickTypein{4,68}:# for 'last' values, price can be valid with size=0 for updates like 'last SPX price' since SPX doesn't trade# Workaround: for TICK-NYSE, it is valid to have price=-1, size=0 because it can float between -10,000 and 10,000# and it also never reports a size. As a workaround, check if ticker.close exists as a proxy for "not TICK-NYSE"# because TICK-NYSE never has open/close values populated.ifprice==-1andsize==0andticker.close\>0:price=self.defaultEmptyPricesize=self.defaultEmptySize# BUG? IBKR is sometimes sending a GOOD VALUE followed by a PREVIOUS value all under tickType=4?# e.g. I get the SPX close delivered first with size=0, then I get another data point with size=1 priced one point lower,# but since the older price is delivered second, it replaces the "last" value with a wrong value? 
Not sure if it's# an IBKR data problem or a logic problem somewhere here?# More research: IBKR also shows the bad value in their own app, so there is a data bug in their own server logic somewhere.# self.\_logger.error(f"[{tickType=}] updating last price size: {price=} {size=} :: BEFORE {ticker=}")# self.\_logger.error(f"[{tickType=}] SETTING {ticker.prevLast=} = {ticker.last=}; {ticker.prevLastSize=} = {ticker.lastSize=}")ticker.prevLast=ticker.lastticker.prevLastSize=ticker.lastSizeticker.last=priceticker.lastSize=size# self.\_logger.error(f"[{tickType=}] SET {ticker.prevLast=} = {ticker.last=}; {ticker.prevLastSize=} = {ticker.lastSize=}")# self.\_logger.error(f"[{tickType=}] updating last price size: {price=} {size=} :: AFTER {ticker=}")else:asserttickTypeinPRICE\_TICK\_MAP,(f"Received tick {tickType=} {price=} but we don't have an attribute mapping for it? Triggered from {ticker.contract=}")setattr(ticker,PRICE\_TICK\_MAP[tickType],price)ifpriceorsize:tick=TickData(self.lastTime,tickType,price,size)ticker.ticks.append(tick)self.pendingTickers.add(ticker)def tickSize(self,reqId:int,tickType:int,size:float):ticker=self.reqId2Ticker.get(reqId)ifnotticker:self.\_logger.error(f"tickSize: Unknown reqId: {reqId}")returnprice=self.defaultEmptyPrice# self.\_logger.error(# f"tickSize with tickType {tickType}: " f"processing value: {size!r}"# )# 
https://interactivebrokers.github.io/tws-api/tick\_types.htmliftickTypein{0,69}:ifsize==ticker.bidSize:returnticker.prevBidSize=ticker.bidSizeifsize==0:ticker.bid=self.defaultEmptyPriceticker.bidSize=self.defaultEmptySizeelse:price=ticker.bidticker.bidSize=sizeeliftickTypein{3,70}:ifsize==ticker.askSize:returnticker.prevAskSize=ticker.askSizeifsize==0:ticker.ask=self.defaultEmptyPriceticker.askSize=self.defaultEmptySizeelse:price=ticker.askticker.askSize=sizeeliftickTypein{5,71}:price=ticker.lastifticker.isUnset(price):returnifsize!=ticker.lastSize:ticker.prevLastSize=ticker.lastSizeticker.lastSize=sizeelse:asserttickTypeinSIZE\_TICK\_MAP,(f"Received tick {tickType=} {size=} but we don't have an attribute mapping for it? Triggered from {ticker.contract=}")setattr(ticker,SIZE\_TICK\_MAP[tickType],size)ifpriceorsize:tick=TickData(self.lastTime,tickType,price,size)ticker.ticks.append(tick)self.pendingTickers.add(ticker)def tickSnapshotEnd(self,reqId:int):self.\_endReq(reqId)def tickByTickAllLast(self,reqId:int,tickType:int,time:int,price:float,size:float,tickAttribLast:TickAttribLast,exchange,specialConditions,):ticker=self.reqId2Ticker.get(reqId)ifnotticker:self.\_logger.error(f"tickByTickAllLast: Unknown reqId: {reqId}")returnifprice==-1andsize==0:price=self.defaultEmptyPricesize=self.defaultEmptySizeticker.prevLast=ticker.lastticker.prevLastSize=ticker.lastSizeticker.last=priceticker.lastSize=sizetick=TickByTickAllLast(tickType,self.lastTime,price,size,tickAttribLast,exchange,specialConditions,)ticker.tickByTicks.append(tick)self.pendingTickers.add(ticker)def tickByTickBidAsk(self,reqId:int,time:int,bidPrice:float,askPrice:float,bidSize:float,askSize:float,tickAttribBidAsk:TickAttribBidAsk,):ticker=self.reqId2Ticker.get(reqId)ifnotticker:self.\_logger.error(f"tickByTickBidAsk: Unknown reqId: 
{reqId}")returnifbidPrice!=ticker.bid:ticker.prevBid=ticker.bidticker.bid=bidPriceifbidPrice\>0elseself.defaultEmptyPriceifbidSize!=ticker.bidSize:ticker.prevBidSize=ticker.bidSizeticker.bidSize=bidSizeifbidSize\>0elseself.defaultEmptySizeifaskPrice!=ticker.ask:ticker.prevAsk=ticker.askticker.ask=askPriceifaskPrice\>0elseself.defaultEmptyPriceifaskSize!=ticker.askSize:ticker.prevAskSize=ticker.askSizeticker.askSize=askSizeifaskSize\>0elseself.defaultEmptySizetick=TickByTickBidAsk(self.lastTime,bidPrice,askPrice,bidSize,askSize,tickAttribBidAsk)ticker.tickByTicks.append(tick)self.pendingTickers.add(ticker)def tickByTickMidPoint(self,reqId:int,time:int,midPoint:float):ticker=self.reqId2Ticker.get(reqId)ifnotticker:self.\_logger.error(f"tickByTickMidPoint: Unknown reqId: {reqId}")returntick=TickByTickMidPoint(self.lastTime,midPoint)ticker.tickByTicks.append(tick)self.pendingTickers.add(ticker)def tickString(self,reqId:int,tickType:int,value:str):ifnot(ticker:=self.reqId2Ticker.get(reqId)):returntry:# Simple string assignments (O(1) dict lookup)iftickTypeinSTRING\_TICK\_MAP:setattr(ticker,STRING\_TICK\_MAP[tickType],value)eliftickTypeinTIMESTAMP\_TICK\_MAP:# Timestamp conversion (O(1) dict lookup)self.\_processTimestampTick(ticker,TIMESTAMP\_TICK\_MAP[tickType],value)eliftickType==47:# https://web.archive.org/web/20200725010343/https://interactivebrokers.github.io/tws-api/fundamental\_ratios\_tags.htmld=dict(t.split("=")fortinvalue.split(";")ift# type: ignore)# type: ignorefork,vind.items():withsuppress(ValueError):ifv=="-99999.99":v="nan"d[k]=float(v)# type: ignored[k]=int(v)# type: ignoreticker.fundamentalRatios=FundamentalRatios(\*\*d)eliftickTypeinRT\_VOLUME\_TICK\_MAP:# RT Volume or RT Trade Volume (O(1) dict lookup + 
helper)result=self.\_processRtVolumeTick(ticker,tickType,value)ifresult:price,size=resultticker.prevLast=ticker.lastticker.prevLastSize=ticker.lastSizeticker.last=priceticker.lastSize=sizetick=TickData(self.lastTime,tickType,price,size)ticker.ticks.append(tick)eliftickType==59:# Dividend tick:# https://interactivebrokers.github.io/tws-api/tick\_types.html#ib\_dividends# example value: '0.83,0.92,20130219,0.23'past12,next12,nextDate,nextAmount=value.split(",")ticker.dividends=Dividends(float(past12)ifpast12elseNone,float(next12)ifnext12elseNone,parseIBDatetime(nextDate)ifnextDateelseNone,float(nextAmount)ifnextAmountelseNone,)else:self.\_logger.error(f"tickString with tickType {tickType}: unhandled value: {value!r}")self.pendingTickers.add(ticker)exceptValueError:self.\_logger.error(f"tickString with tickType {tickType}: malformed value: {value!r}")def tickGeneric(self,reqId:int,tickType:int,value:float):ticker=self.reqId2Ticker.get(reqId)ifnotticker:returntry:value=float(value)value=valueifvalue\>0elseself.defaultEmptySizeexceptValueError:self.\_logger.error(f"[tickType {tickType}] genericTick: malformed value: {value!r}")returnasserttickTypeinGENERIC\_TICK\_MAP,(f"Received tick {tickType=} {value=} but we don't have an attribute mapping for it? 
Triggered from {ticker.contract=}")setattr(ticker,GENERIC\_TICK\_MAP[tickType],value)tick=TickData(self.lastTime,tickType,value,0)ticker.ticks.append(tick)self.pendingTickers.add(ticker)def tickReqParams(self,reqId:int,minTick:float,bboExchange:str,snapshotPermissions:int):ticker=self.reqId2Ticker.get(reqId)ifnotticker:returnticker.minTick=minTickticker.bboExchange=bboExchangeticker.snapshotPermissions=snapshotPermissionsdef smartComponents(self,reqId,components):self.\_endReq(reqId,components)def mktDepthExchanges(self,depthMktDataDescriptions:list[DepthMktDataDescription]):self.\_endReq("mktDepthExchanges",depthMktDataDescriptions)def updateMktDepth(self,reqId:int,position:int,operation:int,side:int,price:float,size:float,):self.updateMktDepthL2(reqId,position,"",operation,side,price,size)def updateMktDepthL2(self,reqId:int,position:int,marketMaker:str,operation:int,side:int,price:float,size:float,isSmartDepth:bool=False,):# operation: 0 = insert, 1 = update, 2 = delete# side: 0 = ask, 1 = bidticker=self.reqId2Ticker[reqId]# 'dom' is a dict so we can address position updates directlydom=ticker.domBidsDictifsideelseticker.domAsksDict# if you're curious when these operations run and what they do, enable this too:# fmt: off# print("BID" if side else "ASK", "OPERATION", operation, "at position", position, "for price", price, "at qty", size)# assert list(dom.keys()) == list(range(0, len(dom))), f"Keys aren't sequential? 
{dom} :: {ticker}"# fmt: onifoperationin{0,1}:# '0' is INSERT NEW# '1' is UPDATE EXISTING# We are using the same operation for "insert or overwrite" directly.dom[position]=DOMLevel(price,size,marketMaker)elifoperation==2:# '2' is DELETE EXISTINGsize=0try:level=dom.pop(position)price=level.priceexceptExceptionas\_:# invalid position requested for removal, so ignore the requestpass# To retain the original API structure, we convert all sorted dict# values into lists for users to consume.# Users can also read ticker.domBidsDict or ticker.domAsksDict directly.values=list(dom.values())ifside:# Update BID for usersticker.domBids=valueselse:# Update ASK for usersticker.domAsks=values# TODO: add optional debugging check. In a correctly working system, we should# technically always have sequential bid and ask position entries, but# in the past we have seen gaps or missing values.tick=MktDepthData(self.lastTime,position,marketMaker,operation,side,price,size)ticker.domTicks.append(tick)self.pendingTickers.add(ticker)def tickOptionComputation(self,reqId:int,tickType:int,tickAttrib:int,impliedVol:float,delta:float,optPrice:float,pvDividend:float,gamma:float,vega:float,theta:float,undPrice:float,):comp=OptionComputation(tickAttrib,impliedVolifimpliedVol!=-1elseNone,deltaifdelta!=-2elseNone,optPriceifoptPrice!=-1elseNone,pvDividendifpvDividend!=-1elseNone,gammaifgamma!=-2elseNone,vegaifvega!=-2elsevega,thetaiftheta!=-2elsetheta,undPriceifundPrice!=-1elseNone,)ticker=self.reqId2Ticker.get(reqId)ifticker:# reply from reqMktData# https://interactivebrokers.github.io/tws-api/tick\_types.htmlasserttickTypeinGREEKS\_TICK\_MAP,(f"Received tick {tickType=} {tickAttrib=} but we don't have an attribute mapping for it? 
Triggered from {ticker.contract=}")setattr(ticker,GREEKS\_TICK\_MAP[tickType],comp)self.pendingTickers.add(ticker)elifreqIdinself.\_futures:# reply from calculateImpliedVolatility or calculateOptionPriceself.\_endReq(reqId,comp)else:self.\_logger.error(f"tickOptionComputation: Unknown reqId: {reqId}")def deltaNeutralValidation(self,reqId:int,dnc:DeltaNeutralContract):passdef fundamentalData(self,reqId:int,data:str):self.\_endReq(reqId,data)def scannerParameters(self,xml:str):self.\_endReq("scannerParams",xml)def scannerData(self,reqId:int,rank:int,contractDetails:ContractDetails,distance:str,benchmark:str,projection:str,legsStr:str,):data=ScanData(rank,contractDetails,distance,benchmark,projection,legsStr)dataList=self.reqId2Subscriber.get(reqId)ifdataListisNone:dataList=self.\_results.get(reqId)ifdataListisnotNone:ifrank==0:dataList.clear()dataList.append(data)def scannerDataEnd(self,reqId:int):dataList=self.\_results.get(reqId)ifdataListisnotNone:self.\_endReq(reqId)else:dataList=self.reqId2Subscriber.get(reqId)ifdataListisnotNone:self.ib.scannerDataEvent.emit(dataList)dataList.updateEvent.emit(dataList)def histogramData(self,reqId:int,items:list[HistogramData]):result=[HistogramData(item.price,item.count)foriteminitems]self.\_endReq(reqId,result)def securityDefinitionOptionParameter(self,reqId:int,exchange:str,underlyingConId:int,tradingClass:str,multiplier:str,expirations:list[str],strikes:list[float],):chain=OptionChain(exchange,underlyingConId,tradingClass,multiplier,expirations,strikes)self.\_results[reqId].append(chain)def securityDefinitionOptionParameterEnd(self,reqId:int):self.\_endReq(reqId)def newsProviders(self,newsProviders:list[NewsProvider]):newsProviders=[NewsProvider(code=p.code,name=p.name)forpinnewsProviders]self.\_endReq("newsProviders",newsProviders)def 
def tickNews(
    self,
    _reqId: int,
    timeStamp: int,
    providerCode: str,
    articleId: str,
    headline: str,
    extraData: str,
):
    """Record an incoming news tick and announce it on ``self.ib.tickNewsEvent``."""
    newsTick = NewsTick(timeStamp, providerCode, articleId, headline, extraData)
    self.newsTicks.append(newsTick)
    self.ib.tickNewsEvent.emit(newsTick)


def newsArticle(self, reqId: int, articleType: int, articleText: str):
    """Wrap the article in ``NewsArticle`` and pass it to ``_endReq`` for ``reqId``."""
    self._endReq(reqId, NewsArticle(articleType, articleText))


def historicalNews(
    self, reqId: int, time: str, providerCode: str, articleId: str, headline: str
):
    """Append one ``HistoricalNews`` item to ``self._results[reqId]``.

    ``time`` is converted with ``parseIBDatetime`` before being stored.
    """
    # cast() only informs the type checker; it does not convert at runtime.
    published = cast(datetime, parseIBDatetime(time))
    self._results[reqId].append(
        HistoricalNews(published, providerCode, articleId, headline)
    )


def historicalNewsEnd(self, reqId, _hasMore: bool):
    """Call ``_endReq`` for ``reqId``; ``_hasMore`` is not used."""
    self._endReq(reqId)


def updateNewsBulletin(
    self, msgId: int, msgType: int, message: str, origExchange: str
):
    """Store the bulletin under ``msgId`` and emit it on ``self.ib.newsBulletinEvent``."""
    latest = NewsBulletin(msgId, msgType, message, origExchange)
    self.msgId2NewsBulletin[msgId] = latest
    self.ib.newsBulletinEvent.emit(latest)


def receiveFA(self, _faDataType: int, faXmlData: str):
    """Pass the XML to ``_endReq`` under the "requestFA" key; ``_faDataType`` is not used."""
    self._endReq("requestFA", faXmlData)


def currentTime(self, time: int):
    """Convert the timestamp to a datetime in ``self.defaultTimezone``.

    The datetime is passed to ``_endReq`` under the "currentTime" key.
    """
    self._endReq("currentTime", datetime.fromtimestamp(time, self.defaultTimezone))


def tickEFP(
    self,
    reqId: int,
    tickType: int,
    basisPoints: float,
    formattedBasisPoints: str,
    totalDividends: float,
    holdDays: int,
    futureLastTradeDate: str,
    dividendImpact: float,
    dividendsToLastTradeDate: float,
):
    """Store an EFP tick on the ticker registered for ``reqId``.

    Nothing happens when no ticker is registered. The ticker is only updated
    and marked pending when ``tickType`` has an entry in ``EFP_TICK_MAP``.
    """
    subscribed = self.reqId2Ticker.get(reqId)
    if not subscribed:
        return

    # Note: the totalDividends parameter is actually the implied future price
    # per IBKR docs, hence the impliedFuture field below.
    snapshot = EfpData(
        basisPoints=basisPoints,
        formattedBasisPoints=formattedBasisPoints,
        impliedFuture=totalDividends,
        holdDays=holdDays,
        futureLastTradeDate=futureLastTradeDate,
        dividendImpact=dividendImpact,
        dividendsToLastTradeDate=dividendsToLastTradeDate,
    )

    # Tick types without a mapping are dropped.
    if tickType not in EFP_TICK_MAP:
        return

    setattr(subscribed, EFP_TICK_MAP[tickType], snapshot)
    self.pendingTickers.add(subscribed)
def historicalSchedule(
    self,
    reqId: int,
    startDateTime: str,
    endDateTime: str,
    timeZone: str,
    sessions: list[HistoricalSession],
):
    """Bundle the arguments into a ``HistoricalSchedule`` and pass it to ``_endReq``."""
    schedule = HistoricalSchedule(startDateTime, endDateTime, timeZone, sessions)
    self._endReq(reqId, schedule)


def wshMetaData(self, reqId: int, dataJson: str):
    """Emit ``dataJson`` on ``self.ib.wshMetaEvent``, then pass it to ``_endReq``."""
    self.ib.wshMetaEvent.emit(dataJson)
    self._endReq(reqId, dataJson)


def wshEventData(self, reqId: int, dataJson: str):
    """Emit ``dataJson`` on ``self.ib.wshEvent``, then pass it to ``_endReq``."""
    self.ib.wshEvent.emit(dataJson)
    self._endReq(reqId, dataJson)


def userInfo(self, reqId: int, whiteBrandingId: str):
    """Pass ``whiteBrandingId`` to ``_endReq`` as the result for ``reqId``.

    The id is forwarded explicitly, like the payload of the other
    single-answer handlers, so it is not discarded.
    """
    self._endReq(reqId, whiteBrandingId)


def softDollarTiers(self, reqId: int, tiers: list[SoftDollarTier]):
    """Accept the callback and ignore it."""
    pass


def familyCodes(self, familyCodes: list[FamilyCode]):
    """Accept the callback and ignore it."""
    pass


def error(
    self, reqId: int, errorCode: int, errorString: str, advancedOrderRejectJson: str
):
    """Handle an error or warning message.

    The message is classified as a warning or an error, logged, applied to
    the related trade or pending request when there is one, followed by
    code-specific handling for 165, 317 and 10225, and finally emitted on
    ``self.ib.errorEvent``.

    Args:
        reqId: Request id or local order id; -1 for messages not tied to an order.
        errorCode: Numeric message code.
        errorString: Message text.
        advancedOrderRejectJson: Stored on the trade as ``advancedError`` when
            non-empty and the message is an order error.
    """
    # https://interactivebrokers.github.io/tws-api/message_codes.html
    # https://ibkrcampus.com/campus/ibkr-api-page/twsapi-doc/#api-error-codes
    isRequest = reqId in self._futures
    trade = None

    # reqId is a local orderId, but is delivered as -1 if this is a non-order-related error
    if reqId != -1:
        trade = self.trades.get((self.clientId, reqId))

    # Warnings are currently:
    # 105 - Order being modified does not match the original order. (?)
    # 110 - The price does not conform to the minimum price variation for this contract.
    # 165 - Historical market Data Service query message.
    # 321 - Server error when validating an API client request.
    # 329 - Order modify failed. Cannot change to the new order type.
    # 399 - Order message error
    # 404 - Shares for this order are not immediately available for short sale. The order will be held while we attempt to locate the shares.
    # 434 - The order size cannot be zero.
    # 492 - ? not listed
    # 10167 ? not listed
    # Note: error 321 means error validating, but if the message is the result of a MODIFY, the order _is still live_ and we must not delete it.
    # TODO: investigate if error 321 happens on _new_ order placement with incorrect parameters too, then we should probably delete the order.
    # Previously this was included as a Warning condition, but 202 is literally "Order Canceled" error status, so now it is an order-delete error:
    # 202 - Order cancelled - Reason:
    warningCodes = frozenset({105, 110, 165, 321, 329, 399, 404, 434, 492, 10167})
    isWarning = errorCode in warningCodes or 2100 <= errorCode < 2200

    if errorCode == 110 and isRequest:
        # whatIf request failed
        isWarning = False

    if (
        errorCode == 110
        and trade
        and trade.orderStatus.status == OrderStatus.PendingSubmit
    ):
        # invalid price for a new order must cancel it
        isWarning = False

    msg = f"{'Warning' if isWarning else 'Error'} {errorCode}, reqId {reqId}: {errorString}"

    contract = self._reqId2Contract.get(reqId)
    if contract:
        msg += f", contract: {contract}"

    if isWarning:
        # Record warnings into the trade object, but unlike the _error_ case,
        # DO NOT delete the trade object because the order is STILL LIVE at the broker.
        if trade:
            status = trade.orderStatus.status = OrderStatus.ValidationError
            logEntry = TradeLogEntry(self.lastTime, status, msg, errorCode)
            trade.log.append(logEntry)
            self._logger.warning(f"IBKR API validation warning: {trade}")
            self.ib.orderStatusEvent.emit(trade)
            trade.statusEvent.emit(trade)
        else:
            # else, this is a non-trade-related warning message
            self._logger.info(msg)
    else:
        self._logger.error(msg)
        if isRequest:
            # the request failed
            if self.ib.RaiseRequestErrors:
                error = RequestError(reqId, errorCode, errorString)
                self._endReq(reqId, error, success=False)
            else:
                self._endReq(reqId)
        elif trade:
            # something is wrong with the order, cancel it
            if advancedOrderRejectJson:
                trade.advancedError = advancedOrderRejectJson

            # Errors can mean three things:
            #  - new order is REJECTED
            #  - existing order is server-canceled (DAY orders, margin problems)
            #  - modification to *existing* order just has an update error, but the order is STILL LIVE
            if not trade.isDone():
                status = trade.orderStatus.status = OrderStatus.Cancelled
                logEntry = TradeLogEntry(self.lastTime, status, msg, errorCode)
                trade.log.append(logEntry)
                self._logger.warning(f"Canceled order: {trade}")
                self.ib.orderStatusEvent.emit(trade)
                trade.statusEvent.emit(trade)
                trade.cancelledEvent.emit(trade)

    # Code-specific follow-up; runs for warnings and errors alike.
    if errorCode == 165:
        # for scan data subscription there are no longer matching results
        dataList = self.reqId2Subscriber.get(reqId)
        if dataList:
            dataList.clear()
            dataList.updateEvent.emit(dataList)
    elif errorCode == 317:
        # Market depth data has been RESET
        ticker = self.reqId2Ticker.get(reqId)
        if ticker:
            # clear all DOM levels, emitting one delete tick (operation 2)
            # per level: side 0 for asks, side 1 for bids
            ticker.domTicks += [
                MktDepthData(self.lastTime, 0, "", 2, 0, level.price, 0)
                for level in ticker.domAsks
            ]
            ticker.domTicks += [
                MktDepthData(self.lastTime, 0, "", 2, 1, level.price, 0)
                for level in ticker.domBids
            ]
            ticker.domAsks.clear()
            ticker.domBids.clear()
            ticker.domBidsDict.clear()
            ticker.domAsksDict.clear()
            self.pendingTickers.add(ticker)
    elif errorCode == 10225:
        # Bust event occurred, current subscription is deactivated.
        # Please resubscribe real-time bars immediately
        bars = self.reqId2Subscriber.get(reqId)
        if isinstance(bars, RealTimeBarList):
            self.ib.client.cancelRealTimeBars(reqId)
            self.ib.client.reqRealTimeBars(
                reqId,
                bars.contract,
                bars.barSize,
                bars.whatToShow,
                bars.useRTH,
                bars.realTimeBarsOptions,
            )
        elif isinstance(bars, BarDataList):
            self.ib.client.cancelHistoricalData(reqId)
            self.ib.client.reqHistoricalData(
                reqId,
                bars.contract,
                bars.endDateTime,
                bars.durationStr,
                bars.barSizeSetting,
                bars.whatToShow,
                bars.useRTH,
                bars.formatDate,
                bars.keepUpToDate,
                bars.chartOptions,
            )

    self.ib.errorEvent.emit(reqId, errorCode, errorString, contract)


def tcpDataArrived(self):
    """Stamp the arrival time and reset per-batch tick state.

    ``self.lastTime`` is set to now in ``self.defaultTimezone`` and
    ``self.time`` to ``time.time()``. Each ticker still in
    ``self.pendingTickers`` gets fresh empty ``ticks``, ``tickByTicks`` and
    ``domTicks`` lists, and ``self.pendingTickers`` is replaced by a new
    empty set.
    """
    self.lastTime = datetime.now(self.defaultTimezone)
    self.time = time.time()

    for ticker in self.pendingTickers:
        ticker.ticks = []
        ticker.tickByTicks = []
        ticker.domTicks = []

    self.pendingTickers = set()
def tcpDataProcessed(self):
    """Announce that a batch of incoming data has been handled.

    ``self.ib.updateEvent`` is always emitted. When tickers are pending, each
    one is stamped with ``self.lastTime`` and ``self.time`` and emits its own
    ``updateEvent``; afterwards the whole pending collection is emitted on
    ``self.ib.pendingTickersEvent``.
    """
    self.ib.updateEvent.emit()

    # Nothing changed on any ticker: no per-ticker or batch event.
    if not self.pendingTickers:
        return

    for updated in self.pendingTickers:
        updated.time = self.lastTime
        updated.timestamp = self.time
        updated.updateEvent.emit(updated)

    self.ib.pendingTickersEvent.emit(self.pendingTickers)