Friday, 10 March 2017

Streaming market data from native python IB API

This is the third in a series of posts on using the native python API for Interactive Brokers. You should read the first, and the second, before this one.

It is an updated version of this older post, which used a third party API (swigibpy) which wraps around the C++ API. I've changed the code, but not the poor attempts at humour.

In my last post we looked at getting a single snapshot of historical prices. In this one we will look at streamed prices - 'market tick data'. With historical prices we waited until the price feed ended; with streaming prices we need to tell the streamer when to stop.

Note: This post has been updated to use a more robust method for dealing with concurrency.


No stream rises higher than its source


Get the source code from this gist.

You'll also need the pandas library.

You may want to read the documentation.


No stream drives anything without being confined

The example code begins in a similar fashion to the historical data example; we make one of these weird client objects containing a server wrapper connection, make one of these slightly less weird contract objects (here it is for December 2018 Eurodollar futures), resolve it into a populated contract object (explained more fully here) and then shove that into a request for market data.


from __main function:

app = TestApp("127.0.0.1", 4001, 1)

ibcontract = IBcontract()
ibcontract.secType = "FUT"
ibcontract.lastTradeDateOrContractMonth="201706"
ibcontract.symbol="GBL"
ibcontract.exchange="DTB"

## resolve the contract
resolved_ibcontract=app.resolve_ib_contract(ibcontract)

tickerid = app.start_getting_IB_market_data(resolved_ibcontract)

time.sleep(30)


Unlike the other functions we've looked at so far there isn't an internal loop here; instead we deliberately hang around whilst some price data comes in.


from TestClient.start_getting_IB_market_data() method:

def start_getting_IB_market_data(self, resolved_ibcontract, 
    tickerid=DEFAULT_MARKET_DATA_ID):
    """
    Kick off market data streaming
    :param resolved_ibcontract: a Contract object    
    :param tickerid: the identifier for the request    

    :return: tickerid
    """

    self._market_data_q_dict[tickerid] = self.wrapper.init_market_data(tickerid)
    self.reqMktData(tickerid, resolved_ibcontract, "", False, False, [])

    return tickerid

Ah yes, it's the usual stuff of setting up space within the self.wrapper instance to get the data and then calling the TWS server request function (strictly speaking it's one of those 'EClient' whatdoyoucallits again). One difference, however, is that we have to store the TestClient's pointer to the market data queue (in the dict self._market_data_q_dict), since we'll be returning to it later.
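
To make the 'one queue per request' idea concrete, here is a minimal sketch that uses only the standard library. The class names SketchWrapper and SketchClient, the on_tick method and the value of DEFAULT_MARKET_DATA_ID are all made up for illustration; the real gist subclasses the IB EWrapper and EClient classes and may differ in detail.

```python
import queue

DEFAULT_MARKET_DATA_ID = 50  # hypothetical value, for illustration only


class SketchWrapper:
    """Stands in for the EWrapper side: owns one queue per request id."""

    def __init__(self):
        self._my_market_data_dict = {}

    def init_market_data(self, tickerid):
        # Create a fresh queue for this request and hand back a reference to it
        market_data_queue = self._my_market_data_dict[tickerid] = queue.Queue()
        return market_data_queue

    def on_tick(self, tickerid, tick):
        # In the real code this role is played by tickPrice, tickSize and friends
        self._my_market_data_dict[tickerid].put(tick)


class SketchClient:
    """Stands in for the EClient side: remembers which queue belongs to which id."""

    def __init__(self, wrapper):
        self.wrapper = wrapper
        self._market_data_q_dict = {}

    def start_getting_market_data(self, tickerid=DEFAULT_MARKET_DATA_ID):
        self._market_data_q_dict[tickerid] = self.wrapper.init_market_data(tickerid)
        # The real method would call self.reqMktData(...) at this point
        return tickerid


wrapper = SketchWrapper()
client = SketchClient(wrapper)
tickerid = client.start_getting_market_data()
wrapper.on_tick(tickerid, "a tick")  # pretend the server sent something
print(client._market_data_q_dict[tickerid].get_nowait())
```

The point is that the client and the wrapper hold references to the same Queue object, so anything the wrapper puts on it from the listening thread can be picked up later by the client.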



Only dead fish swim with the stream...

 

We now look inside the server wrapper object which gets populated as an instance into self.wrapper. As before there are a few EWrapper functions which get triggered whenever the market data arrives.

There are in fact several methods, 'tickString', 'tickGeneric', 'tickSize' and 'tickPrice'; it seems a bit stochastic (quant speak; English translation: completely bloody random and arbitrary) which of these methods gets called when a tick arrives (a tick could be an update to a price or to a quoted size on the top level of the order book). Let's look at the most generic of these:

def tickGeneric(self, tickerid, tickType, value):
    ## overridden method
    this_tick_data=IBtick(self.get_time_stamp(),tickType, value)
    self._my_market_data_dict[tickerid].put(this_tick_data)

All the code does is identify which type of tick it is and then add it to the Queue that lives in the appropriate part of self._my_market_data_dict. You can look at the classes IBtick and tick to see how this is done. I'm using local time as the timestamp here, but again you can change this if you want.
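
If you don't fancy digging through the gist, the 'identify which type of tick it is' step boils down to a lookup from IB's integer tick type to a field name. The sketch below is not the IBtick class itself; the function tick_as_dict and the dictionary TICK_FIELD_NAMES are hypothetical names. The ids 0 to 5 are IB's tick types for bid size, bid price, ask price, ask size, last price and last size.

```python
import datetime

# Tick type ids as documented by IB for the top of the book and the last trade
TICK_FIELD_NAMES = {
    0: "bid_size",
    1: "bid_price",
    2: "ask_price",
    3: "ask_size",
    4: "last_trade_price",
    5: "last_trade_size",
}


def tick_as_dict(timestamp, tick_type, value):
    """Return a one-field record for a known tick type, or flag it as ignorable."""
    field_name = TICK_FIELD_NAMES.get(int(tick_type), "ignorabletick")
    return {"timestamp": timestamp, field_name: value}


now = datetime.datetime.now()
trade_tick = tick_as_dict(now, 4, 98.03)    # a last trade price
unknown_tick = tick_as_dict(now, 49, 0.0)   # a tick type we don't care about
print(trade_tick)
print(unknown_tick)
```

Each tick carries exactly one populated field, which is why the individual ticks printed later are mostly NaN.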


Dipping our toe into the metaphorical market stream


from __main function:
market_data1=app.get_IB_market_data(tickerid)


from TestClient.get_IB_market_data() method:

def get_IB_market_data(self, tickerid):
   ## how long to wait for next item
   MAX_WAIT_MARKETDATEITEM = 5
   market_data_q = self._market_data_q_dict[tickerid]

   market_data=[]
   finished=False
   while not finished:
      try:
         market_data.append(market_data_q.get(timeout=MAX_WAIT_MARKETDATEITEM))
      except queue.Empty:
         ## no more data
         finished=True
   return stream_of_ticks(market_data)

We can see what data we have received so far. This also clears the queue of data that has been transmitted out of the app.wrapper storage. You can write this differently if you like of course.
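
One thing to be aware of: the loop above only finishes once the queue has been quiet for the full timeout, so on a busy contract it could in principle keep collecting for a long time. One possible way to write it differently is to add an overall time budget on top of the per-item wait. This is a sketch rather than the code in the gist; the function name drain_queue and both of its keyword arguments are my own invention.

```python
import queue
import time


def drain_queue(market_data_q, max_wait_per_item=1.0, max_total_wait=5.0):
    """Collect items until the queue goes quiet or the overall time budget is spent."""
    deadline = time.monotonic() + max_total_wait
    items = []
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break  # overall budget used up, even if ticks are still arriving
        try:
            items.append(market_data_q.get(timeout=min(max_wait_per_item, remaining)))
        except queue.Empty:
            break  # nothing arrived within the per-item wait
    return items


q = queue.Queue()
for fake_tick in ("tick 1", "tick 2", "tick 3"):
    q.put(fake_tick)

collected = drain_queue(q, max_wait_per_item=0.1, max_total_wait=0.5)
print(collected)
```

Anything still arriving once the budget runs out simply stays on the queue and gets picked up by the next call.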

An individual tick looks like this:

>>> print(market_data1[1])
                            ask_price  ask_size  bid_price  bid_size  \
2017-03-10 11:07:51.564816        NaN       NaN        NaN       NaN   
                           canAutoExecute ignorabletick  last_trade_price  \
2017-03-10 11:07:51.564816           None          None             98.03   
                            last_trade_size pastLimit  
2017-03-10 11:07:51.564816              NaN      None  



In this case the tick was a trade, rather than a quote. The size of the trade arrives in the next tick.


>>> print(market_data1[2])

                            ask_price  ask_size  bid_price  bid_size  \
2017-03-10 11:07:51.564899        NaN       NaN        NaN       NaN   
                           canAutoExecute ignorabletick  last_trade_price  \
2017-03-10 11:07:51.564899           None          None               NaN   
                            last_trade_size pastLimit  
2017-03-10 11:07:51.564899              200      None  


Notice they're shown as a single row of a pandas DataFrame. This is so we can do this:

market_data1_as_df=market_data1.as_pdDataFrame()
print(market_data1_as_df)

The advantage of this approach will be clear later, when I discuss interpretation.


Once in the stream of history you can't get out 

 

If we just let that baby run we'd be receiving streams of prices until the cows came home. So what we do back in the client world is say STOP I've had enough after a preset amount of time (we could also STOP when the N'th tick has arrived, or when all the slots in the market data tuple are filled, which would be easy enough to code up).

from __main function:
time.sleep(30)
market_data2 = app.stop_getting_IB_market_data(tickerid)


from TestClient.stop_getting_IB_market_data() method:

def stop_getting_IB_market_data(self, tickerid):
    """
    Stops the stream of market data and returns all the data we've had since we last asked for it
    :param tickerid: identifier for the request

    :return: market data
    """

    self.cancelMktData(tickerid)

    ## Sometimes a lag whilst this happens, this prevents 'orphan' ticks appearing
    time.sleep(5)

    market_data = self.get_IB_market_data(tickerid)

    ## output any errors
    while self.wrapper.is_error():
        print(self.get_error())

    return market_data


This will also return any data that we haven't yet captured with a previous call to get_IB_market_data. Again feel free to change this.
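
As for the alternative mentioned earlier of stopping when the N'th tick has arrived, a rough sketch might look like the following. The function collect_n_ticks and its arguments are hypothetical and not part of the gist; in the real client you would follow it with a call to cancelMktData, as in the method above.

```python
import queue
import time


def collect_n_ticks(market_data_q, n_ticks, max_total_wait=30.0):
    """Return up to n_ticks items, giving up once max_total_wait seconds have passed."""
    deadline = time.monotonic() + max_total_wait
    ticks = []
    while len(ticks) < n_ticks:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break  # ran out of time before the N'th tick arrived
        try:
            ticks.append(market_data_q.get(timeout=remaining))
        except queue.Empty:
            break  # the queue stayed empty for the rest of the time budget
    return ticks


q = queue.Queue()
for i in range(10):
    q.put(i)

first_five = collect_n_ticks(q, n_ticks=5, max_total_wait=1.0)
print(first_five)
```

The time limit is there so that an illiquid or closed market doesn't leave you waiting forever for a tick that never comes.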


Making the results meaningful


To understand the results we can use the power of pandas to resample the dataframe. First of all let's glue together the two separate buckets of data we've captured:

market_data2_as_df=market_data2.as_pdDataFrame()
all_market_data_as_df=pd.concat([market_data1_as_df, market_data2_as_df])

Now to see the bid-ask quoting activity, resampled to a one second resolution:

some_quotes = all_market_data_as_df.resample("1S").last()[["bid_size","bid_price",
                                                    "ask_price",  "ask_size"]]
print(some_quotes.head(10))



                     bid_size  bid_price  ask_price  ask_size
2017-03-10 11:07:51    9952.0     98.030     98.040    3736.0
2017-03-10 11:07:52    2653.0        NaN        NaN     212.0
2017-03-10 11:07:53   17250.0     98.025     98.045    9500.0
2017-03-10 11:07:54    3607.0     98.030     98.040     424.0
2017-03-10 11:07:55   12992.0        NaN        NaN    5920.0
2017-03-10 11:07:56   10073.0        NaN        NaN    3743.0
2017-03-10 11:07:57    9746.0        NaN        NaN    3726.0
2017-03-10 11:07:58    8280.0        NaN        NaN    4110.0
2017-03-10 11:07:59      17.0        NaN        NaN    1723.0
2017-03-10 11:08:00    2920.0        NaN        NaN    3248.0



You could safely forward fill the prices (a NaN is shown when there is no updated value).
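
A minimal sketch of that forward fill, using the first three rows of the table above as stand-in data (the variable names are mine). I only fill the price columns here, on the assumption that a missing price means 'unchanged since the last update'.

```python
import numpy as np
import pandas as pd

index = pd.to_datetime(
    ["2017-03-10 11:07:51", "2017-03-10 11:07:52", "2017-03-10 11:07:53"]
)
quotes = pd.DataFrame(
    {
        "bid_size": [9952.0, 2653.0, 17250.0],
        "bid_price": [98.030, np.nan, 98.025],
        "ask_price": [98.040, np.nan, 98.045],
        "ask_size": [3736.0, 212.0, 9500.0],
    },
    index=index,
)

# Carry the last known price forward into seconds where no price tick arrived
filled = quotes.copy()
price_columns = ["bid_price", "ask_price"]
filled[price_columns] = filled[price_columns].ffill()
print(filled)
```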

Or the first few trades, resolved to 10 milliseconds:

some_trades = all_market_data_as_df.resample("10L").last()[["last_trade_price", "last_trade_size"]]
print(some_trades.head(10))

                         last_trade_price  last_trade_size
2017-03-10 11:07:51.560             98.03            200.0
2017-03-10 11:07:51.570               NaN              NaN
2017-03-10 11:07:51.580               NaN              NaN
2017-03-10 11:07:51.590               NaN              NaN

Here I'm using the 'last' method. You could also use an average.

By the way it can be a bit dangerous to average prices too much; for example if you sample prices throughout the day and then take an average as your input into your trading algorithm you will underestimate the actual amount of volatility in the market. Similarly if you are trading high frequency stuff you will be using the active state of the order book, and averaging real time bars is probably not going to be a very wise thing to do. Over a time period this short relative to my typical trading speed, however, it's probably okay, as mostly all we are going to be removing is a little illusory volatility caused by 'bid-ask' bounce.

Also even with this averaging it's still worth running your prices through a 'jump detector' to make sure you don't trade off dirty prices showing spuriously large moves; I see these about once a month for each instrument I trade!
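
To give a flavour of what a jump detector could look like, here is a deliberately crude sketch. It is not the one I run in production: the function find_jumps, the rule of comparing each move with a multiple of the median absolute change, and the default multiple of 10 are all assumptions chosen for illustration.

```python
import statistics


def find_jumps(prices, threshold_multiple=10.0):
    """Return indices of prices whose move from the last accepted price looks too large.

    A move is flagged when it exceeds threshold_multiple times the median
    absolute (non-zero) change across the whole series.
    """
    changes = [abs(b - a) for a, b in zip(prices, prices[1:])]
    nonzero_changes = [c for c in changes if c > 0]
    if not nonzero_changes:
        return []
    typical_move = statistics.median(nonzero_changes)
    limit = threshold_multiple * typical_move

    jumps = []
    last_good = prices[0]
    for i in range(1, len(prices)):
        if abs(prices[i] - last_good) > limit:
            jumps.append(i)  # suspicious: keep comparing against the last good price
        else:
            last_good = prices[i]
    return jumps


# The fifth price has two digits transposed, the sort of thing a dirty feed produces
prices = [98.030, 98.035, 98.030, 98.025, 89.030, 98.030, 98.035]
print(find_jumps(prices))
```

Note that this treats the first price as good and would flag everything after a genuine large move, so in practice you'd want a human (or at least a more careful rule) to decide whether a flagged price is really dirty.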

Much much more on this subject in this post


Islands in the stream...


That is it for prices. I use the historical data function whenever I start trading a particular contract, but also every day, as it gets closing prices. This makes my system self-recovering, since even if it drops for a few days I will end up with daily prices at least being infilled. Also, contracts that are not actively traded often still have closing prices, which is useful if you are using intra contract spreads as a data input. Just be careful how you treat intraday and closing prices if you append them together.

Much much more on this subject in this post

I use market data to get intraday prices where a system requires that, and when I am just about to trade to check the market is liquid enough for what I want to do (or even just to check it is open since I don't bother keeping a holidays calendar for all my markets - I wouldn't want to spend more than 10 minutes of my time a day running this system now would I?). Plus it allows me to dis-aggregate my trading costs into what is coming from the inside spread, the cost of processing / execution delays and having to drop deeper into the order book.

Next on the menu will be placing an order! I will leave the trivial task of building a system which decides what the orders will be to the reader (hint: you might want to use the price in some way).

This is the third in a series of posts. The first two posts are:

http://qoppac.blogspot.co.uk/2017/03/interactive-brokers-native-python-api.html

http://qoppac.blogspot.co.uk/2017/03/historic-data-from-native-ib-pyhon-api.html

The next post on placing orders is:
http://qoppac.blogspot.co.uk/2017/03/placing-orders-in-native-python-ib-api.html


31 comments:

  1. Nice article! Thanks for helping getting to know the new PythonAPI. But I have a question: Does anyone else get the error message "AttributeError: 'IBtick' object has no attribute 'canAutoExecute'" when trying to execute the code?

    Replies
    1. I haven't had that error but you can remove that line if you're not going to use it.

  2. hi, thanks for the article, it's very helpful. However I get an error "list index out of range" on line 833 when printing the results. I've no clue why. the connection is fine. thanks

    Replies
    1. Hmm... there appear to be only 433 lines in the gist https://gist.github.com/robcarver17/9c411d2b88eab4232cc66a9c7f6988fc

      Can you paste the full stack trace of the error?

  3. Hi thank you so much for your post! I am trying to get the OptionGreeks values (delta, gamma, vega, theta). I think they can be obtained through the default reqMktData() method but I'll have to overwrite tickOptionComputation() function and I tried to replicate the tickPrice() function written in the gist

    def tickOptionComputation(self, tickerid, tickType,
            impliedVol:float, delta:float, optPrice:float, pvDividend:float,
            gamma:float, vega:float, theta:float, undPrice:float):
        ## overridden method

        if tickType == 10:
            this_tick_data = IBtick(self.get_time_stamp(), tickType, delta)
        if tickType == 11:
            this_tick_data = IBtick(self.get_time_stamp(), tickType, gamma)
        if tickType == 12:
            this_tick_data = IBtick(self.get_time_stamp(), tickType, vega)
        if tickType == 13:
            this_tick_data = IBtick(self.get_time_stamp(), tickType, theta)

        self._my_market_data_dict[tickerid].put(this_tick_data)


    Do you think this will work?

    Thank you so much!

    Replies
    1. Probably - I don't really use the option code. But you will also need to add these numbers (10,11,... 13) to the resolve_tickids method in the IBtick class. And also modify the tick class to include these attributes.

    2. This works, but the tick types for the greeks don't have separate tickType, e.g. 10 are all BID greeks and 11 are all ASK greeks at once.

      And as Rob stated, you have to modify the method and class. Then it is relatively straightforward.

  4. when I tried
    ibcontract = IBcontract()
    ibcontract.secType = "CASH"
    #ibcontract.lastTradeDateOrContractMonth="201812"
    ibcontract.symbol="EUR"
    ibcontract.currency = "USD"
    ibcontract.exchange="IDEALPRO"

    The request for contract details from IB gets stuck forever and the program stops after X seconds. Any ideas plz?

    Replies
    1. Nothing obviously wrong. You could ask here https://groups.io/g/twsapi/topics

  5. Thank you very much for your excellent introduction to the IB API with python. I ran some experiments and a lot went well. Unfortunately, I can’t figure out how to solve the following problem and I hope that you can help me.
    When running the script and asking for Futures everything is fine. When I ask for Options I get an error message in the API log: “Part of requested market data is not subscribed. Subscription-independent ticks are still active.DAX DAX 30 Index (Deutsche Aktien Xchange 30)/TOP/ALL”. Then the API connection is closed. The market data farm connection is still ok and I get the requested market data (I have a level 1 subscription for Options). So far so good.
    When I try to ask for more than one Contract (Option), I get no market data. I guess the missing API connection is the problem here, because I can’t ask for the resolved ibcontract information. It seems that the rest of the code struggles because of that. How can I avoid this problem? Is it possible to check whether the API is connected and if not reconnect?
    The full error code:
    Getting full contract details from the server...
    Exception in thread Thread-2:
    Traceback (most recent call last):
    File "P:\Python35\lib\threading.py", line 914, in _bootstrap_inner
    self.run()
    File "P:\Python35\lib\threading.py", line 862, in run
    self._target(*self._args, **self._kwargs)
    File "P:\Python35\lib\site-packages\ibapi-9.73.2-py3.5.egg\ibapi\client.py", line 235, in run
    self.decoder.interpret(fields)
    File "P:\Python35\lib\site-packages\ibapi-9.73.2-py3.5.egg\ibapi\decoder.py", line 1152, in interpret
    self.interpretWithSignature(fields, handleInfo)
    File "P:\Python35\lib\site-packages\ibapi-9.73.2-py3.5.egg\ibapi\decoder.py", line 1133, in interpretWithSignature
    method(self.wrapper, *args)
    File "P:\11 Code\API\03 Beispiel API ansprechen - Echtzeitkurse abfragen v3.py", line 217, in error
    self._my_errors.put(errormsg)
    AttributeError: 'TestApp' object has no attribute '_my_errors'

    Replies
    1. That's weird. I have to be honest I've never used options with IB so I don't know what would work. There is a method isConnected you can use to see if something is still connected.

  6. Hi Rob, thank you so much for all those great posts, really appreciate it!
    I do have a problem when running your code here. its about the following line of code: resolved_ibcontract=new_contract_details.summary

    The full stack trace is the following:

    Getting full contract details from the server...
    Traceback (most recent call last):
    File "/Users/maximelas/Documents/twsapi_macunix/IBJts/source/pythonclient/ibapi/marketData.py", line 384, in
    resolved_ibcontract = app.resolve_ib_contract(ibcontract)
    File "/Users/maximelas/Documents/twsapi_macunix/IBJts/source/pythonclient/ibapi/marketData.py", line 294, in resolve_ib_contract
    resolved_ibcontract=new_contract_details.summary
    AttributeError: 'ContractDetails' object has no attribute 'summary'

    I checked the contract.py file and couldn't find any 'summary' attribute to the ContractDetails object. (In the previous example with historicaldata you used the 'contract' attribute, why even change here to 'summary'?)
    Any idea what I am missing here?
    Thanks a lot!

    Replies
    1. API has changed. If you reload the gist it should work

  7. Thank you for the quick reply Rob!

    I got it to work now. However, I was stuck in a never ending program and finally understood what had to be changed. The problem is:
    'MAX_WAIT_MARKETDATEITEM = 5' in the 'get_IB_market_data' method.
    Five seconds is apparently too long because in 'market_data.append(market_data_q.get(timeout=MAX_WAIT_MARKETDATEITEM))'
    even after reaching the end of the queue the server will always have time to put another item in the market_data_q queue and thus the while loop will never finish and keep appending new items to 'market_data'.

    Even when assigning it 2 seconds I enter a seemingly never ending while loop. 1 second seems to do the trick for me.

    I am wondering how this has apparently worked fine for everyone else. Let me know if you have any thoughts on this one.

    Thanks!

    Replies
    1. I had exactly the same problem as you. Or rather sometimes it works and not at other times. Sometimes I get errors or "broken pipe" messages and other times not. Using Gateway 9.72.

    2. I have just run the above mentioned code and tracked the infinite loop down to the same point. One sec works for me, too. Tested it with MES Dec 19' contract.

  8. Hi Rob, or anyone else,

    I wonder if you have experienced my problem too and I need some guidance.

    Essentially, my live market data download only goes on for about 15-20 seconds and then it stops downloading any more data. I get the same behaviour even if I increase the 'sleep time' from the current 30s to something much longer.

    I know the market data has stopped for sure because I have put a print inside both tickPrice() and tickSize() so that as soon as there is a callback by the Server on one of these two methods I will print out the latest price and size. This printing goes on for 15-20 seconds and then prints no more.

    I don't think there's any problem with my market data subscription.

    Have you experienced this problem before?

    Thanks

    Replies
    1. I too only receive 20 seconds of live data until it inexplicably stops. I've increased / removed the 'sleep times' and still only receive 20 seconds at a time.

      I'm guessing its not a market data subscription issue, as I'm pulling market depth and it sounds like above is pulling level 1 data. But not sure, where in the code exactly 20 seconds is being determined.

    2. Is this with a demo, test, or live account?

    3. This is with a paper trading account (so test account)

    4. It's an issue with the API client connection: it gets disconnected after 20 sec because of the Thread(target = self.run). If you comment the threading part out the API will remain connected, however you will not be able to receive the market data since the app is not running in the background. I'm still trying to figure out how to fix this.

    5. Another user said that upgrading their API code made it work, if that helps. You have to run the thread or it won't work!!

  9. Hi Rob,

    We are trying to import delayed real time market data and are facing the following error: error code 504 string Not connected.
    How can we resolve this error? We are getting all of the outputs as NaN.

    Thanks

    Replies
    1. You have a problem with your IB connection. You need to talk to IB, not me.

  10. Hello, Thanks Rob for the great blog: it helps a lot in figuring out the IB API!

    Provided that I also seem to have the same 20s connection error, I am writing for another reason: what is the difference between reqMktData and reqTickByTick data? Is there a reason why you chose the former instead of the latter?

    Thanks again for everything you are doing here!

  11. Thanks so much for these interesting posts. I'm getting this error despite updating the contract details, but I don't understand it. Any ideas what's going on?

    "IB error id 43 errorcode 200 string No security definition has been found for the request"

    Replies
    1. Can you post the line(s) of code where you define the security?

  12. Hi Rob. What's your opinion about getting price data from multiple sources (IB and other brokers)?

    For a trading system that would ideally trade every N days or weeks, and for which hourly or even half-hourly streaming data would be fetched... from the heights of my inexperience does not sound like one would want to worry about. Right?

    I'm referring to level 1 price data here, mind you.

    Thanks in advance

    Replies
    1. Multiple sources is fine, and a good idea. I don't bother personally because of the extra work and cost. Just think about what you would do if the prices conflicted. With two prices, you could take an average, or have to manually choose (if they were miles apart). With three prices you could let them vote (think about the Apollo missions)

    2. This is good advice. Thanks, this is much appreciated.

