Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 57 additions & 12 deletions examples/websocket/ticker.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,68 @@
import time
import logging
import json
from aster.lib.utils import config_logging
from aster.websocket.client.stream import WebsocketClient as Client

# Configure the logging system globally.
config_logging(logging, logging.DEBUG)
logger = logging.getLogger(__name__)

def message_handler(message):
print(message)
def message_handler(message: str) -> None:
"""
Handles incoming WebSocket messages, parsing them safely.

Args:
message (str): The raw message received from the WebSocket.
"""
try:
# Assuming the message is a JSON string.
data = json.loads(message)
# Process the received data (e.g., save to DB, print specific fields).
logger.info(f"Received data: {data.get('s', 'Unknown')} price: {data.get('p')}")
except json.JSONDecodeError as e:
logger.error(f"Failed to decode message as JSON: {message[:100]}... Error: {e}")
except Exception as e:
logger.error(f"An unexpected error occurred in message handler: {e}")

my_client = Client()
my_client.start()

my_client.ticker(
id=13,
callback=message_handler,
symbol="btcusdt",
)
def run_client(duration: int = 5) -> None:
"""
Initializes and runs the WebSocket client for a specified duration.

Args:
duration (int): The number of seconds the client should run before stopping.
"""
client: Client | None = None
try:
logger.debug("Initializing WebSocket client.")
client = Client()
client.start()

time.sleep(2)
# Subscribe to the ticker stream for BTC/USDT.
logger.info("Subscribing to btcusdt ticker stream.")
client.ticker(
id=13,
callback=message_handler,
symbol="btcusdt",
)

logging.debug("closing ws connection")
my_client.stop()
# Non-blocking run time simulation: Wait for 'duration' seconds.
# This replaces the simple time.sleep(2) and makes the runtime explicit.
logger.info(f"Client running for {duration} seconds. Check logs for messages.")
time.sleep(duration)

except Exception as e:
# Catch connection errors or other exceptions during startup/runtime.
logger.error(f"Critical error during client operation: {e}")

finally:
if client:
logger.debug("Stopping WebSocket connection and cleaning up resources.")
client.stop()
# If the client library provides a way to wait for the thread to exit,
# (e.g., client.join()), it should be called here for robustness.

if __name__ == "__main__":
# Run the client for 5 seconds to demonstrate the flow.
run_client(duration=5)