IB API: reqMktData() - how to submit a new snapshot request after app.run() call?

Discussion in 'Automated Trading' started by kmiklas, Apr 28, 2020.

  1. kmiklas

    How can new ``reqMktData`` requests be queued after ``app.run()`` has been invoked, and the message loop is spinning?

    A little background. Snapshot market data requests from the TWS API follow this pattern:

    1. Create contract object
    2. Call ``reqMktData`` to queue request, passing in contract object
    3. Call ``app.run()`` to start the message loop
    4. Receive market data snapshot in a callback
    5. Message loop spinning... but nothing new coming, because requests were for a snapshot.

    ...is there an accepted idiom for queuing new requests to a spinning message loop? How can this be accomplished? My ideas were:

    a. Hack into the ``app.run`` method (in ``client.py``), add an expiration timer, and re-invoke ``app.run()`` for each new request. Meh.
    b. Spawn a separate thread for the purpose of queuing new requests
    c. Queue new requests in the callback function (here, ``tickPrice``)

    Tyvm, Keith :^)

    Code:
        import ...
       
        class Goose(EWrapper, EClient):
            def __init__(self):
                EClient.__init__(self, self)
       
            def error(self, reqId, errorCode, errorString):
                print("Error: ", reqId, " ", errorCode, " ", errorString)
       
            @iswrapper
            # ! [tickprice]
            def tickPrice(self, reqId: TickerId, tickType: TickType, price: float,
                          attrib: TickAttrib):
                super().tickPrice(reqId, tickType, price, attrib)
                print(f"~~> {reqId}, {tickType}, {price}")
    
        def main():
            app = Goose()
            app.connect("127.0.0.1", 7496, 0)
            print("serverVersion:%s connectionTime:%s" % (app.serverVersion(), app.twsConnectionTime()))
       
            a = Contract()
            a.symbol = "AAPL"
            a.secType = "STK"
            a.exchange = "SMART"
            a.currency = "USD"
            a.primaryExchange = "NASDAQ"
       
            # Queue request. Note "True" setting for MD snapshot (not streaming)
            app.reqMktData(1000, a, "", True, False, [])
       
            # Enter event loop
            app.run()
       
            # Sleep 3 seconds, then make a request for MSFT data.
            # NEVER EXECUTES - Main thread with app.run() event loop spinning.
            time.sleep(3)
            m = Contract()
            m.symbol = "MSFT"
            m.secType = "STK"
            m.exchange = "SMART"
            m.currency = "USD"
            m.primaryExchange = "NASDAQ"
            app.reqMktData(1001, m, "", True, False, [])
       
        if __name__ == "__main__":
            main()
     
  2. Girija

    I don't do Python, but you queue up multiple reqMktData calls for the items you want, and the callback receives the data. app.run() is an infinite loop.
    You would ideally have threads implementing the framework.
    For example, you can have a queue to which you add securities from main. Build a contract and invoke reqMktData in a thread. The callbacks fire as and when results arrive. If you want to stop quotes, you invoke cancelMktData.
    When you are ready to quit, you disconnect. You will need a handler to tell the program when to quit.
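
The queue-plus-thread layout described in this post can be sketched roughly as follows, using only the Python standard library. Everything here is an assumption made for illustration: `StubClient` is a hypothetical stand-in that only records calls (its `reqMktData` takes a symbol rather than a real contract), `submit_requests` is an invented name, and the `STOP` sentinel plays the role of the quit handler.

```python
import queue
import threading

STOP = object()  # sentinel: tells the worker to quit (the "quit handler")


class StubClient:
    """Hypothetical stand-in for the API client; it only records calls."""

    def __init__(self):
        self.sent = []

    def reqMktData(self, req_id, symbol):
        # The real call takes a Contract and more arguments; simplified here.
        self.sent.append((req_id, symbol))


def submit_requests(client, requests, first_id=1000):
    """Worker: pull symbols off the queue and issue one request per symbol."""
    req_id = first_id
    while True:
        symbol = requests.get()      # blocks until main queues something
        if symbol is STOP:
            break
        client.reqMktData(req_id, symbol)
        req_id += 1


client = StubClient()
requests = queue.Queue()
worker = threading.Thread(target=submit_requests, args=(client, requests))
worker.start()

# Main queues securities whenever it likes, then asks the worker to quit.
for symbol in ("AAPL", "MSFT", "IBM"):
    requests.put(symbol)
requests.put(STOP)
worker.join()

print(client.sent)
```

With a real client, the worker would build a contract per symbol before calling reqMktData, and the callbacks would deliver the results separately.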
     
  3. I think your issue is caused by the 3-second wait you use. The API user manual mentions that a snapshot request automatically times out after 11 seconds: http://interactivebrokers.github.io/tws-api/md_request.html#md_snapshot
    After that you could reuse the ID to request a snapshot for another stock ticker.
    If you don't want to wait 11 seconds, you can cancel the data request with app.cancelMktData(1001), after which you can reuse the ID for another stock ticker.
    Another possibility is to use a unique identifier for each stock ticker, submit all requests at the same time, and capture the returned data for each ticker separately. Example: AAPL ID 1001, MSFT ID 1002, etc.
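
A minimal sketch of the unique-ID idea, assuming a callback shaped like tickPrice with the attrib argument left out. `on_tick_price`, `req_ids` and `snapshots` are illustrative names, and the prices and tick type numbers are made up for the example. No API connection is involved.

```python
from collections import defaultdict

symbols = ["AAPL", "MSFT"]

# One unique request ID per ticker: AAPL -> 1001, MSFT -> 1002.
req_ids = {1001 + i: sym for i, sym in enumerate(symbols)}

# Ticks collected per symbol: {symbol: {tick_type: price}}
snapshots = defaultdict(dict)


def on_tick_price(req_id, tick_type, price):
    """File each incoming tick under the ticker that owns its request ID."""
    snapshots[req_ids[req_id]][tick_type] = price


# Simulated callbacks arriving interleaved (made-up values).
on_tick_price(1002, 4, 174.50)
on_tick_price(1001, 4, 278.25)
on_tick_price(1001, 1, 278.20)

print(dict(snapshots))
```

Because the ID travels with every callback, the order in which replies arrive does not matter.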
     
  4. kmiklas

    Can't be... app.run() is the message loop, which spins infinitely. Once this is called, the main thread is effectively hung; the execution flow never even reaches the sleep statement.

    I am already doing this. Note in the example that AAPL is 1000, and MSFT is 1001 in the reqMktData call.
     
  5. kmiklas

    Have you done this? I haven't looked at the run() method of the EClient class. I don't know if it reads contracts as part of the message loop, or if they need to be set up in advance, and are read once as a batch.

    EDIT: find below the run() method from EClient. Looks like the message loop does continually fetch messages from the queue, with the self.msg_queue.get statement.

    Code:
        def run(self):
            """This is the function that has the message loop."""
    
            try:
                while not self.done and (self.isConnected()
                            or not self.msg_queue.empty()):
                    try:
                        try:
                            text = self.msg_queue.get(block=True, timeout=0.2)  # <-- HERE
                            if len(text) > MAX_MSG_LEN:
                                self.wrapper.error(NO_VALID_ID, BAD_LENGTH.code(),
                                    "%s:%d:%s" % (BAD_LENGTH.msg(), len(text), text))
                                self.disconnect()
                                break
                        except queue.Empty:
                            logger.debug("queue.get: empty")
                        else:
                            fields = comm.read_fields(text)
                            logger.debug("fields %s", fields)
                            self.decoder.interpret(fields)
                    except (KeyboardInterrupt, SystemExit):
                        logger.info("detected KeyboardInterrupt, SystemExit")
                        self.keyboardInterrupt()
                        self.keyboardInterruptHard()
                    except BadMessage:
                        logger.info("BadMessage")
                        self.conn.disconnect()
    
                    logger.debug("conn:%d queue.sz:%d",
                                 self.isConnected(),
                                 self.msg_queue.qsize())
            finally:
                self.disconnect()
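
To see why a loop shaped like this keeps picking up work that arrives after it has started, here is a stripped-down sketch using only the standard library. `message_loop`, `late_producer` and the "STOP" message are illustrative inventions, not part of the API. The point is only that `get(block=True, timeout=0.2)` wakes up periodically, so messages put on the queue later by another thread still get processed.

```python
import queue
import threading
import time

msg_queue = queue.Queue()
handled = []


def message_loop():
    """Spin like run(): wait up to 0.2 s for a message, then go round again."""
    while True:
        try:
            text = msg_queue.get(block=True, timeout=0.2)
        except queue.Empty:
            continue                 # nothing yet; keep spinning
        if text == "STOP":           # illustrative shutdown message
            break
        handled.append(text)


def late_producer():
    """Another thread feeding the queue well after the loop has started."""
    time.sleep(0.5)
    msg_queue.put("first")
    time.sleep(0.5)
    msg_queue.put("second")
    msg_queue.put("STOP")


threading.Thread(target=late_producer).start()
message_loop()                       # blocks the calling thread, as app.run() does
print(handled)
```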
     
    Last edited: Apr 29, 2020
  6. Girija

    I don't code in Python.
    Here is what the API expects you to do.
    Say you have a file with a list of tickers you want market data for.
    Connect and check for errors.
    Get the account summary.
    For each ticker in the list, build a contract and register it with reqMktData.
    tickPrice / tickString callbacks will fire as appropriate, asynchronously to the request.
    Due to the async nature, you will continue to receive market data until you cancel or disconnect. It is a stream.
    Take a look at the samples from git.
     
  7. kmiklas

    @Girija : It is not a subscription and/or stream request; it is a snapshot request.

    app.reqMktData(100, z, "", True, False, [])

    ^^^ The "True" means that it's a snapshot request. Just one data point.
     
  8. Girija

    Sorry, didn't see that. For snapshots you will typically wait until tickSnapshotEnd is received.
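
One way to wait for the end of a snapshot, sketched with the standard library only. `SnapshotWaiter` is a hypothetical helper, not part of the API, and the timer thread below merely simulates the API invoking the tickSnapshotEnd callback for request 1000.

```python
import threading


class SnapshotWaiter:
    """Hypothetical helper: one Event per request ID, set when its snapshot ends."""

    def __init__(self):
        self._events = {}
        self._lock = threading.Lock()

    def _event(self, req_id):
        # Create the Event on first use, from whichever thread gets here first.
        with self._lock:
            return self._events.setdefault(req_id, threading.Event())

    def tickSnapshotEnd(self, req_id):
        # In a real app this would be the wrapper callback of the same name.
        self._event(req_id).set()

    def wait(self, req_id, timeout):
        """Return True if the snapshot ended within `timeout` seconds."""
        return self._event(req_id).wait(timeout)


waiter = SnapshotWaiter()

# Simulate the API reporting the end of snapshot 1000 after 0.3 s.
threading.Timer(0.3, waiter.tickSnapshotEnd, args=(1000,)).start()

print(waiter.wait(1000, timeout=5))    # snapshot 1000 finishes in time
print(waiter.wait(1001, timeout=0.1))  # nothing ever arrives for 1001
```

The timeout matters: if a snapshot never completes, the waiting thread gets False back instead of hanging.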
     
  9. kmiklas

    I ended up spawning a separate thread to submit reqMktData() requests to the cycling message loop.

    It polls the queue for new requests as it spins. Works perfectly.
     
  10. Girija

    Impressed. Next is to roll your normal trading style into code and backtest.
     
    #10     Apr 29, 2020