Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion example_app_equities.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def on_kill_process(sig, frame):
summarize_thread = Summarize(stop_event)
summarize_thread.start()

time.sleep(120)
time.sleep(60 * 60 * 8)

# sigint, or ctrl+c, during the thread wait will also perform the same below code.
print("Stopping")
Expand Down
2 changes: 1 addition & 1 deletion example_app_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,6 @@ def on_kill_process(sig, frame):
# Use this to subscribe, dynamically, a list of specific option contracts or option chains.
# intrinioRealtimeOptionsClient.join("GOOG__220408C02870000", "MSFT__220408C00315000", "AAPL__220414C00180000", "TSLA", "GE")

time.sleep(60 * 60)
time.sleep(60 * 60 * 8)
# sigint, or ctrl+c, during the thread wait will also perform the same below code.
on_kill_process(None, None)
47 changes: 46 additions & 1 deletion intriniorealtime/equities_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,8 @@ def __init__(self, client):
self.daemon = True
self.client = client
self.enabled = True
self.continuation_queue = queue.Queue(100)
self.continuation_lock: threading.Lock = threading.Lock()

def run(self):
self.client.ws = websocket.WebSocketApp(
Expand All @@ -368,6 +370,7 @@ def run(self):
on_open=self.on_open,
on_close=self.on_close,
on_message=self.on_message,
on_cont_message=self.on_cont_message,
on_error=self.on_error
)

Expand All @@ -390,11 +393,53 @@ def on_error(self, ws, error, *args):
self.client.logger.error(f"Error in on_error handler: {repr(e)}; {repr(error)}")
raise e

def stitch(self):
full = None
while not self.continuation_queue.empty():
partial = self.continuation_queue.get(True, 1)
if full is None:
full = partial
else:
full = full.join(partial)
return full

def on_cont_message(self, partial_message, is_last): # The 3rd argument is continue flag. if 0, the data continue
try:
if DEBUGGING: # This is here for performance reasons so we don't use slow reflection on every message.
if isinstance(partial_message, str):
self.client.logger.debug(f"Received partial message (str): {partial_message.encode('utf-8').hex()}")
else:
if isinstance(partial_message, bytes):
self.client.logger.debug(f"Received partial message (hex): {partial_message.hex()}")
#self.client.logger.debug(f"Received partial message (hex): {partial_message.hex()}")

self.continuation_lock.acquire()
try:
if is_last == 0:
self.continuation_queue.put(partial_message)
else:
self.continuation_queue.put(partial_message)
full_message = self.stitch()
self.on_message(self.client.ws, full_message)
finally:
self.continuation_lock.release()
except queue.Full:
self.client.on_queue_full()
except Exception as e:
hex_message = ""
if isinstance(partial_message, str):
hex_message = partial_message.encode('utf-8').hex()
else:
if isinstance(partial_message, bytes):
hex_message = partial_message.hex()
self.client.logger.error(f"Websocket on_message ERROR. Message as hex: {hex_message}; error: {repr(e)}")
raise e

def on_message(self, ws, message):
try:
if DEBUGGING: # This is here for performance reasons so we don't use slow reflection on every message.
if isinstance(message, str):
self.client.logger.debug(f"Received message (hex): {message.encode('utf-8').hex()}")
self.client.logger.debug(f"Received message (str): {message.encode('utf-8').hex()}")
else:
if isinstance(message, bytes):
self.client.logger.debug(f"Received message (hex): {message.hex()}")
Expand Down
44 changes: 38 additions & 6 deletions intriniorealtime/options_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,9 @@ def __init__(self,
data_queue: queue.Queue):
super().__init__(ws_url, on_open=self.__on_open, on_close=self.__on_close, on_data=self.__on_data, on_error=self.__on_error)
self.__wsLock: threading.Lock = ws_lock
self.__continuation_queue = queue.Queue(100)
self.__continuation_lock: threading.Lock = threading.Lock()
self.__currently_continuing: bool = False
self.__worker_threads: list[threading.Thread] = worker_threads
self.__get_channels: Callable[[None], set[tuple[str, bool]]] = get_channels
self.__get_token: Callable[[None], str] = get_token
Expand Down Expand Up @@ -347,12 +350,35 @@ def __on_close(self, ws, closeStatusCode, closeMsg):
def __on_error(self, ws, error):
_log.error("Websocket - Error - {0}".format(error))

def __on_data(self, ws, data, code, continueFlag):
def __stitch(self):
full = None
while not self.__continuation_queue.empty():
partial = self.__continuation_queue.get(True, 1)
if full is None:
full = partial
else:
full = full.join(partial)
return full

def __on_data(self, ws, data, code, is_last): #continueFlag - If 0, the data continues
if code == websocket.ABNF.OPCODE_BINARY:
with _dataMsgLock:
global _dataMsgCount
_dataMsgCount += 1
self.__data_queue.put_nowait(data)

if self.__currently_continuing or is_last == 0: # we're either in the middle of a continue, or we're starting a continue
with self.__continuation_lock:
if self.__currently_continuing or is_last == 0: #check lock check
#_log.info(f"Received partial message (hex): {data.hex()}")
self.__continuation_queue.put(data)
if is_last != 0:
full_message = self.__stitch()
self.__data_queue.put(full_message)
self.__currently_continuing = True if is_last == 0 else False # we're in the middle of a continue, but this is the last message, so remove flag
else: # We're not in the middle of a continue, and this isn't marked as multi-part, so this is a full message by itself.
self.__data_queue.put(data)
else: # We're not in the middle of a continue, and this isn't marked as multi-part, so this is a full message by itself.
self.__data_queue.put(data)
else:
_log.debug("Websocket - Message received")
with _txtMsgLock:
Expand Down Expand Up @@ -475,13 +501,18 @@ def _thread_fn(index: int, data: queue.Queue,
on_refresh: Callable[[OptionsRefresh], None] = None,
on_unusual_activity: Callable[[OptionsUnusualActivity], None] = None):
_log.debug("Starting worker thread {0}".format(index))
datum: bytes = None
count: int = 0
start_index: int = 1
msg_type: int = 0
message: bytes = None
while not _stopFlag.is_set():
try:
datum: bytes = data.get(True, 1.0)
count: int = datum[0]
start_index: int = 1
datum = data.get(True, 1.0)
count = datum[0]
start_index = 1
for _ in range(count):
msg_type: int = datum[start_index + 22]
msg_type = datum[start_index + 22]
if msg_type == 1: # Quote
message: bytes = datum[start_index:(start_index + _OPTIONS_QUOTE_MESSAGE_SIZE)]
# byte structure:
Expand Down Expand Up @@ -589,6 +620,7 @@ def _thread_fn(index: int, data: queue.Queue,
continue
except Exception as e:
_log.error(f"Worker thread {index} Exception {e}")
_log.error(f"\tCurrent count: {count}\r\n\tCurrent start_index: {start_index}\r\n\tCurrent msg_type: {msg_type}\r\n\tFull message (hex): {datum.hex()}\r\n\tScoped message: {message.hex()}")
continue
_log.debug("Worker thread {0} stopped".format(index))

Expand Down