Polygon Realtime Stream cron job started failing in the last couple days

Hey @brian,

Things have been running very smoothly for the last couple of months, but in the last few days my daily Polygon stream job started failing with the following error. Unfortunately, it doesn't provide enough information for me to troubleshoot. Can you help?

Here's my cron job:

quantrocket realtime collect 'usstock-realtime-polygon'

Here are the detailed error logs:

quantrocket_flightlog_1|2021-11-24 03:30:02 quantrocket.realtime: INFO Collecting market data for 12130 securities in usstock-realtime-polygon
quantrocket_houston_1|172.18.0.14 - - [24/Nov/2021:08:30:02 +0000] "POST /flightlog/handler HTTP/1.1" 200 5 "-" "-"
quantrocket_realtime_1|updating market data collections
quantrocket_houston_1|172.18.0.14 - - [24/Nov/2021:08:30:10 +0000] "GET /master/securities.csv?exclude_delisted=True&fields=Symbol&fields=SecType&fields=Timezone HTTP/1.1" 200 509084 "-" "-"
quantrocket_realtime_1|Issuing market data request for A STK (sid FIBBG000C2V3D6)
quantrocket_blotter_1|recycling spooler after 90 tasks
quantrocket_blotter_1|OOOPS the spooler is no more...trying respawn...
quantrocket_blotter_1|spawned the uWSGI spooler on dir /var/tmp/uwsgi/spool with pid 389
quantrocket_houston_1|172.18.0.8 - - [24/Nov/2021:08:30:13 +0000] "GET /master/securities.csv?vendors=ibkr&fields=ibkr_ConId HTTP/1.1" 200 768677 "-" "-"
quantrocket_realtime_1|An error occurred while subscribing to A STK (sid FIBBG000C2V3D6), retrying (error was: 'NoneType' object has no attribute 'send')
quantrocket_flightlog_1|2021-11-24 03:30:14 quantrocket.realtime: ERROR Traceback (most recent call last):
quantrocket_flightlog_1|2021-11-24 03:30:14 quantrocket.realtime: ERROR   File "sym://qrocket_realtime_collect_polygon_streamconn_py", line 25, in wrapped
quantrocket_flightlog_1|2021-11-24 03:30:14 quantrocket.realtime: ERROR   File "sym://qrocket_realtime_collect_polygon_streamconn_py", line 134, in subscribe
quantrocket_flightlog_1|2021-11-24 03:30:14 quantrocket.realtime: ERROR AttributeError: 'NoneType' object has no attribute 'send'
quantrocket_flightlog_1|2021-11-24 03:30:14 quantrocket.realtime: ERROR 
quantrocket_realtime_1|Polygon status message: {'status': 'auth_success', 'message': 'authenticated'}
quantrocket_realtime_1|connected to: wss://socket.polygon.io/stocks
quantrocket_realtime_1|Worker exception detected, shutting down
quantrocket_flightlog_1|2021-11-24 03:30:45 quantrocket.realtime: INFO Exiting Polygon market data collection due to errors

Oddly enough, restarting the realtime Docker service and then re-running this cron job does get things working again.

Hey @brian, just following up on this. The issue is now recurring, happening every other day or so. I tried scheduling a restart of the Docker service, but that didn't seem to have any impact. At this point I'm at a loss; I'm not sure what else to do other than maybe switch from Polygon to Alpaca for realtime data, which I'd hate to do, as Polygon has been rock solid for running my half dozen strategies on a 1-minute timeframe.

Any help or advice would be much appreciated. Thanks man.

Seems like a websocket disconnect, which tends to happen with higher load. Are you streaming a large number of securities? Does it happen if you stream a smaller number? Perhaps try splitting the traffic between Alpaca and Polygon?

So, this issue ONLY happens when first starting the stream for the day. Once it gets started, I've never had this issue mid-day.

I do stream the entire us stock market universe, which unfortunately is required for my strategies.

The strange thing is that things worked fine for months, which makes me wonder if the Polygon authentication or initialization is now taking a little longer, thus causing some sort of race condition?

One idea I've had is somehow ramping up the stream over time; is this possible? For example, adding 1,000 securities at a time to the stream over the course of a few minutes, in order to ramp up to the 10k or so in the usstock universe. Thoughts?
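
A batched ramp-up along these lines could be sketched as follows. This is only an illustration, not something confirmed to fix the issue: `ramp_up`, `chunked`, `batch_size`, and `pause` are hypothetical names, and the sketch assumes that successive calls to `collect_market_data(code, sids=...)` add to the existing collection rather than replace it. The collector is passed in as a callable so the batching logic can be tried without a live connection.

```python
import time
from typing import Callable, Iterator, List, Sequence


def chunked(sids: Sequence[str], size: int) -> Iterator[List[str]]:
    """Yield consecutive batches of at most `size` sids."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(sids), size):
        yield list(sids[start:start + size])


def ramp_up(
    code: str,
    sids: Sequence[str],
    collect: Callable[..., object],
    batch_size: int = 1000,
    pause: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Submit `sids` to `collect` in batches, pausing between batches.

    `collect` is assumed to accept (code, sids=[...]), the way
    quantrocket.realtime.collect_market_data is called elsewhere
    in this thread. Returns the number of batches submitted.
    """
    batches = 0
    for batch in chunked(sids, batch_size):
        if batches:
            # Wait before every batch except the first
            sleep(pause)
        collect(code, sids=batch)
        batches += 1
    return batches
```

In a real deployment, `collect` would presumably be `collect_market_data`, and `sids` would come from something like `get_securities(universes=...).index.to_list()`.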

Unfortunately this problem has continued, but I was ultimately able to get the following solution to work, and so I'm posting it here in case anyone else has a similar issue.

In short, it appears the Polygon realtime collector experiences some sort of race condition during the initial connection/authentication when trying to subscribe to a large number of securities, such as the entire ~12k active usstock universe. I found that once the collector is connected/authenticated with a few securities, I could then subscribe to the remaining ~12k securities without any issues. Full solution below.

First, set up the following Python file at /codeload/realtime.py:

import logging
import time
from quantrocket.flightlog import FlightlogHandler
from quantrocket.realtime import get_db_config, collect_market_data, get_active_collections
from quantrocket.master import get_securities

# Setup logger
logger = logging.getLogger('quantrocket_customizations')
logger.setLevel(logging.DEBUG)
handler = FlightlogHandler(background=False)
logger.addHandler(handler)

def start_collection(code:str, timeout:int=60):
    """
    Start realtime data collection in 
    a reliable and robust manner.

    Parameters
    ----------
    code : str, required
        Realtime tick or aggregate database code to collect
    timeout : int, optional
        Realtime collection fails if not started within timeout; 
        default is 60 seconds

    Examples
    --------
    Start realtime data collection for the usstock Polygon database:

    >>> start_collection(code='usstock-realtime-polygon')
    """

    # Grab realtime database config
    config = get_db_config(code)
    if not config:
        msg = (f'No realtime database configuration found for {code}')
        logger.error(msg)
        raise Exception(msg)

    # Grab securities to collect
    universes = config['universes']
    securities = get_securities(universes=universes)
    if securities.empty:
        msg = ('No securities found for realtime database '
            f'configured universe(s) {universes}')
        logger.error(msg)
        raise Exception(msg)
    securities = securities.sort_values(by='Symbol')
    logger.info(f'Loaded {len(securities)} securities '
        f'from {universes} to collect for {code}.')

    # Start realtime collection engine with just 
    # a few securities to trigger authentication
    auth_sids = securities[:5].index.to_list()
    logger.info(f'Starting realtime collection authentication '
        f'for {code} with the following auth sids "{auth_sids}".')
    collect_market_data(code, sids=auth_sids)
    
    # Validate collection started before proceeding
    time.sleep(10) # give auth collection a little time to start up
    collection_started = False
    collection_start_timing = 0
    logger.info(f'Validating realtime collection authentication for {code}.')
    while not collection_started:
        
        # Grab active collections
        active_collections = get_active_collections().get(
            config['vendor'], {}).get(code)

        # Are collections active?
        if active_collections and active_collections > 0:
            collection_started = True
            logger.info(f'Realtime collection for {code} '
                f'has started, and is authenticated.')
        
        # Timeout if not started within reasonable time
        elif collection_start_timing > timeout:
            msg = (f'Unable to start/validate realtime data collection '
                f'for {code} within {timeout} seconds.')
            logger.error(msg)
            raise Exception(msg)

        # Check again in 10 seconds
        else:
            logger.info(f'Realtime collection for {code} not yet started/'
                f'authenticated. Will try again in a few seconds.')
            collection_start_timing += 10
            time.sleep(10)

    # Start collection for the whole database (no sids filter),
    # which picks up the remaining securities
    logger.info(f'Starting collection for remaining '
        f'{len(securities[5:])} {code} securities.')
    collect_market_data(code)

Then, in your quantrocket.countdown.crontab.sh, add the following:

# Restart realtime collector weekly (I only restart once a week to apply any security master updates)
30 20 * * sun quantrocket satellite exec 'codeload.realtime.start_collection' --params 'code:usstock-realtime-polygon'
30 20 * * fri quantrocket realtime cancel --all

Hey @brian, I thought I had this solved, but unfortunately the issue still persists. My solution above, which attempts to subscribe to just 5 sids as a starting point, also fails, so the problem isn't related to the number of securities.

I'm at a loss for solving this as I don't have access to the connection code for Polygon. Could you help me sort this out?

Note that after this issue occurs, the realtime collection subscribe and cancel commands don't work until the Docker container is restarted; after that, I can often get things to work again.

quantrocket_realtime_1|updating market data collections
quantrocket_realtime_1|Issuing market data request for A STK (sid FIBBG000C2V3D6)
quantrocket_realtime_1|An error occurred while subscribing to A STK (sid FIBBG000C2V3D6), retrying (error was: 'NoneType' object has no attribute 'send')
quantrocket_flightlog_1|2022-01-09 20:30:05 quantrocket.realtime: ERROR Traceback (most recent call last):
quantrocket_flightlog_1|2022-01-09 20:30:05 quantrocket.realtime: ERROR   File "sym://qrocket_realtime_collect_polygon_streamconn_py", line 25, in wrapped
quantrocket_flightlog_1|2022-01-09 20:30:05 quantrocket.realtime: ERROR   File "sym://qrocket_realtime_collect_polygon_streamconn_py", line 134, in subscribe
quantrocket_flightlog_1|2022-01-09 20:30:05 quantrocket.realtime: ERROR AttributeError: 'NoneType' object has no attribute 'send'
quantrocket_flightlog_1|2022-01-09 20:30:05 quantrocket.realtime: ERROR 
quantrocket_realtime_1|Polygon status message: {'status': 'auth_success', 'message': 'authenticated'}
quantrocket_realtime_1|connected to: wss://socket.polygon.io/stocks
quantrocket_realtime_1|Worker exception detected, shutting down
quantrocket_flightlog_1|2022-01-09 20:30:36 quantrocket.realtime: INFO Exiting Polygon market data collection due to errors
quantrocket_realtime_1|recycling mule after collecting market data
quantrocket_realtime_1|reconnecting to Polygon websocket after error: code = 1006 (connection closed abnormally [internal]), no reason
quantrocket_realtime_1|Polygon status message: {'status': 'auth_success', 'message': 'authenticated'}
quantrocket_realtime_1|connected to: wss://socket.polygon.io/stocks

Based on your logs, it looks like the problem occurs when there is a websocket disconnect (for whatever reason) that happens while subscriptions are still being added. We will add some retry logic around that.
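
The kind of retry described here could look roughly like the sketch below. It is not QuantRocket's actual connection code, which isn't shown in this thread; `subscribe_with_retry`, `get_conn`, and `NotConnected` are hypothetical names. It assumes the connection object is `None` while the websocket is reconnecting (consistent with the "'NoneType' object has no attribute 'send'" error in the logs) and waits with exponential backoff instead of failing on the first attempt.

```python
import time
from typing import Any, Callable, Optional


class NotConnected(Exception):
    """Raised when no usable connection appears within the retry budget."""


def subscribe_with_retry(
    get_conn: Callable[[], Optional[Any]],
    message: str,
    attempts: int = 5,
    delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Send `message` once a connection is available.

    `get_conn` returns the current connection object, or None while
    a reconnect is in progress (an assumption of this sketch).
    Returns the attempt number on which the send succeeded.
    """
    for attempt in range(1, attempts + 1):
        conn = get_conn()
        if conn is not None:
            try:
                conn.send(message)
                return attempt
            except ConnectionError:
                # Connection dropped mid-send; fall through and retry
                pass
        if attempt < attempts:
            # Exponential backoff: delay, 2*delay, 4*delay, ...
            sleep(delay * 2 ** (attempt - 1))
    raise NotConnected(f"could not send {message!r} after {attempts} attempts")
```

Re-reading the connection on every attempt matters here: holding on to a reference taken before the disconnect would keep retrying against a dead (or `None`) object.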

Thanks @brian... That actually makes sense now that I think about it, because the authentication issue occurs intermittently, and ONLY when establishing a new connection/collection. I have noticed that reconnections occur regularly, but they never cause me any problems.

If I leave the realtime collector connected to Polygon, it stays connected forever, and I have zero issues until I go to restart it over the weekend to incorporate security master changes. If I didn't have to restart the realtime collection, I doubt I'd ever have this issue.

Anyhow, thanks so much for your help!

A fix for this issue (hopefully) is now available in quantrocket/realtime:latest.

Ah, thank you so much! I will get it implemented today and let you know the results. The issue is pretty consistent, so I should know within a couple of days.