diff --git a/crates/fidc-core/src/engine.rs b/crates/fidc-core/src/engine.rs index e9a9869..c543661 100644 --- a/crates/fidc-core/src/engine.rs +++ b/crates/fidc-core/src/engine.rs @@ -5,6 +5,7 @@ use thiserror::Error; use crate::broker::{BrokerExecutionReport, BrokerSimulator}; use crate::cost::CostModel; use crate::data::{BenchmarkSnapshot, DataSet, DataSetError, PriceField}; +use crate::event_bus::ProcessEventBus; use crate::events::{ AccountEvent, FillEvent, OrderEvent, OrderSide, OrderStatus, PositionEvent, ProcessEvent, ProcessEventKind, @@ -95,6 +96,7 @@ pub struct BacktestEngine { broker: BrokerSimulator, config: BacktestConfig, dividend_reinvestment: bool, + process_event_bus: ProcessEventBus, } impl BacktestEngine { @@ -110,6 +112,7 @@ impl BacktestEngine { broker, config, dividend_reinvestment: false, + process_event_bus: ProcessEventBus::new(), } } @@ -117,6 +120,24 @@ impl BacktestEngine { self.dividend_reinvestment = enabled; self } + + pub fn process_event_bus_mut(&mut self) -> &mut ProcessEventBus { + &mut self.process_event_bus + } + + pub fn add_process_listener(&mut self, kind: ProcessEventKind, listener: F) + where + F: FnMut(&ProcessEvent) + 'static, + { + self.process_event_bus.add_listener(kind, listener); + } + + pub fn add_any_process_listener(&mut self, listener: F) + where + F: FnMut(&ProcessEvent) + 'static, + { + self.process_event_bus.add_any_listener(listener); + } } impl BacktestEngine @@ -222,6 +243,7 @@ where let mut process_events = Vec::new(); publish_phase_event( &mut self.strategy, + &mut self.process_event_bus, &daily_context, &mut process_events, execution_date, @@ -231,6 +253,7 @@ where self.strategy.before_trading(&daily_context)?; publish_phase_event( &mut self.strategy, + &mut self.process_event_bus, &daily_context, &mut process_events, execution_date, @@ -239,6 +262,7 @@ where )?; publish_phase_event( &mut self.strategy, + &mut self.process_event_bus, &daily_context, &mut process_events, execution_date, @@ -247,6 +271,7 @@ where )?; publish_phase_event( &mut self.strategy, + &mut self.process_event_bus, &daily_context, &mut process_events, execution_date, @@ -261,10 +286,12 @@ where &schedule_rules, &daily_context, &mut process_events, + &mut self.process_event_bus, )?; auction_decision.merge_from(self.strategy.open_auction(&daily_context)?); publish_phase_event( &mut self.strategy, + &mut self.process_event_bus, &daily_context, &mut process_events, execution_date, @@ -286,12 +313,14 @@ where }; publish_process_events( &mut self.strategy, + &mut self.process_event_bus, &post_auction_context, &mut process_events, &mut report.process_events, )?; publish_phase_event( &mut self.strategy, + &mut self.process_event_bus, &post_auction_context, &mut process_events, execution_date, @@ -301,6 +330,7 @@ where publish_phase_event( &mut self.strategy, + &mut self.process_event_bus, &post_auction_context, &mut process_events, execution_date, @@ -333,6 +363,7 @@ where portfolio: &portfolio, }, &mut process_events, + &mut self.process_event_bus, )?); let on_day_context = StrategyContext { execution_date, @@ -343,6 +374,7 @@ where }; publish_phase_event( &mut self.strategy, + &mut self.process_event_bus, &on_day_context, &mut process_events, execution_date, @@ -362,6 +394,7 @@ where }; publish_process_events( &mut self.strategy, + &mut self.process_event_bus, &post_intraday_context, &mut process_events, &mut intraday_report.process_events, @@ -375,6 +408,7 @@ where report.diagnostics.extend(intraday_report.diagnostics); publish_phase_event( &mut self.strategy, + &mut self.process_event_bus, &post_intraday_context, &mut process_events, execution_date, @@ -393,6 +427,7 @@ where }; publish_phase_event( &mut self.strategy, + &mut self.process_event_bus, &post_trade_context, &mut process_events, execution_date, @@ -402,6 +437,7 @@ where self.strategy.after_trading(&post_trade_context)?; publish_phase_event( &mut self.strategy, + &mut self.process_event_bus, &post_trade_context, &mut process_events, execution_date, @@ -411,6 +447,7 @@ where let mut close_report = self.broker.after_trading(execution_date); publish_process_events( &mut self.strategy, + &mut self.process_event_bus, &post_trade_context, &mut process_events, &mut close_report.process_events, @@ -422,6 +459,7 @@ where report.diagnostics.extend(close_report.diagnostics); publish_phase_event( &mut self.strategy, + &mut self.process_event_bus, &post_trade_context, &mut process_events, execution_date, @@ -430,6 +468,7 @@ where )?; publish_phase_event( &mut self.strategy, + &mut self.process_event_bus, &post_trade_context, &mut process_events, execution_date, @@ -439,6 +478,7 @@ where self.strategy.on_settlement(&post_trade_context)?; publish_phase_event( &mut self.strategy, + &mut self.process_event_bus, &post_trade_context, &mut process_events, execution_date, @@ -447,6 +487,7 @@ where )?; publish_phase_event( &mut self.strategy, + &mut self.process_event_bus, &post_trade_context, &mut process_events, execution_date, @@ -945,11 +986,13 @@ fn collect_scheduled_decisions( rules: &[ScheduleRule], ctx: &StrategyContext<'_>, process_events: &mut Vec, + process_event_bus: &mut ProcessEventBus, ) -> Result { let mut combined = crate::strategy::StrategyDecision::default(); for rule in scheduler.triggered_rules(execution_date, stage, rules) { publish_phase_event( strategy, + process_event_bus, ctx, process_events, execution_date, @@ -959,6 +1002,7 @@ fn collect_scheduled_decisions( combined.merge_from(strategy.on_scheduled(ctx, rule)?); publish_phase_event( strategy, + process_event_bus, ctx, process_events, execution_date, @@ -971,6 +1015,7 @@ fn collect_scheduled_decisions( fn publish_phase_event( strategy: &mut S, + process_event_bus: &mut ProcessEventBus, ctx: &StrategyContext<'_>, events: &mut Vec, date: NaiveDate, @@ -985,6 +1030,7 @@ fn publish_phase_event( side: None, detail: detail.into(), }; + process_event_bus.publish(&event); strategy.on_process_event(ctx, &event)?; events.push(event); Ok(()) @@ -992,11 +1038,13 @@ fn publish_phase_event( fn publish_process_events( strategy: &mut S, + process_event_bus: &mut ProcessEventBus, ctx: &StrategyContext<'_>, target: &mut Vec, incoming: &mut Vec, ) -> Result<(), BacktestError> { for event in incoming.drain(..) { + process_event_bus.publish(&event); strategy.on_process_event(ctx, &event)?; target.push(event); } diff --git a/crates/fidc-core/src/event_bus.rs b/crates/fidc-core/src/event_bus.rs new file mode 100644 index 0000000..789a34b --- /dev/null +++ b/crates/fidc-core/src/event_bus.rs @@ -0,0 +1,55 @@ +use std::collections::BTreeMap; + +use crate::events::{ProcessEvent, ProcessEventKind}; + +type ProcessEventListener = Box; + +#[derive(Default)] +pub struct ProcessEventBus { + listeners: BTreeMap>, + any_listeners: Vec, +} + +impl ProcessEventBus { + pub fn new() -> Self { + Self::default() + } + + pub fn add_listener(&mut self, kind: ProcessEventKind, listener: F) + where + F: FnMut(&ProcessEvent) + 'static, + { + self.listeners + .entry(kind) + .or_default() + .push(Box::new(listener)); + } + + pub fn prepend_listener(&mut self, kind: ProcessEventKind, listener: F) + where + F: FnMut(&ProcessEvent) + 'static, + { + self.listeners + .entry(kind) + .or_default() + .insert(0, Box::new(listener)); + } + + pub fn add_any_listener(&mut self, listener: F) + where + F: FnMut(&ProcessEvent) + 'static, + { + self.any_listeners.push(Box::new(listener)); + } + + pub fn publish(&mut self, event: &ProcessEvent) { + if let Some(listeners) = self.listeners.get_mut(&event.kind) { + for listener in listeners { + listener(event); + } + } + for listener in &mut self.any_listeners { + listener(event); + } + } +} diff --git a/crates/fidc-core/src/events.rs b/crates/fidc-core/src/events.rs index f2dc98f..36e861d 100644 --- a/crates/fidc-core/src/events.rs +++ b/crates/fidc-core/src/events.rs @@ -91,7 +91,7 @@ pub struct AccountEvent { pub note: String, } -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] pub enum ProcessEventKind { PreBeforeTrading, BeforeTrading, diff --git a/crates/fidc-core/src/lib.rs b/crates/fidc-core/src/lib.rs index 0c8b672..8487743 100644 --- a/crates/fidc-core/src/lib.rs +++ b/crates/fidc-core/src/lib.rs @@ -3,6 +3,7 @@ pub mod calendar; pub mod cost; pub mod data; pub mod engine; +pub mod event_bus; pub mod events; pub mod instrument; pub mod metrics; @@ -26,6 +27,7 @@ pub use engine::{ BacktestConfig, BacktestDayProgress, BacktestEngine, BacktestError, BacktestResult, DailyEquityPoint, }; +pub use event_bus::ProcessEventBus; pub use events::{ AccountEvent, FillEvent, OrderEvent, OrderSide, OrderStatus, PositionEvent, ProcessEvent, ProcessEventKind, diff --git a/crates/fidc-core/tests/engine_hooks.rs b/crates/fidc-core/tests/engine_hooks.rs index 1dfe6fd..e1826ff 100644 --- a/crates/fidc-core/tests/engine_hooks.rs +++ b/crates/fidc-core/tests/engine_hooks.rs @@ -897,3 +897,239 @@ fn engine_runs_scheduled_rules_for_daily_weekly_and_monthly_triggers() { .any(|item| { item == "PostScheduled:scheduled:first_trading_day_on_day:on_day:post" }) ); } + +#[test] +fn engine_dispatches_process_events_to_external_bus_listeners() { + let date1 = d(2025, 1, 30); + let date2 = d(2025, 1, 31); + let date3 = d(2025, 2, 3); + let data = DataSet::from_components( + vec![Instrument { + symbol: "000001.SZ".to_string(), + name: "Anchor".to_string(), + board: "SZ".to_string(), + round_lot: 100, + listed_at: Some(d(2020, 1, 1)), + delisted_at: None, + status: "active".to_string(), + }], + vec![ + DailyMarketSnapshot { + date: date1, + symbol: "000001.SZ".to_string(), + timestamp: Some("2025-01-30 09:25:00".to_string()), + day_open: 10.0, + open: 10.0, + high: 10.1, + low: 9.9, + close: 10.0, + last_price: 10.0, + bid1: 10.0, + ask1: 10.0, + prev_close: 9.9, + volume: 100_000, + tick_volume: 100_000, + bid1_volume: 100_000, + ask1_volume: 100_000, + trading_phase: Some("open_auction".to_string()), + paused: false, + upper_limit: 11.0, + lower_limit: 9.0, + price_tick: 0.01, + }, + DailyMarketSnapshot { + date: date2, + symbol: "000001.SZ".to_string(), + timestamp: Some("2025-01-31 09:25:00".to_string()), + day_open: 10.1, + open: 10.1, + high: 10.2, + low: 10.0, + close: 10.1, + last_price: 10.1, + bid1: 10.1, + ask1: 10.1, + prev_close: 10.0, + volume: 110_000, + tick_volume: 110_000, + bid1_volume: 110_000, + ask1_volume: 110_000, + trading_phase: Some("open_auction".to_string()), + paused: false, + upper_limit: 11.1, + lower_limit: 9.1, + price_tick: 0.01, + }, + DailyMarketSnapshot { + date: date3, + symbol: "000001.SZ".to_string(), + timestamp: Some("2025-02-03 09:25:00".to_string()), + day_open: 10.2, + open: 10.2, + high: 10.3, + low: 10.1, + close: 10.2, + last_price: 10.2, + bid1: 10.2, + ask1: 10.2, + prev_close: 10.1, + volume: 120_000, + tick_volume: 120_000, + bid1_volume: 120_000, + ask1_volume: 120_000, + trading_phase: Some("open_auction".to_string()), + paused: false, + upper_limit: 11.2, + lower_limit: 9.2, + price_tick: 0.01, + }, + ], + vec![ + DailyFactorSnapshot { + date: date1, + symbol: "000001.SZ".to_string(), + market_cap_bn: 20.0, + free_float_cap_bn: 18.0, + pe_ttm: 10.0, + turnover_ratio: Some(1.0), + effective_turnover_ratio: Some(1.0), + extra_factors: BTreeMap::new(), + }, + DailyFactorSnapshot { + date: date2, + symbol: "000001.SZ".to_string(), + market_cap_bn: 21.0, + free_float_cap_bn: 19.0, + pe_ttm: 10.0, + turnover_ratio: Some(1.0), + effective_turnover_ratio: Some(1.0), + extra_factors: BTreeMap::new(), + }, + DailyFactorSnapshot { + date: date3, + symbol: "000001.SZ".to_string(), + market_cap_bn: 22.0, + free_float_cap_bn: 20.0, + pe_ttm: 10.0, + turnover_ratio: Some(1.0), + effective_turnover_ratio: Some(1.0), + extra_factors: BTreeMap::new(), + }, + ], + vec![ + CandidateEligibility { + date: date1, + symbol: "000001.SZ".to_string(), + is_st: false, + is_new_listing: false, + is_paused: false, + allow_buy: true, + allow_sell: true, + is_kcb: false, + is_one_yuan: false, + }, + CandidateEligibility { + date: date2, + symbol: "000001.SZ".to_string(), + is_st: false, + is_new_listing: false, + is_paused: false, + allow_buy: true, + allow_sell: true, + is_kcb: false, + is_one_yuan: false, + }, + CandidateEligibility { + date: date3, + symbol: "000001.SZ".to_string(), + is_st: false, + is_new_listing: false, + is_paused: false, + allow_buy: true, + allow_sell: true, + is_kcb: false, + is_one_yuan: false, + }, + ], + vec![ + BenchmarkSnapshot { + date: date1, + benchmark: "000300.SH".to_string(), + open: 100.0, + close: 100.0, + prev_close: 99.0, + volume: 1_000_000, + }, + BenchmarkSnapshot { + date: date2, + benchmark: "000300.SH".to_string(), + open: 101.0, + close: 101.0, + prev_close: 100.0, + volume: 1_100_000, + }, + BenchmarkSnapshot { + date: date3, + benchmark: "000300.SH".to_string(), + open: 102.0, + close: 102.0, + prev_close: 101.0, + volume: 1_200_000, + }, + ], + ) + .expect("dataset"); + + let log = Rc::new(RefCell::new(Vec::new())); + let process_log = Rc::new(RefCell::new(Vec::new())); + let strategy = ScheduledProbeStrategy { log, process_log }; + let broker = BrokerSimulator::new_with_execution_price( + ChinaAShareCostModel::default(), + ChinaEquityRuleHooks::default(), + PriceField::DayOpen, + ); + let mut engine = BacktestEngine::new( + data, + strategy, + broker, + BacktestConfig { + initial_cash: 100_000.0, + benchmark_code: "000300.SH".to_string(), + start_date: Some(date1), + end_date: Some(date3), + decision_lag_trading_days: 0, + execution_price_field: PriceField::DayOpen, + }, + ); + let external_log = Rc::new(RefCell::new(Vec::new())); + engine.add_process_listener(ProcessEventKind::PreScheduled, { + let external_log = external_log.clone(); + move |event| { + external_log + .borrow_mut() + .push(format!("{:?}:{}", event.kind, event.detail)); + } + }); + engine.add_process_listener(ProcessEventKind::PostScheduled, { + let external_log = external_log.clone(); + move |event| { + external_log + .borrow_mut() + .push(format!("{:?}:{}", event.kind, event.detail)); + } + }); + + engine.run().expect("backtest run"); + + let external_log = external_log.borrow(); + assert!( + external_log + .iter() + .any(|item| { item == "PreScheduled:scheduled:daily_auction:open_auction:pre" }) + ); + assert!( + external_log + .iter() + .any(|item| { item == "PostScheduled:scheduled:first_trading_day_on_day:on_day:post" }) + ); +}