wiki/examples/py/ws_test_load.md
import asyncio
import os
import sys
import psutil
import time
from collections import defaultdict
from datetime import datetime
import ccxt.pro as ccxt
class PerformanceMetrics:
def __init__(self):
self.message_count = defaultdict(int)
self.error_count = defaultdict(int)
self.start_time = time.time()
self.process = psutil.Process()
self.last_print_time = time.time()
self.print_interval = 5 # Print metrics every 5 seconds
def increment_message_count(self, symbol):
self.message_count[symbol] += 1
def increment_error_count(self, symbol):
self.error_count[symbol] += 1
def get_metrics(self):
current_time = time.time()
elapsed = current_time - self.start_time
# Calculate messages per second
total_messages = sum(self.message_count.values())
total_errors = sum(self.error_count.values())
messages_per_second = total_messages / elapsed if elapsed > 0 else 0
# Get system metrics
memory_info = self.process.memory_info()
cpu_percent = self.process.cpu_percent()
return {
'elapsed_time': elapsed,
'total_messages': total_messages,
'total_errors': total_errors,
'messages_per_second': messages_per_second,
'memory_usage_mb': memory_info.rss / (1024 * 1024), # Convert to MB
'cpu_percent': cpu_percent,
'symbols_subscribed': len(self.message_count)
}
def should_print_metrics(self):
current_time = time.time()
if current_time - self.last_print_time >= self.print_interval:
self.last_print_time = current_time
return True
return False
async def watch_orderbook(binance, symbol, metrics):
while True:
try:
await binance.watch_order_book(symbol)
metrics.increment_message_count(symbol)
if metrics.should_print_metrics():
current_metrics = metrics.get_metrics()
print(f"\nPerformance Metrics at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}:")
print(f"Symbols subscribed: {current_metrics['symbols_subscribed']}")
print(f"Messages per second: {current_metrics['messages_per_second']:.2f}")
print(f"Total messages: {current_metrics['total_messages']}")
print(f"Total errors: {current_metrics['total_errors']}")
print(f"Memory usage: {current_metrics['memory_usage_mb']:.2f} MB")
print(f"CPU usage: {current_metrics['cpu_percent']:.1f}%")
print(f"Elapsed time: {current_metrics['elapsed_time']:.1f} seconds")
print("-" * 50)
except Exception as e:
print(f"Error in {symbol}: {str(e)}")
metrics.increment_error_count(symbol)
await asyncio.sleep(1) # Wait before retrying
async def main():
binance = ccxt.binance({})
await binance.load_markets()
symbols = binance.symbols
metrics = PerformanceMetrics()
print(f"Starting to monitor {len(symbols)} symbols...")
tasks = []
for symbol in symbols[:500]:
await asyncio.sleep(0.1)
task = asyncio.create_task(watch_orderbook(binance, symbol, metrics))
tasks.append(task)
await asyncio.gather(*tasks)
if __name__ == '__main__':
asyncio.run(main())
# Results Using asyncio 3.12^
# Performance Metrics at 2025-05-18 20:24:04:
# Symbols subscribed: 100
# Messages per second: 358.01
# Total messages: 174240
# Memory usage: 73.94 MB
# CPU usage: 6.6%
# Elapsed time: 486.7 seconds
# Results using current version using asyncio 3.10
# Performance Metrics at 2025-05-18 20:41:31:
# Symbols subscribed: 100
# Messages per second: 297.41
# Total messages: 119353
# Memory usage: 79.78 MB
# CPU usage: 7.5%
# Elapsed time: 401.3 seconds