事件驱动系统
字数
809 字
阅读时间
4 分钟
职责和范围
事件驱动系统是 Homalos 量化交易系统中所有组件之间实现异步、松耦合交互的基础通信机制。该系统以EventBus
提供高性能事件路由的类为核心,具备双通道处理(同步和异步)、基于优先级的事件处理以及全面的线程安全。
本文档涵盖了EventBus
事件系统的实现、事件类型和优先级、线程安全机制以及组件在整个系统中的异步通信方式。有关使用此事件系统的更广泛的系统架构的信息,请参阅核心架构。有关使用事件的特定组件的详细信息,请参阅交易引擎。
EventBus 架构
该EventBus
实现了双通道事件处理系统,具有单独的同步和异步队列,每个队列由专用的工作线程进行管理。
维护EventBus
两个独立的处理通道:
渠道 | 队列大小 | 线程 | 目的 |
---|---|---|---|
同步 | 10,000 | _sync_thread | 需要立即处理的时间关键事件 |
异步 | 10,000 | _async_thread | 后台处理和日志记录 |
事件类型和优先级
事件系统使用全面的类型层次结构,并通过EventPriority
枚举和EventType
类实现基于优先级的处理。
优先级系统将事件路由到适当的处理通道:
优先级 | 值 | 通道 | 示例事件 |
---|---|---|---|
CRITICAL | 0 | 同步 | SYSTEM_ERROR ,RISK_REJECTED |
HIGH | 1 | 同步 | ORDER ,TRADE , GATEWAY_CONNECTED |
NORMAL | 2 | 异步 | MARKET_TICK ,POSITION_UPDATED |
LOW | 3 | 异步 | LOG_MESSAGE ,STATISTICS |
线程安全和性能
系统通过ThreadSafeCallback
类和内部的线程队列管理实现了全面的线程安全EventBus
。
关键线程安全特性:
- 回调调度:
ThreadSafeCallback.schedule_async_task()
方法schedule_callback()
处理跨线程通信 - 重试逻辑:内置可配置的重试
max_retries
机制retry_delay
- 优雅关闭:
EventType.SHUTDOWN
基于超时的线程连接信号 - 队列保护:
Queue.put_nowait()
通过Full
异常处理来防止阻塞
事件流和通信
事件驱动的通信模式通过标准化的事件发布和订阅实现系统组件之间的松耦合交互。
使用模式
事件发布
组件使用EventBus.publish()
可选异步处理的方法发布事件:
python
# High priority synchronous event
event_bus.publish(Event(EventType.ORDER, order_data))
# Background asynchronous event
event_bus.publish(Event(EventType.MARKET_TICK, tick_data), is_async=True)
# Using convenience functions
event = create_trading_event(EventType.RISK_CHECK, risk_data)
event_bus.publish(event)
事件订阅
组件订阅特定的事件类型或使用全局处理程序:
python
# Subscribe to specific event type
event_bus.subscribe(EventType.ORDER, self.handle_order)
# Subscribe to async processing
event_bus.subscribe(EventType.MARKET_TICK, self.handle_tick, is_async=True)
# Global handler for all events
event_bus.subscribe_global(self.handle_all_events)
事件监控
系统通过事件监视器和统计数据提供全面的监控:
python
# Add event monitor
event_bus.add_monitor(self.log_event)
# Get runtime statistics
stats = event_bus.get_stats()
# Returns: total_events_published, sync_events_processed,
# async_events_processed, error_count, etc.
事件驱动系统构成了 Homalos 架构的支柱,实现了所有系统组件之间的高性能、线程安全通信,同时保持了明确的关注点分离,并支持实时和后台处理要求。