It is an updated version of this older post, which used a third-party package (swigibpy) that wraps the C++ API. I've changed the code, but not the poor attempts at humour.
In my last post we looked at getting a single snapshot of historical prices. In this one we will look at streamed prices - 'market tick data'. Unlike a historical price feed, which ends of its own accord, with streaming prices we need to tell the streamer when to stop.
Note: This post has been updated to use a more robust method for dealing with concurrency.
No stream rises higher than its source
Get the source code from this gist.
You'll also need the pandas library.
You may want to read the documentation.
No stream drives anything without being confined

The example code begins in a similar fashion to the historical data example: we make one of these weird client objects containing a server wrapper connection, make one of these slightly less weird contract objects (here it is for December 2018 Eurodollar futures), resolve it into a populated contract object (explained more fully here) and then shove that into a request for market data.
from __main function:

```python
app = TestApp("127.0.0.1", 4001, 1)

ibcontract = IBcontract()
ibcontract.secType = "FUT"
ibcontract.lastTradeDateOrContractMonth = "201812"
ibcontract.symbol = "GE"

## resolve the contract
resolved_ibcontract = app.resolve_ib_contract(ibcontract)

tickerid = app.start_getting_IB_market_data(resolved_ibcontract)

time.sleep(30)
```
Unlike the other functions we've looked at so far there isn't an internal loop here; instead we deliberately hang around whilst some price data comes in.
from TestClient.start_getting_IB_market_data() method:

```python
def start_getting_IB_market_data(self, resolved_ibcontract,
                                 tickerid=DEFAULT_MARKET_DATA_ID):
    """
    Kick off market data streaming

    :param resolved_ibcontract: a Contract object
    :param tickerid: the identifier for the request
    :return: tickerid
    """
    self._market_data_q_dict[tickerid] = self.wrapper.init_market_data(tickerid)
    self.reqMktData(tickerid, resolved_ibcontract, "", False, False, [])

    return tickerid
```
Ah yes, it's the usual stuff: set up space within the self.wrapper instance to receive the data, then call the TWS server request function (strictly speaking it's one of those 'EClient' whatdoyoucallits again). One difference, however, is that we have to store the TestClient's reference to the market data queue (in the dict self._market_data_q_dict), since we'll be returning to it later.
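The gist's init_market_data isn't reproduced here, but the idea is simple enough to sketch. The class and variable names below are my own illustration, not the gist's exact code: the wrapper just needs to create one queue per tickerid and hand it back.

```python
import queue

class WrapperSketch:
    ## hypothetical sketch of the wrapper side: one queue.Queue per tickerid
    def __init__(self):
        self._my_market_data_dict = {}

    def init_market_data(self, tickerid):
        ## create a fresh queue for this stream and remember it, so the
        ## EWrapper tick callbacks can put() incoming ticks onto it later
        market_data_queue = queue.Queue()
        self._my_market_data_dict[tickerid] = market_data_queue
        return market_data_queue

wrapper = WrapperSketch()
q = wrapper.init_market_data(50)
q.put("a tick")
print(wrapper._my_market_data_dict[50].get())  # a tick
```

Using a thread-safe Queue here is what makes the concurrency robust: the EWrapper callbacks run on the API's reader thread, while our client code reads from the other end.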
Only dead fish swim with the stream...
We now look inside the server wrapper object which gets populated as an instance into self.wrapper. As before there are a few EWrapper functions which get triggered whenever the market data arrives.
There are in fact several relevant methods: 'tickString', 'tickGeneric', 'tickSize' and 'tickPrice'; it seems a bit stochastic (quant speak; English translation: completely bloody random and arbitrary) which of these methods gets called when a tick arrives (a tick could be an update to a price or to a quoted size on the top level of the order book). Let's look at the most generic of these:
```python
def tickGeneric(self, tickerid, tickType, value):
    ## overriden method
    this_tick_data = IBtick(self.get_time_stamp(), tickType, value)
    self._my_market_data_dict[tickerid].put(this_tick_data)
```

All the code does is identify which type of tick it is and then add it to the Queue that lives in the appropriate part of self._my_market_data_dict. You can look at the classes IBtick and tick to see how this is done. I'm using local time as the timestamp here, but again you can change this if you want.
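For flavour, here is a minimal stand-in for what a timestamped tick object might look like. This is my sketch, not the gist's actual IBtick implementation (which also translates IB's numeric tick types into readable names):

```python
from collections import namedtuple
import datetime

## hypothetical stand-in: a tick is just a timestamped (tickType, value) pair
tick = namedtuple("tick", ["timestamp", "tickType", "value"])

def make_tick(tickType, value):
    ## local time as the timestamp, as in the post
    return tick(datetime.datetime.now(), tickType, value)

this_tick = make_tick("last_trade_price", 98.03)
print(this_tick.tickType, this_tick.value)  # last_trade_price 98.03
```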
Dipping our toe into the metaphorical market stream
from __main function:

```python
market_data1 = app.get_IB_market_data(tickerid)
```

from TestClient.get_IB_market_data() method:
```python
def get_IB_market_data(self, tickerid):
    ## how long to wait for the next data item
    MAX_WAIT_MARKETDATEITEM = 5
    market_data_q = self._market_data_q_dict[tickerid]

    market_data = []
    finished = False

    while not finished:
        try:
            market_data.append(market_data_q.get(timeout=MAX_WAIT_MARKETDATEITEM))
        except queue.Empty:
            ## no more data
            finished = True

    return stream_of_ticks(market_data)
```
We can see what data we have received so far. This also clears the queue of data that has been transmitted out of the app.wrapper storage. You can write this differently if you like of course.
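The pattern above - keep calling get() with a timeout until the queue goes quiet - is worth knowing in its own right. Here is a self-contained version (my sketch, independent of the IB code):

```python
import queue

def drain_queue(q, timeout=0.1):
    ## pull items off a Queue until it has been empty for `timeout` seconds
    items = []
    while True:
        try:
            items.append(q.get(timeout=timeout))
        except queue.Empty:
            break
    return items

q = queue.Queue()
for value in (98.03, 98.04, 98.05):
    q.put(value)
print(drain_queue(q))  # [98.03, 98.04, 98.05]
```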
Individual ticks look like this:

```
>>> print(market_data1)
                            ask_price  ask_size  bid_price  bid_size  \
2017-03-10 11:07:51.564816        NaN       NaN        NaN       NaN

                           canAutoExecute ignorabletick  last_trade_price  \
2017-03-10 11:07:51.564816           None          None             98.03

                            last_trade_size pastLimit
2017-03-10 11:07:51.564816              NaN      None

                            ask_price  ask_size  bid_price  bid_size  \
2017-03-10 11:07:51.564899        NaN       NaN        NaN       NaN

                           canAutoExecute ignorabletick  last_trade_price  \
2017-03-10 11:07:51.564899           None          None               NaN

                            last_trade_size pastLimit
2017-03-10 11:07:51.564899              200      None
```
Notice each tick is shown as a single row of a pandas DataFrame. This is so we can do this:

```python
market_data1_as_df = market_data1.as_pdDataFrame()
```
The advantage of this approach will be clear later, when I discuss interpretation.
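To make the 'one row per tick' idea concrete, here is a hedged sketch of the conversion. The tick namedtuple is my hypothetical stand-in for the gist's IBtick objects, and the construction below is an illustration, not the gist's exact as_pdDataFrame code:

```python
import pandas as pd
from collections import namedtuple

tick = namedtuple("tick", ["timestamp", "tickType", "value"])

ticks = [
    tick(pd.Timestamp("2017-03-10 11:07:51.564816"), "last_trade_price", 98.03),
    tick(pd.Timestamp("2017-03-10 11:07:51.564899"), "last_trade_size", 200.0),
]

## one row per tick, indexed by timestamp, one column per tick type;
## fields a given tick doesn't carry come out as NaN, as in the output above
df = pd.DataFrame({t.tickType: {t.timestamp: t.value} for t in ticks})
print(df.shape)  # (2, 2)
```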
Once in the stream of history you can't get out
If we just let that baby run we'd be receiving streams of prices until the cows came home. So what we do, back in the client world, is say STOP, I've had enough, after a preset amount of time (we could also STOP when the N'th tick has arrived, or when all the slots in the market data tuple are filled, which would be easy enough to code up).
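The N'th-tick variation might look something like this. It's a sketch under the assumption that, as above, the ticks arrive on a queue.Queue; get_n_ticks is my hypothetical name, not a function from the gist:

```python
import queue

def get_n_ticks(market_data_q, n, timeout=0.1):
    ## hypothetical variation: stop after n ticks (or when the queue
    ## goes quiet), rather than after a preset amount of time
    ticks = []
    while len(ticks) < n:
        try:
            ticks.append(market_data_q.get(timeout=timeout))
        except queue.Empty:
            break
    return ticks

q = queue.Queue()
for value in (98.03, 98.04, 98.05):
    q.put(value)
print(get_n_ticks(q, n=2))  # [98.03, 98.04]
```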
from __main function:

```python
time.sleep(30)
market_data2 = app.stop_getting_IB_market_data(tickerid)
```
from TestClient.stop_getting_IB_market_data() method:
```python
def stop_getting_IB_market_data(self, tickerid):
    """
    Stops the stream of market data and returns all the data we've had since we last asked for it

    :param tickerid: identifier for the request
    :return: market data
    """
    self.cancelMktData(tickerid)

    ## There is sometimes a lag whilst the cancellation happens; this prevents 'orphan' ticks appearing
    time.sleep(5)

    market_data = self.get_IB_market_data(tickerid)

    ## output any errors
    while self.wrapper.is_error():
        print(self.get_error())

    return market_data
```
This will also return any data that we haven't yet captured with a previous call to get_IB_market_data. Again feel free to change this.
Making the results meaningful
To understand the results we can use the power of pandas to resample the dataframe. First of all let's glue together the two separate buckets of data we've captured:
```python
market_data2_as_df = market_data2.as_pdDataFrame()
all_market_data_as_df = pd.concat([market_data1_as_df, market_data2_as_df])
```
Now to see the bid-ask quoting activity, resolved to a one second resolution:
```python
some_quotes = all_market_data_as_df.resample("1S").last()[["bid_size", "bid_price",
                                                           "ask_price", "ask_size"]]
print(some_quotes.head(10))
```
```
                     bid_size  bid_price  ask_price  ask_size
2017-03-10 11:07:51    9952.0     98.030     98.040    3736.0
2017-03-10 11:07:52    2653.0        NaN        NaN     212.0
2017-03-10 11:07:53   17250.0     98.025     98.045    9500.0
2017-03-10 11:07:54    3607.0     98.030     98.040     424.0
2017-03-10 11:07:55   12992.0        NaN        NaN    5920.0
2017-03-10 11:07:56   10073.0        NaN        NaN    3743.0
2017-03-10 11:07:57    9746.0        NaN        NaN    3726.0
2017-03-10 11:07:58    8280.0        NaN        NaN    4110.0
2017-03-10 11:07:59      17.0        NaN        NaN    1723.0
2017-03-10 11:08:00    2920.0        NaN        NaN    3248.0
```
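The NaN entries just mean no fresh quote arrived in that particular second. If you want a continuous quote series you could forward-fill, with the usual caveat that filled-in quotes are stale (the toy data below mimics the printed output):

```python
import numpy as np
import pandas as pd

idx = pd.to_datetime(["2017-03-10 11:07:51", "2017-03-10 11:07:52", "2017-03-10 11:07:53"])
some_quotes = pd.DataFrame(
    {"bid_price": [98.030, np.nan, 98.025], "ask_price": [98.040, np.nan, 98.045]},
    index=idx,
)

## carry the last seen quote forward over the gaps
filled = some_quotes.ffill()
print(filled["bid_price"].tolist())  # [98.03, 98.03, 98.025]
```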
Or the first few trades, resolved to 10 milliseconds:
```python
some_trades = all_market_data_as_df.resample("10L").last()[["last_trade_price",
                                                            "last_trade_size"]]
print(some_trades.head(10))
```
```
                         last_trade_price  last_trade_size
2017-03-10 11:07:51.560             98.03            200.0
2017-03-10 11:07:51.570               NaN              NaN
2017-03-10 11:07:51.580               NaN              NaN
2017-03-10 11:07:51.590               NaN              NaN
```
Here I'm using the 'last' method. You could also use an average.
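The difference between 'last' and an average is easy to see on toy data (the timestamps and prices below are made up):

```python
import pandas as pd

prices = pd.Series(
    [98.03, 98.04, 98.03, 98.05],
    index=pd.to_datetime([
        "2017-03-10 11:07:51.001",
        "2017-03-10 11:07:51.004",
        "2017-03-10 11:07:51.012",
        "2017-03-10 11:07:51.018",
    ]),
)

## 'last' keeps the final tick in each 10ms bucket; 'mean' averages all of them
last_10ms = prices.resample("10L").last()
mean_10ms = prices.resample("10L").mean()
print(last_10ms.iloc[0], mean_10ms.iloc[0])  # 98.04 98.035
```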
By the way, it can be a bit dangerous to average prices too much. For example, if you sample prices throughout the day and then take an average as the input to your trading algorithm, you will underestimate the actual amount of volatility in the market. Similarly, if you are trading high frequency stuff you will be using the live state of the order book, and averaging real-time bars is probably not a very wise thing to do. Over this short time period, relative to my typical trading speed, it's probably okay, as mostly all we are going to be removing is a little illusory volatility caused by 'bid-ask bounce'.
Also, even with this averaging it's still worth running your prices through a 'jump detector' to make sure you don't trade off dirty prices showing spuriously large moves; I see these about once a month for each instrument I trade!
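A jump detector can be very simple. Here is a minimal sketch (flag_jumps and the fixed 0.05 threshold are my assumptions; in practice you'd scale the threshold by the instrument's recent volatility):

```python
import pandas as pd

def flag_jumps(prices, max_move=0.05):
    ## flag any price whose move from the previous price exceeds max_move;
    ## note a single bad print gets flagged twice: on arrival and on reversal
    return prices.diff().abs() > max_move

prices = pd.Series([98.03, 98.04, 99.50, 98.04])
print(flag_jumps(prices).tolist())  # [False, False, True, True]
```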
Much much more on this subject in this post
Islands in the stream...
That is it for prices. I use the historical data function whenever I start trading a particular contract, and also every day to get closing prices. This makes my system self-recovering, since even if it drops out for a few days I will end up with daily prices, at least, being infilled. Also, contracts which aren't actively traded often still have close prices, which is useful if you are using intra-contract spreads as a data input. Just be careful how you treat intraday and closing prices if you append them together.
Much much more on this subject in this post
I use market data to get intraday prices where a system requires them, and when I am just about to trade, to check the market is liquid enough for what I want to do (or even just to check it is open, since I don't bother keeping a holidays calendar for all my markets - I wouldn't want to spend more than 10 minutes of my time a day running this system, now would I?). Plus it allows me to disaggregate my trading costs into what comes from the inside spread, from processing / execution delays, and from having to drop deeper into the order book.
Next on the menu will be placing an order! I will leave the trivial task of building a system which decides what the orders will be to the reader (hint: you might want to use the price in some way).
This is the third in a series of posts. The first two posts are:
The next post on placing orders is: