Friday, 10 March 2017

Streaming market data from native python IB API

This the third in a series of posts on using the native python API for interactive brokers. You should read the first, and the second, before this one.

It is an updated version of this older post, which used a third party API (swigibpy) which wraps around the C++ API. I've changed the code, but not the poor attempts at humour.

In my last post we looked at getting a single snapshot of historical prices. In this one we will look at streamed prices - 'market tick data'. Rather than wait until the historical price feed ends with streaming prices we need to tell the streamer when to stop.

Note: This post has been updated to use a more robust method for dealing with concurrency.


No stream rises higher than its source


Get the source code from this gist.

You'll also need the pandas library.

You may want to read the documentation.


No stream drives anything without being confined

The example code begins in a similar fashion to the historical data example; we make one of these weird client objects containing a server wrapper connection, make one of these slightly less weird contract objects (here it is for December 2018 Eurodollar futures), resolve it into a populated contract object (explained more fully here) and then shove that into a request for market data.


from __main function:

app = TestApp("127.0.0.1", 4001, 1)

ibcontract = IBcontract()
ibcontract.secType = "FUT"
ibcontract.lastTradeDateOrContractMonth="201706"
ibcontract.symbol="GBL"
ibcontract.exchange="DTB"

## resolve the contract
resolved_ibcontract=app.resolve_ib_contract(ibcontract)

tickerid = app.start_getting_IB_market_data(resolved_ibcontract)

time.sleep(30)


Unlike the other functions we've looked at so far there isn't an internal loop here; instead we deliberately hang around whilst some price data comes in.


from TestClient.start_getting_IB_market_data() method:

def start_getting_IB_market_data(self, resolved_ibcontract, 
    tickerid=DEFAULT_MARKET_DATA_ID):
    """
    Kick off market data streaming
    :param resolved_ibcontract: a Contract object    
    :param tickerid: the identifier for the request    

    :return: tickerid    """   
    
    
    self._market_data_q_dict[tickerid] = self.wrapper.init_market_data(tickerid)
    self.reqMktData(tickerid, resolved_ibcontract, "", False, False, [])

    return tickerid

Ah yes its the usual stuff of setting up space within the self.wrapper instance to get the data and then call the tws server request function (strictly speaking its one of those 'EClient' whatdoyoucallits again). However one difference is that we have to store the TestClients pointer to the market data queue (in the dict self._market_data_q), since we'll be returning to it later.



Only dead fish swim with the stream...

 

We now look inside the server wrapper object which gets populated as an instance into self.wrapper. As before there are a few EWrapper functions which get triggered whenever the market data arrives.

There are in fact several methods for 'tickString', 'tickGeneric', 'tickSize' and 'tickPrice'; it seems a bit stochastic (quant speak: english translation completely bloody random and arbitrary) which of these methods gets called when a tick arrives (a tick could be an update to a price or to a quoted size on the top level of the order book). Lets look at the most generic of these:

def tickGeneric(self, tickerid, tickType, value):
    ## overriden method
    this_tick_data=IBtick(self.get_time_stamp(),tickType, value)
    self._my_market_data_dict[tickerid].put(this_tick_data)

All the code does is identify which type of tick it is and then add it to the Queue that lives in the appropriate part of self._my_market_data. You can look at the classes IBtick and tick to see how this is done. I'm using local time as the timestamp here, but again you can change this if you want.


Dipping our toe into the metaphorical market stream


from __main function:
market_data1=app.get_IB_market_data(tickerid)


from TestClient.get_IB_market_data() method:

def get_IB_market_data(self, tickerid):
   ## how long to wait for next item   MAX_WAIT_MARKETDATEITEM = 5   market_data_q = self._market_data_q_dict[tickerid]

   market_data=[]
   finished=False
   while not finished:
      try:
         market_data.append(market_data_q.get(timeout=MAX_WAIT_MARKETDATEITEM))
      except queue.Empty:
         ## no more data         finished=True
   return stream_of_ticks(market_data)

We can see what data we have received so far. This also clears the queue of data that has been transmitted out of the app.wrapper storage. You can write this differently if you like of course.

An individual tick looks like this:

>>> print(market_data1[1])
                            ask_price  ask_size  bid_price  bid_size  \
2017-03-10 11:07:51.564816        NaN       NaN        NaN       NaN   
                           canAutoExecute ignorabletick  last_trade_price  \
2017-03-10 11:07:51.564816           None          None             98.03   
                            last_trade_size pastLimit  
2017-03-10 11:07:51.564816              NaN      None  



In this case the tick was a trade, rather than a quote. The size of the trade arrives in the next tick.


>>> print(market_data1[2])

                            ask_price  ask_size  bid_price  bid_size  \
2017-03-10 11:07:51.564899        NaN       NaN        NaN       NaN   
                           canAutoExecute ignorabletick  last_trade_price  \
2017-03-10 11:07:51.564899           None          None               NaN   
                            last_trade_size pastLimit  
2017-03-10 11:07:51.564899              200      None  


Notice they're shown as a single row of a pandas Data Frame. This is so we can do this:

market_data1_as_df=market_data1.as_pdDataFrame()
print(market_data1_as_df)

The advantage of this approach will be clear later, when I discuss interpretation.


Once in the stream of history you can't get out 

 

If we just let that baby run we'd be receiving streams of prices until the cows came home. So what we do back in the client world is say STOP I've had enough after a preset amount of time (we could also STOP when the N'th tick has arrived, or when there all the slots in the marketdata are tuple are filled, which would be easy enough to code up).

from __main function:
time.sleep(30)
market_data2 = app.stop_getting_IB_market_data(tickerid)


from TestClient.stop_geting_IB_market_data() method:

def stop_getting_IB_market_data(self, tickerid):
    """    
    Stops the stream of market data and returns all the data we've had since we last asked for it
    :param tickerid: identifier for the request    

    :return: market data    
    """

    self.cancelMktData(tickerid)

    ## Sometimes a lag whilst this happens, this prevents 'orphan' ticks appearing 
    time.sleep(5)

    market_data = self.get_IB_market_data(tickerid)
    ## output any errors    while self.wrapper.is_error():
       print(self.get_error())
return market_data


This will also return any data that we haven't yet captured with a previous call to get_IB_market_data. Again feel free to change this.


Making the results meaningful


To understand the results we can use the power of pandas to resample the dataframe. First of all lets glue together the two seperate buckets of data we've captured:

market_data2_as_df=market_data2.as_pdDataFrame()
all_market_data_as_df=pd.concat([market_data1_as_df, market_data2_as_df])

Now to see the bid-ask quoting activity, resolved to a one second resolution:

some_quotes = all_market_data_as_df.resample("1S").last()[["bid_size","bid_price",
                                                    "ask_price",  "ask_size"]]
print(some_quotes.head(10))



                     bid_size  bid_price  ask_price  ask_size
2017-03-10 11:07:51    9952.0     98.030     98.040    3736.0
2017-03-10 11:07:52    2653.0        NaN        NaN     212.0
2017-03-10 11:07:53   17250.0     98.025     98.045    9500.0
2017-03-10 11:07:54    3607.0     98.030     98.040     424.0
2017-03-10 11:07:55   12992.0        NaN        NaN    5920.0
2017-03-10 11:07:56   10073.0        NaN        NaN    3743.0
2017-03-10 11:07:57    9746.0        NaN        NaN    3726.0
2017-03-10 11:07:58    8280.0        NaN        NaN    4110.0
2017-03-10 11:07:59      17.0        NaN        NaN    1723.0
2017-03-10 11:08:00    2920.0        NaN        NaN    3248.0



You could safely forward fill the prices (a nan is shown when there is no updated value).

Or the first few trades, resolved to 10 milliseconds:

some_trades = all_market_data_as_df.resample("10L").last()[["last_trade_price", "last_trade_size"]]
print(some_trades.head(10))

                         last_trade_price  last_trade_size
2017-03-10 11:07:51.560             98.03            200.0
2017-03-10 11:07:51.570               NaN              NaN
2017-03-10 11:07:51.580               NaN              NaN
2017-03-10 11:07:51.590               NaN              NaN

Here I'm using the 'last' method. You could also use an average.

By the way it can be a bit dangerous to average prices too much; for example if you sample prices throughout the day and then take an average as your input into your trading algorithm you will underestimate the actual amount of volatility in the market. Similarly if you are trading high frequency stuff you will be using the active state of the order book and averaging average real time bars is probably not going to be a very wise thing to do. Over this short time period relative to my typical trading speed however its probably okay as mostly all we are going to be removing is a little illusory volatility caused by 'bid-ask' bounce.

Also even with this averaging its still worth running your prices through a 'jump detector' to make sure you don't trade off dirty prices showing spuriously large moves; I see these about once a month for each instrument I trade!

Much much more on this subject in this post


Islands in the stream...


That is it for prices. I use the historical data function whenever I start trading a particular contract but also every day as it gets close prices. This makes my system self recovering since even if it drops for a few days I will end up with daily prices at least being infilled. Also often non actively traded contracts still have close prices, useful if you are using intra contract spreads as a data input. Just be careful how you treat intraday and closing prices if you append them together.

Much much more on this subject in this post

I use market data to get intraday prices where a system requires that, and when I am just about to trade to check the market is liquid enough for what I want to do (or even just to check it is open since I don't bother keeping a holidays calendar for all my markets - I wouldn't want to spend more than 10 minutes of my time a day running this system now would I?). Plus it allows me to dis-aggregate my trading costs into what is coming from the inside spread, the cost of processing / execution delays and having to drop deeper into the order book.

Next on the menu will be placing an order! I will leave the trivial task of building a system which decides what the orders will be to the reader (hint: you might want to use the price in some way).

This is the third in a series of posts. The first two posts are:

http://qoppac.blogspot.co.uk/2017/03/interactive-brokers-native-python-api.html

http://qoppac.blogspot.co.uk/2017/03/historic-data-from-native-ib-pyhon-api.html

The next post on placing orders is:
http://qoppac.blogspot.co.uk/2017/03/placing-orders-in-native-python-ib-api.html


10 comments:

  1. Nice articel! Thanks for helping getting to know the new PythonAPI. But I have a question: Does anyone else get the error message "AttributeError: 'IBtick' object has no attribute 'canAutoExecute'" when trying to execute the code?

    ReplyDelete
    Replies
    1. I haven't had that error but you can remove that line if you're not going to use it.

      Delete
  2. hi, thanks for the article it's very helpful. However I get a error "list index out of range" on line 833 when printing the results. I've no clue why. the connection is fine. thanks

    ReplyDelete
    Replies
    1. Hmm... there appear to be only 433 lines in the gist https://gist.github.com/robcarver17/9c411d2b88eab4232cc66a9c7f6988fc

      Can you paste the full stack trace of the error?

      Delete
  3. Hi thank you so much for your post! I am trying to get the OptionGreeks values (delta, gamma, vega, theta). I think they can be obtained through the default reqMktData() method but I'll have to overwrite tickOptionComputation() function and I tried to replicate the tickPrice() function written in the gist

    def tickOptionComputation(self, tickerid, tickType,
    impliedVol:float, delta:float, optPrice:float, pvDividend:float,
    gamma:float, vega:float, theta:float, undPrice:float):
    ## overriden method

    if tickType == 10:
    this_tick_data = IBtick(self.get_time_stamp(), tickType, delta)
    if tickType == 11:
    this_tick_data = IBtick(self.get_time_stamp(), tickType, gamma)
    if tickType == 12:
    this_tick_data = IBtick(self.get_time_stamp(), tickType, vega)
    if tickType == 13:
    this_tick_data = IBtick(self.get_time_stamp(),tickType, theta)

    self._my_market_data_dict[tickerid].put(this_tick_data)


    Do you think this will work?

    Thank you so much!

    ReplyDelete
    Replies
    1. Probably - I don't really use the option code. But you will also need to add these numbers (10,11,... 13) to the resolve_tickids method in the IBtick class. And also modify the tick class to include these attributes.

      Delete
  4. when I tried
    ibcontract = IBcontract()
    ibcontract.secType = "CASH"
    #ibcontract.lastTradeDateOrContractMonth="201812"
    ibcontract.symbol="EUR"
    ibcontract.currency = "USD"
    ibcontract.exchange="IDEALPRO"

    The reuest contract details from ib stuck forever and program stops after X seconds. Any ideas plz?

    ReplyDelete
    Replies
    1. Nothing obviously wrong. You could ask here https://groups.io/g/twsapi/topics

      Delete
  5. Thank you very much for your excellent introduction in the IB API with python. I made some experiences and a lot went well. Unfortunately, I can’t figure out how to solve the following problem and I hope that you can help me.
    When running the script and asking for Futures everything is fine. When I ask for Options I got an error message in the API log: “Part of requested market data is not subscribed. Subscription-independent ticks are still active.DAX DAX 30 Index (Deutsche Aktien Xchange 30)/TOP/ALL”. Then, then API connection is closed. Market data farm connection is still ok and I got the requested market data (I have a level 1 subscription for Options). So fine so good.
    When I try to ask for more than one Contract (Option), I got no market data. I guess, the missing API connection is here the problem, because I can’t ask for the resolved ibcontract information. It seems that the rest of the code struggles therefore. How can I avoid this problem? Is it possible to check whether the API is connected and if not reconnect?
    The full error code:
    Getting full contract details from the server...
    Exception in thread Thread-2:
    Traceback (most recent call last):
    File "P:\Python35\lib\threading.py", line 914, in _bootstrap_inner
    self.run()
    File "P:\Python35\lib\threading.py", line 862, in run
    self._target(*self._args, **self._kwargs)
    File "P:\Python35\lib\site-packages\ibapi-9.73.2-py3.5.egg\ibapi\client.py", line 235, in run
    self.decoder.interpret(fields)
    File "P:\Python35\lib\site-packages\ibapi-9.73.2-py3.5.egg\ibapi\decoder.py", line 1152, in interpret
    self.interpretWithSignature(fields, handleInfo)
    File "P:\Python35\lib\site-packages\ibapi-9.73.2-py3.5.egg\ibapi\decoder.py", line 1133, in interpretWithSignature
    method(self.wrapper, *args)
    File "P:\11 Code\API\03 Beispiel API ansprechen - Echtzeitkurse abfragen v3.py", line 217, in error
    self._my_errors.put(errormsg)
    AttributeError: 'TestApp' object has no attribute '_my_errors'

    ReplyDelete
    Replies
    1. That's weird. I have to be honest I've never used options with IB so I don't know what would work. There is a method IsConnected you can use to see if something is still connected.

      Delete