這是示例瞭如何使用多線程處理API消息。 app.run()作爲單獨的線程啓動,它正在偵聽TWS API響應。然後主程序向ContractDetails發送5個請求,然後主程序等待響應10秒。當響應準備好處理時,TWS API消息存儲在應用實例和簡單的信號量信號中。
這是我的第一個多線程程序,歡迎任何評論。
from ibapi import wrapper
from ibapi.client import EClient
from ibapi.utils import iswrapper #just for decorator
from ibapi.common import *
from ibapi.contract import *
from ibapi.ticktype import *
#from OrderSamples import OrderSamples
import threading
import time
class myThread (threading.Thread):
def __init__(self, app, threadID, name):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
self.app = app
def run(self):
print ("Starting application in separate thread:", self.name, "threadID:", self.threadID )
self.app.run()
print ("Exiting " + self.name)
class TestApp(wrapper.EWrapper, EClient):
def __init__(self):
wrapper.EWrapper.__init__(self)
EClient.__init__(self, wrapper=self)
self.started = False
self.nextValidOrderId = 0
self.reqData = {} # store data returned by requests
self.reqStatus = {} # semaphore of requests - status End will indicate request is finished
@iswrapper
def nextValidId(self, orderId:int):
print("setting nextValidOrderId: %d", orderId)
self.nextValidOrderId = orderId
@iswrapper
def error(self, reqId:TickerId, errorCode:int, errorString:str):
print("Error. Id: " , reqId, " Code: " , errorCode , " Msg: " , errorString)
@iswrapper
# ! [contractdetails]
def contractDetails(self, reqId: int, contractDetails: ContractDetails):
super().contractDetails(reqId, contractDetails)
# store response in reqData dict, for each request several objects are appended into list
if not reqId in self.reqData:
self.reqData[reqId] = []
self.reqData[reqId].append(contractDetails) # put returned data into data storage dict
# ! [contractdetails]
@iswrapper
# ! [contractdetailsend]
def contractDetailsEnd(self, reqId: int):
super().contractDetailsEnd(reqId)
print("ContractDetailsEnd. ", reqId, "\n") # just info
self.reqStatus[reqId] = 'End' # indicates the response is ready for further processing
# ! [contractdetailsend]
def main():
app = TestApp()
app.connect("127.0.0.1", 4001, clientId=123)
print("serverVersion:%s connectionTime:%s" % (app.serverVersion(),
app.twsConnectionTime()))
thread1App = myThread(app, 1, "Thread-1") # define thread for sunning app
thread1App.start() # start app.run(] as infitnite loop in separate thread
print('Requesting MSFT contract details:')
contract = Contract()
contract.symbol = "MSFT"
contract.secType = "STK"
contract.currency = "USD"
contract.exchange = ""
app.reqStatus[210] = 'Sent' # set request status to "sent to TWS"
app.reqContractDetails(210, contract)
print('Requesting IBM contract details:')
contract.symbol = "IBM"
app.reqStatus[211] = 'Sent'
app.reqContractDetails(211, contract)
print('Requesting IBM contract details:')
contract.symbol = "GE"
app.reqStatus[212] = 'Sent'
app.reqContractDetails(212, contract)
print('Requesting IBM contract details:')
contract.symbol = "GM"
app.reqStatus[213] = 'Sent'
app.reqContractDetails(213, contract)
print('Requesting IBM contract details:')
contract.symbol = "BAC"
app.reqStatus[214] = 'Sent'
app.reqContractDetails(214, contract)
i = 0
while i < 100: # exit loop after 10 sec (100 x time.sleep(0.1)
i = i+1
for reqId in app.reqStatus:
if app.reqStatus[reqId] == 'End':
for contractDetails in app.reqData[reqId]:
print("ContractDetails. ReqId:", reqId, contractDetails.summary.symbol,
contractDetails.summary.secType, "ConId:", contractDetails.summary.conId,
"@", contractDetails.summary.exchange)
app.reqStatus[reqId] = 'Processed'
time.sleep(0.1)
app.done = True # this stops app.run() loop
if __name__ == "__main__":
main()
非常感謝你爲你的輸入。我急於嘗試,但由於某種原因,我收到「服務器錯誤」信息,此時無法登錄TWS。我會回來標記這個答案,一旦我連接並能夠嘗試你的代碼。謝謝。 –
您可以使用edemo/demouser登錄進行測試。這就是我所做的。我注意到您使用的是7496,這意味着一個真實賬戶或模擬賬戶。如果您有一個(紙張通常是端口7497),我建議您使用演示或紙質帳戶進行測試。 – brian
與代碼:self.done = True,函數運行中斷。並引發錯誤:線程Thread-1中的異常:第113行,在_recvAllMsg中,buf = self.socket.recv(4096)AttributeError:'NoneType'對象沒有屬性'recv'。我認爲這是由.disconnection引起的。你看到同樣的問題嗎?任何想法如何我可以安全斷開並退出程序?謝謝。 –