From 30f2368d5879f53959bb81c7bb5de5a46ab79567 Mon Sep 17 00:00:00 2001 From: Shawn Snyder Date: Tue, 17 Jun 2025 08:10:49 -0500 Subject: [PATCH 1/3] WIP on SMS/OptionsAdd --- intriniorealtime/equities_client.py | 43 +++++++++++++++++++++++++++++ intriniorealtime/options_client.py | 2 +- 2 files changed, 44 insertions(+), 1 deletion(-) diff --git a/intriniorealtime/equities_client.py b/intriniorealtime/equities_client.py index e49c0be..596d24d 100644 --- a/intriniorealtime/equities_client.py +++ b/intriniorealtime/equities_client.py @@ -360,6 +360,8 @@ def __init__(self, client): self.daemon = True self.client = client self.enabled = True + self.continuation_queue = queue.Queue(100) + self.continuation_lock: threading.Lock = threading.Lock() def run(self): self.client.ws = websocket.WebSocketApp( @@ -368,6 +370,7 @@ def run(self): on_open=self.on_open, on_close=self.on_close, on_message=self.on_message, + on_cont_message=self.on_cont_message, on_error=self.on_error ) @@ -390,6 +393,46 @@ def on_error(self, ws, error, *args): self.client.logger.error(f"Error in on_error handler: {repr(e)}; {repr(error)}") raise e + def stitch(self): + full = None + while not self.continuation_queue.empty(): + partial = self.continuation_queue.get(True, 1) + if full is None: + full = partial + else: + full = full.join(partial) + return full + + def on_cont_message(self, partial_message, is_last): # The 3rd argument is continue flag. if 0, the data continue + try: + if DEBUGGING: # This is here for performance reasons so we don't use slow reflection on every message. + if isinstance(partial_message, str): + self.client.logger.debug(f"Received message (hex): {partial_message.encode('utf-8').hex()}") + else: + if isinstance(partial_message, bytes): + self.client.logger.debug(f"Received message (hex): {partial_message.hex()}") + self.continuation_lock.acquire() + try: + if is_last == 0: + self.continuation_queue.put_nowait(partial_message) + else: + self.continuation_queue.put_nowait(partial_message) + full_message = self.stitch() + self.on_message(self.client.ws, full_message) + finally: + self.continuation_lock.release() + except queue.Full: + self.client.on_queue_full() + except Exception as e: + hex_message = "" + if isinstance(partial_message, str): + hex_message = partial_message.encode('utf-8').hex() + else: + if isinstance(partial_message, bytes): + hex_message = partial_message.hex() + self.client.logger.error(f"Websocket on_message ERROR. Message as hex: {hex_message}; error: {repr(e)}") + raise e + def on_message(self, ws, message): try: if DEBUGGING: # This is here for performance reasons so we don't use slow reflection on every message. diff --git a/intriniorealtime/options_client.py b/intriniorealtime/options_client.py index a9eb3b7..b6daa38 100644 --- a/intriniorealtime/options_client.py +++ b/intriniorealtime/options_client.py @@ -347,7 +347,7 @@ def __on_close(self, ws, closeStatusCode, closeMsg): def __on_error(self, ws, error): _log.error("Websocket - Error - {0}".format(error)) - def __on_data(self, ws, data, code, continueFlag): + def __on_data(self, ws, data, code, continueFlag): #continueFlag - If 0, the data continues if code == websocket.ABNF.OPCODE_BINARY: with _dataMsgLock: global _dataMsgCount From 671e593bece1e30dcb1d2b535293edf4ec575b03 Mon Sep 17 00:00:00 2001 From: Shawn Snyder Date: Tue, 17 Jun 2025 09:05:17 -0500 Subject: [PATCH 2/3] testing --- intriniorealtime/equities_client.py | 4 ++-- intriniorealtime/options_client.py | 29 +++++++++++++++++++++++++++-- 2 files changed, 29 insertions(+), 4 deletions(-) diff --git a/intriniorealtime/equities_client.py b/intriniorealtime/equities_client.py index 596d24d..553a750 100644 --- a/intriniorealtime/equities_client.py +++ b/intriniorealtime/equities_client.py @@ -414,9 +414,9 @@ def on_cont_message(self, partial_message, is_last): # The 3rd argument is conti self.continuation_lock.acquire() try: if is_last == 0: - self.continuation_queue.put_nowait(partial_message) + self.continuation_queue.put(partial_message) else: - self.continuation_queue.put_nowait(partial_message) + self.continuation_queue.put(partial_message) full_message = self.stitch() self.on_message(self.client.ws, full_message) finally: diff --git a/intriniorealtime/options_client.py b/intriniorealtime/options_client.py index b6daa38..ec42b54 100644 --- a/intriniorealtime/options_client.py +++ b/intriniorealtime/options_client.py @@ -284,6 +284,9 @@ def __init__(self, data_queue: queue.Queue): super().__init__(ws_url, on_open=self.__on_open, on_close=self.__on_close, on_data=self.__on_data, on_error=self.__on_error) self.__wsLock: threading.Lock = ws_lock + self.__continuation_queue = queue.Queue(100) + self.__continuation_lock: threading.Lock = threading.Lock() + self.__currently_continuing: bool = False self.__worker_threads: list[threading.Thread] = worker_threads self.__get_channels: Callable[[None], set[tuple[str, bool]]] = get_channels self.__get_token: Callable[[None], str] = get_token @@ -347,12 +350,34 @@ def __on_close(self, ws, closeStatusCode, closeMsg): def __on_error(self, ws, error): _log.error("Websocket - Error - {0}".format(error)) - def __on_data(self, ws, data, code, continueFlag): #continueFlag - If 0, the data continues + def __stitch(self): + full = None + while not self.__continuation_queue.empty(): + partial = self.__continuation_queue.get(True, 1) + if full is None: + full = partial + else: + full = full.join(partial) + return full + + def __on_data(self, ws, data, code, is_last): #continueFlag - If 0, the data continues if code == websocket.ABNF.OPCODE_BINARY: with _dataMsgLock: global _dataMsgCount _dataMsgCount += 1 - self.__data_queue.put_nowait(data) + + if self.__currently_continuing or is_last == 0: # we're either in the middle of a continue, or we're starting a continue + with self.__continuation_lock: + if self.__currently_continuing or is_last == 0: #check lock check + self.__continuation_queue.put(data) + if is_last != 0: + full_message = self.__stitch() + self.__data_queue.put_nowait(full_message) + self.__currently_continuing = True if is_last == 0 else False # we're in the middle of a continue, but this is the last message, so remove flag + else: # We're not in the middle of a continue, and this isn't marked as multi-part, so this is a full message by itself. + self.__data_queue.put_nowait(data) + else: # We're not in the middle of a continue, and this isn't marked as multi-part, so this is a full message by itself. + self.__data_queue.put_nowait(data) else: _log.debug("Websocket - Message received") with _txtMsgLock: From 19d93b89c7469078efe1ddf50a32b4cd361ca945 Mon Sep 17 00:00:00 2001 From: Shawn Snyder Date: Thu, 19 Jun 2025 15:13:52 -0500 Subject: [PATCH 3/3] refine logging --- example_app_equities.py | 2 +- example_app_options.py | 2 +- intriniorealtime/equities_client.py | 8 +++++--- intriniorealtime/options_client.py | 21 ++++++++++++++------- 4 files changed, 21 insertions(+), 12 deletions(-) diff --git a/example_app_equities.py b/example_app_equities.py index 344e520..8da8d97 100644 --- a/example_app_equities.py +++ b/example_app_equities.py @@ -82,7 +82,7 @@ def on_kill_process(sig, frame): summarize_thread = Summarize(stop_event) summarize_thread.start() -time.sleep(120) +time.sleep(60 * 60 * 8) # sigint, or ctrl+c, during the thread wait will also perform the same below code. print("Stopping") diff --git a/example_app_options.py b/example_app_options.py index 3f8e56d..1effef6 100644 --- a/example_app_options.py +++ b/example_app_options.py @@ -147,6 +147,6 @@ def on_kill_process(sig, frame): # Use this to subscribe, dynamically, a list of specific option contracts or option chains. # intrinioRealtimeOptionsClient.join("GOOG__220408C02870000", "MSFT__220408C00315000", "AAPL__220414C00180000", "TSLA", "GE") -time.sleep(60 * 60) +time.sleep(60 * 60 * 8) # sigint, or ctrl+c, during the thread wait will also perform the same below code. on_kill_process(None, None) diff --git a/intriniorealtime/equities_client.py b/intriniorealtime/equities_client.py index 553a750..c64dd38 100644 --- a/intriniorealtime/equities_client.py +++ b/intriniorealtime/equities_client.py @@ -407,10 +407,12 @@ def on_cont_message(self, partial_message, is_last): # The 3rd argument is conti try: if DEBUGGING: # This is here for performance reasons so we don't use slow reflection on every message. if isinstance(partial_message, str): - self.client.logger.debug(f"Received message (hex): {partial_message.encode('utf-8').hex()}") + self.client.logger.debug(f"Received partial message (str): {partial_message.encode('utf-8').hex()}") else: if isinstance(partial_message, bytes): - self.client.logger.debug(f"Received message (hex): {partial_message.hex()}") + self.client.logger.debug(f"Received partial message (hex): {partial_message.hex()}") + #self.client.logger.debug(f"Received partial message (hex): {partial_message.hex()}") + self.continuation_lock.acquire() try: if is_last == 0: @@ -437,7 +439,7 @@ def on_message(self, ws, message): try: if DEBUGGING: # This is here for performance reasons so we don't use slow reflection on every message. if isinstance(message, str): - self.client.logger.debug(f"Received message (hex): {message.encode('utf-8').hex()}") + self.client.logger.debug(f"Received message (str): {message.encode('utf-8').hex()}") else: if isinstance(message, bytes): self.client.logger.debug(f"Received message (hex): {message.hex()}") diff --git a/intriniorealtime/options_client.py b/intriniorealtime/options_client.py index ec42b54..40d40dc 100644 --- a/intriniorealtime/options_client.py +++ b/intriniorealtime/options_client.py @@ -369,15 +369,16 @@ def __on_data(self, ws, data, code, is_last): #continueFlag - If 0, the data con if self.__currently_continuing or is_last == 0: # we're either in the middle of a continue, or we're starting a continue with self.__continuation_lock: if self.__currently_continuing or is_last == 0: #check lock check + #_log.info(f"Received partial message (hex): {data.hex()}") self.__continuation_queue.put(data) if is_last != 0: full_message = self.__stitch() - self.__data_queue.put_nowait(full_message) + self.__data_queue.put(full_message) self.__currently_continuing = True if is_last == 0 else False # we're in the middle of a continue, but this is the last message, so remove flag else: # We're not in the middle of a continue, and this isn't marked as multi-part, so this is a full message by itself. - self.__data_queue.put_nowait(data) + self.__data_queue.put(data) else: # We're not in the middle of a continue, and this isn't marked as multi-part, so this is a full message by itself. - self.__data_queue.put_nowait(data) + self.__data_queue.put(data) else: _log.debug("Websocket - Message received") with _txtMsgLock: @@ -500,13 +501,18 @@ def _thread_fn(index: int, data: queue.Queue, on_refresh: Callable[[OptionsRefresh], None] = None, on_unusual_activity: Callable[[OptionsUnusualActivity], None] = None): _log.debug("Starting worker thread {0}".format(index)) + datum: bytes = None + count: int = 0 + start_index: int = 1 + msg_type: int = 0 + message: bytes = None while not _stopFlag.is_set(): try: - datum: bytes = data.get(True, 1.0) - count: int = datum[0] - start_index: int = 1 + datum = data.get(True, 1.0) + count = datum[0] + start_index = 1 for _ in range(count): - msg_type: int = datum[start_index + 22] + msg_type = datum[start_index + 22] if msg_type == 1: # Quote message: bytes = datum[start_index:(start_index + _OPTIONS_QUOTE_MESSAGE_SIZE)] # byte structure: @@ -614,6 +620,7 @@ def _thread_fn(index: int, data: queue.Queue, continue except Exception as e: _log.error(f"Worker thread {index} Exception {e}") + _log.error(f"\tCurrent count: {count}\r\n\tCurrent start_index: {start_index}\r\n\tCurrent msg_type: {msg_type}\r\n\tFull message (hex): {datum.hex()}\r\n\tScoped message: {message.hex()}") continue _log.debug("Worker thread {0} stopped".format(index))