Skip to content

事件驱动系统

字数
809 字
阅读时间
4 分钟

职责和范围

事件驱动系统是 Homalos 量化交易系统中所有组件之间实现异步、松耦合交互的基础通信机制。该系统以EventBus提供高性能事件路由的类为核心,具备双通道处理(同步和异步)、基于优先级的事件处理以及全面的线程安全。

本文档涵盖了EventBus事件系统的实现、事件类型和优先级、线程安全机制以及组件在整个系统中的异步通信方式。有关使用此事件系统的更广泛的系统架构的信息,请参阅核心架构。有关使用事件的特定组件的详细信息,请参阅交易引擎

EventBus 架构

EventBus实现了双通道事件处理系统,具有单独的同步和异步队列,每个队列由专用的工作线程进行管理。

维护EventBus两个独立的处理通道:

渠道队列大小线程目的
同步10,000_sync_thread需要立即处理的时间关键事件
异步10,000_async_thread后台处理和日志记录

事件类型和优先级

事件系统使用全面的类型层次结构,并通过EventPriority枚举和EventType类实现基于优先级的处理。

优先级系统将事件路由到适当的处理通道:

优先级通道示例事件
CRITICAL0同步SYSTEM_ERRORRISK_REJECTED
HIGH1同步ORDERTRADE, GATEWAY_CONNECTED
NORMAL2异步MARKET_TICKPOSITION_UPDATED
LOW3异步LOG_MESSAGESTATISTICS

线程安全和性能

系统通过ThreadSafeCallback类和内部的线程队列管理实现了全面的线程安全EventBus

关键线程安全特性:

  • 回调调度ThreadSafeCallback.schedule_async_task()方法schedule_callback()处理跨线程通信
  • 重试逻辑:内置可配置的重试max_retries机制retry_delay
  • 优雅关闭EventType.SHUTDOWN基于超时的线程连接信号
  • 队列保护Queue.put_nowait()通过Full异常处理来防止阻塞

事件流和通信

事件驱动的通信模式通过标准化的事件发布和订阅实现系统组件之间的松耦合交互。

使用模式

事件发布

组件使用EventBus.publish()可选异步处理的方法发布事件:

python
# High priority synchronous event
event_bus.publish(Event(EventType.ORDER, order_data))

# Background asynchronous event  
event_bus.publish(Event(EventType.MARKET_TICK, tick_data), is_async=True)

# Using convenience functions
event = create_trading_event(EventType.RISK_CHECK, risk_data)
event_bus.publish(event)

事件订阅

组件订阅特定的事件类型或使用全局处理程序:

python
# Subscribe to specific event type
event_bus.subscribe(EventType.ORDER, self.handle_order)

# Subscribe to async processing
event_bus.subscribe(EventType.MARKET_TICK, self.handle_tick, is_async=True)

# Global handler for all events
event_bus.subscribe_global(self.handle_all_events)

事件监控

系统通过事件监视器和统计数据提供全面的监控:

python
# Add event monitor
event_bus.add_monitor(self.log_event)

# Get runtime statistics
stats = event_bus.get_stats()
# Returns: total_events_published, sync_events_processed, 
#          async_events_processed, error_count, etc.

事件驱动系统构成了 Homalos 架构的支柱,实现了所有系统组件之间的高性能、线程安全通信,同时保持了明确的关注点分离,并支持实时和后台处理要求。