Add process event bus to backtest engine

This commit is contained in:
boris
2026-04-23 04:02:22 -07:00
parent 6b5112a363
commit 2e418f93d3
5 changed files with 342 additions and 1 deletions

View File

@@ -5,6 +5,7 @@ use thiserror::Error;
use crate::broker::{BrokerExecutionReport, BrokerSimulator}; use crate::broker::{BrokerExecutionReport, BrokerSimulator};
use crate::cost::CostModel; use crate::cost::CostModel;
use crate::data::{BenchmarkSnapshot, DataSet, DataSetError, PriceField}; use crate::data::{BenchmarkSnapshot, DataSet, DataSetError, PriceField};
use crate::event_bus::ProcessEventBus;
use crate::events::{ use crate::events::{
AccountEvent, FillEvent, OrderEvent, OrderSide, OrderStatus, PositionEvent, ProcessEvent, AccountEvent, FillEvent, OrderEvent, OrderSide, OrderStatus, PositionEvent, ProcessEvent,
ProcessEventKind, ProcessEventKind,
@@ -95,6 +96,7 @@ pub struct BacktestEngine<S, C, R> {
broker: BrokerSimulator<C, R>, broker: BrokerSimulator<C, R>,
config: BacktestConfig, config: BacktestConfig,
dividend_reinvestment: bool, dividend_reinvestment: bool,
process_event_bus: ProcessEventBus,
} }
impl<S, C, R> BacktestEngine<S, C, R> { impl<S, C, R> BacktestEngine<S, C, R> {
@@ -110,6 +112,7 @@ impl<S, C, R> BacktestEngine<S, C, R> {
broker, broker,
config, config,
dividend_reinvestment: false, dividend_reinvestment: false,
process_event_bus: ProcessEventBus::new(),
} }
} }
@@ -117,6 +120,24 @@ impl<S, C, R> BacktestEngine<S, C, R> {
self.dividend_reinvestment = enabled; self.dividend_reinvestment = enabled;
self self
} }
pub fn process_event_bus_mut(&mut self) -> &mut ProcessEventBus {
&mut self.process_event_bus
}
pub fn add_process_listener<F>(&mut self, kind: ProcessEventKind, listener: F)
where
F: FnMut(&ProcessEvent) + 'static,
{
self.process_event_bus.add_listener(kind, listener);
}
pub fn add_any_process_listener<F>(&mut self, listener: F)
where
F: FnMut(&ProcessEvent) + 'static,
{
self.process_event_bus.add_any_listener(listener);
}
} }
impl<S, C, R> BacktestEngine<S, C, R> impl<S, C, R> BacktestEngine<S, C, R>
@@ -222,6 +243,7 @@ where
let mut process_events = Vec::new(); let mut process_events = Vec::new();
publish_phase_event( publish_phase_event(
&mut self.strategy, &mut self.strategy,
&mut self.process_event_bus,
&daily_context, &daily_context,
&mut process_events, &mut process_events,
execution_date, execution_date,
@@ -231,6 +253,7 @@ where
self.strategy.before_trading(&daily_context)?; self.strategy.before_trading(&daily_context)?;
publish_phase_event( publish_phase_event(
&mut self.strategy, &mut self.strategy,
&mut self.process_event_bus,
&daily_context, &daily_context,
&mut process_events, &mut process_events,
execution_date, execution_date,
@@ -239,6 +262,7 @@ where
)?; )?;
publish_phase_event( publish_phase_event(
&mut self.strategy, &mut self.strategy,
&mut self.process_event_bus,
&daily_context, &daily_context,
&mut process_events, &mut process_events,
execution_date, execution_date,
@@ -247,6 +271,7 @@ where
)?; )?;
publish_phase_event( publish_phase_event(
&mut self.strategy, &mut self.strategy,
&mut self.process_event_bus,
&daily_context, &daily_context,
&mut process_events, &mut process_events,
execution_date, execution_date,
@@ -261,10 +286,12 @@ where
&schedule_rules, &schedule_rules,
&daily_context, &daily_context,
&mut process_events, &mut process_events,
&mut self.process_event_bus,
)?; )?;
auction_decision.merge_from(self.strategy.open_auction(&daily_context)?); auction_decision.merge_from(self.strategy.open_auction(&daily_context)?);
publish_phase_event( publish_phase_event(
&mut self.strategy, &mut self.strategy,
&mut self.process_event_bus,
&daily_context, &daily_context,
&mut process_events, &mut process_events,
execution_date, execution_date,
@@ -286,12 +313,14 @@ where
}; };
publish_process_events( publish_process_events(
&mut self.strategy, &mut self.strategy,
&mut self.process_event_bus,
&post_auction_context, &post_auction_context,
&mut process_events, &mut process_events,
&mut report.process_events, &mut report.process_events,
)?; )?;
publish_phase_event( publish_phase_event(
&mut self.strategy, &mut self.strategy,
&mut self.process_event_bus,
&post_auction_context, &post_auction_context,
&mut process_events, &mut process_events,
execution_date, execution_date,
@@ -301,6 +330,7 @@ where
publish_phase_event( publish_phase_event(
&mut self.strategy, &mut self.strategy,
&mut self.process_event_bus,
&post_auction_context, &post_auction_context,
&mut process_events, &mut process_events,
execution_date, execution_date,
@@ -333,6 +363,7 @@ where
portfolio: &portfolio, portfolio: &portfolio,
}, },
&mut process_events, &mut process_events,
&mut self.process_event_bus,
)?); )?);
let on_day_context = StrategyContext { let on_day_context = StrategyContext {
execution_date, execution_date,
@@ -343,6 +374,7 @@ where
}; };
publish_phase_event( publish_phase_event(
&mut self.strategy, &mut self.strategy,
&mut self.process_event_bus,
&on_day_context, &on_day_context,
&mut process_events, &mut process_events,
execution_date, execution_date,
@@ -362,6 +394,7 @@ where
}; };
publish_process_events( publish_process_events(
&mut self.strategy, &mut self.strategy,
&mut self.process_event_bus,
&post_intraday_context, &post_intraday_context,
&mut process_events, &mut process_events,
&mut intraday_report.process_events, &mut intraday_report.process_events,
@@ -375,6 +408,7 @@ where
report.diagnostics.extend(intraday_report.diagnostics); report.diagnostics.extend(intraday_report.diagnostics);
publish_phase_event( publish_phase_event(
&mut self.strategy, &mut self.strategy,
&mut self.process_event_bus,
&post_intraday_context, &post_intraday_context,
&mut process_events, &mut process_events,
execution_date, execution_date,
@@ -393,6 +427,7 @@ where
}; };
publish_phase_event( publish_phase_event(
&mut self.strategy, &mut self.strategy,
&mut self.process_event_bus,
&post_trade_context, &post_trade_context,
&mut process_events, &mut process_events,
execution_date, execution_date,
@@ -402,6 +437,7 @@ where
self.strategy.after_trading(&post_trade_context)?; self.strategy.after_trading(&post_trade_context)?;
publish_phase_event( publish_phase_event(
&mut self.strategy, &mut self.strategy,
&mut self.process_event_bus,
&post_trade_context, &post_trade_context,
&mut process_events, &mut process_events,
execution_date, execution_date,
@@ -411,6 +447,7 @@ where
let mut close_report = self.broker.after_trading(execution_date); let mut close_report = self.broker.after_trading(execution_date);
publish_process_events( publish_process_events(
&mut self.strategy, &mut self.strategy,
&mut self.process_event_bus,
&post_trade_context, &post_trade_context,
&mut process_events, &mut process_events,
&mut close_report.process_events, &mut close_report.process_events,
@@ -422,6 +459,7 @@ where
report.diagnostics.extend(close_report.diagnostics); report.diagnostics.extend(close_report.diagnostics);
publish_phase_event( publish_phase_event(
&mut self.strategy, &mut self.strategy,
&mut self.process_event_bus,
&post_trade_context, &post_trade_context,
&mut process_events, &mut process_events,
execution_date, execution_date,
@@ -430,6 +468,7 @@ where
)?; )?;
publish_phase_event( publish_phase_event(
&mut self.strategy, &mut self.strategy,
&mut self.process_event_bus,
&post_trade_context, &post_trade_context,
&mut process_events, &mut process_events,
execution_date, execution_date,
@@ -439,6 +478,7 @@ where
self.strategy.on_settlement(&post_trade_context)?; self.strategy.on_settlement(&post_trade_context)?;
publish_phase_event( publish_phase_event(
&mut self.strategy, &mut self.strategy,
&mut self.process_event_bus,
&post_trade_context, &post_trade_context,
&mut process_events, &mut process_events,
execution_date, execution_date,
@@ -447,6 +487,7 @@ where
)?; )?;
publish_phase_event( publish_phase_event(
&mut self.strategy, &mut self.strategy,
&mut self.process_event_bus,
&post_trade_context, &post_trade_context,
&mut process_events, &mut process_events,
execution_date, execution_date,
@@ -945,11 +986,13 @@ fn collect_scheduled_decisions<S: Strategy>(
rules: &[ScheduleRule], rules: &[ScheduleRule],
ctx: &StrategyContext<'_>, ctx: &StrategyContext<'_>,
process_events: &mut Vec<ProcessEvent>, process_events: &mut Vec<ProcessEvent>,
process_event_bus: &mut ProcessEventBus,
) -> Result<crate::strategy::StrategyDecision, BacktestError> { ) -> Result<crate::strategy::StrategyDecision, BacktestError> {
let mut combined = crate::strategy::StrategyDecision::default(); let mut combined = crate::strategy::StrategyDecision::default();
for rule in scheduler.triggered_rules(execution_date, stage, rules) { for rule in scheduler.triggered_rules(execution_date, stage, rules) {
publish_phase_event( publish_phase_event(
strategy, strategy,
process_event_bus,
ctx, ctx,
process_events, process_events,
execution_date, execution_date,
@@ -959,6 +1002,7 @@ fn collect_scheduled_decisions<S: Strategy>(
combined.merge_from(strategy.on_scheduled(ctx, rule)?); combined.merge_from(strategy.on_scheduled(ctx, rule)?);
publish_phase_event( publish_phase_event(
strategy, strategy,
process_event_bus,
ctx, ctx,
process_events, process_events,
execution_date, execution_date,
@@ -971,6 +1015,7 @@ fn collect_scheduled_decisions<S: Strategy>(
fn publish_phase_event<S: Strategy>( fn publish_phase_event<S: Strategy>(
strategy: &mut S, strategy: &mut S,
process_event_bus: &mut ProcessEventBus,
ctx: &StrategyContext<'_>, ctx: &StrategyContext<'_>,
events: &mut Vec<ProcessEvent>, events: &mut Vec<ProcessEvent>,
date: NaiveDate, date: NaiveDate,
@@ -985,6 +1030,7 @@ fn publish_phase_event<S: Strategy>(
side: None, side: None,
detail: detail.into(), detail: detail.into(),
}; };
process_event_bus.publish(&event);
strategy.on_process_event(ctx, &event)?; strategy.on_process_event(ctx, &event)?;
events.push(event); events.push(event);
Ok(()) Ok(())
@@ -992,11 +1038,13 @@ fn publish_phase_event<S: Strategy>(
fn publish_process_events<S: Strategy>( fn publish_process_events<S: Strategy>(
strategy: &mut S, strategy: &mut S,
process_event_bus: &mut ProcessEventBus,
ctx: &StrategyContext<'_>, ctx: &StrategyContext<'_>,
target: &mut Vec<ProcessEvent>, target: &mut Vec<ProcessEvent>,
incoming: &mut Vec<ProcessEvent>, incoming: &mut Vec<ProcessEvent>,
) -> Result<(), BacktestError> { ) -> Result<(), BacktestError> {
for event in incoming.drain(..) { for event in incoming.drain(..) {
process_event_bus.publish(&event);
strategy.on_process_event(ctx, &event)?; strategy.on_process_event(ctx, &event)?;
target.push(event); target.push(event);
} }

View File

@@ -0,0 +1,55 @@
use std::collections::BTreeMap;
use crate::events::{ProcessEvent, ProcessEventKind};
type ProcessEventListener = Box<dyn FnMut(&ProcessEvent)>;
#[derive(Default)]
pub struct ProcessEventBus {
listeners: BTreeMap<ProcessEventKind, Vec<ProcessEventListener>>,
any_listeners: Vec<ProcessEventListener>,
}
impl ProcessEventBus {
pub fn new() -> Self {
Self::default()
}
pub fn add_listener<F>(&mut self, kind: ProcessEventKind, listener: F)
where
F: FnMut(&ProcessEvent) + 'static,
{
self.listeners
.entry(kind)
.or_default()
.push(Box::new(listener));
}
pub fn prepend_listener<F>(&mut self, kind: ProcessEventKind, listener: F)
where
F: FnMut(&ProcessEvent) + 'static,
{
self.listeners
.entry(kind)
.or_default()
.insert(0, Box::new(listener));
}
pub fn add_any_listener<F>(&mut self, listener: F)
where
F: FnMut(&ProcessEvent) + 'static,
{
self.any_listeners.push(Box::new(listener));
}
pub fn publish(&mut self, event: &ProcessEvent) {
if let Some(listeners) = self.listeners.get_mut(&event.kind) {
for listener in listeners {
listener(event);
}
}
for listener in &mut self.any_listeners {
listener(event);
}
}
}

View File

@@ -91,7 +91,7 @@ pub struct AccountEvent {
pub note: String, pub note: String,
} }
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum ProcessEventKind { pub enum ProcessEventKind {
PreBeforeTrading, PreBeforeTrading,
BeforeTrading, BeforeTrading,

View File

@@ -3,6 +3,7 @@ pub mod calendar;
pub mod cost; pub mod cost;
pub mod data; pub mod data;
pub mod engine; pub mod engine;
pub mod event_bus;
pub mod events; pub mod events;
pub mod instrument; pub mod instrument;
pub mod metrics; pub mod metrics;
@@ -26,6 +27,7 @@ pub use engine::{
BacktestConfig, BacktestDayProgress, BacktestEngine, BacktestError, BacktestResult, BacktestConfig, BacktestDayProgress, BacktestEngine, BacktestError, BacktestResult,
DailyEquityPoint, DailyEquityPoint,
}; };
pub use event_bus::ProcessEventBus;
pub use events::{ pub use events::{
AccountEvent, FillEvent, OrderEvent, OrderSide, OrderStatus, PositionEvent, ProcessEvent, AccountEvent, FillEvent, OrderEvent, OrderSide, OrderStatus, PositionEvent, ProcessEvent,
ProcessEventKind, ProcessEventKind,

View File

@@ -897,3 +897,239 @@ fn engine_runs_scheduled_rules_for_daily_weekly_and_monthly_triggers() {
.any(|item| { item == "PostScheduled:scheduled:first_trading_day_on_day:on_day:post" }) .any(|item| { item == "PostScheduled:scheduled:first_trading_day_on_day:on_day:post" })
); );
} }
#[test]
fn engine_dispatches_process_events_to_external_bus_listeners() {
let date1 = d(2025, 1, 30);
let date2 = d(2025, 1, 31);
let date3 = d(2025, 2, 3);
let data = DataSet::from_components(
vec![Instrument {
symbol: "000001.SZ".to_string(),
name: "Anchor".to_string(),
board: "SZ".to_string(),
round_lot: 100,
listed_at: Some(d(2020, 1, 1)),
delisted_at: None,
status: "active".to_string(),
}],
vec![
DailyMarketSnapshot {
date: date1,
symbol: "000001.SZ".to_string(),
timestamp: Some("2025-01-30 09:25:00".to_string()),
day_open: 10.0,
open: 10.0,
high: 10.1,
low: 9.9,
close: 10.0,
last_price: 10.0,
bid1: 10.0,
ask1: 10.0,
prev_close: 9.9,
volume: 100_000,
tick_volume: 100_000,
bid1_volume: 100_000,
ask1_volume: 100_000,
trading_phase: Some("open_auction".to_string()),
paused: false,
upper_limit: 11.0,
lower_limit: 9.0,
price_tick: 0.01,
},
DailyMarketSnapshot {
date: date2,
symbol: "000001.SZ".to_string(),
timestamp: Some("2025-01-31 09:25:00".to_string()),
day_open: 10.1,
open: 10.1,
high: 10.2,
low: 10.0,
close: 10.1,
last_price: 10.1,
bid1: 10.1,
ask1: 10.1,
prev_close: 10.0,
volume: 110_000,
tick_volume: 110_000,
bid1_volume: 110_000,
ask1_volume: 110_000,
trading_phase: Some("open_auction".to_string()),
paused: false,
upper_limit: 11.1,
lower_limit: 9.1,
price_tick: 0.01,
},
DailyMarketSnapshot {
date: date3,
symbol: "000001.SZ".to_string(),
timestamp: Some("2025-02-03 09:25:00".to_string()),
day_open: 10.2,
open: 10.2,
high: 10.3,
low: 10.1,
close: 10.2,
last_price: 10.2,
bid1: 10.2,
ask1: 10.2,
prev_close: 10.1,
volume: 120_000,
tick_volume: 120_000,
bid1_volume: 120_000,
ask1_volume: 120_000,
trading_phase: Some("open_auction".to_string()),
paused: false,
upper_limit: 11.2,
lower_limit: 9.2,
price_tick: 0.01,
},
],
vec![
DailyFactorSnapshot {
date: date1,
symbol: "000001.SZ".to_string(),
market_cap_bn: 20.0,
free_float_cap_bn: 18.0,
pe_ttm: 10.0,
turnover_ratio: Some(1.0),
effective_turnover_ratio: Some(1.0),
extra_factors: BTreeMap::new(),
},
DailyFactorSnapshot {
date: date2,
symbol: "000001.SZ".to_string(),
market_cap_bn: 21.0,
free_float_cap_bn: 19.0,
pe_ttm: 10.0,
turnover_ratio: Some(1.0),
effective_turnover_ratio: Some(1.0),
extra_factors: BTreeMap::new(),
},
DailyFactorSnapshot {
date: date3,
symbol: "000001.SZ".to_string(),
market_cap_bn: 22.0,
free_float_cap_bn: 20.0,
pe_ttm: 10.0,
turnover_ratio: Some(1.0),
effective_turnover_ratio: Some(1.0),
extra_factors: BTreeMap::new(),
},
],
vec![
CandidateEligibility {
date: date1,
symbol: "000001.SZ".to_string(),
is_st: false,
is_new_listing: false,
is_paused: false,
allow_buy: true,
allow_sell: true,
is_kcb: false,
is_one_yuan: false,
},
CandidateEligibility {
date: date2,
symbol: "000001.SZ".to_string(),
is_st: false,
is_new_listing: false,
is_paused: false,
allow_buy: true,
allow_sell: true,
is_kcb: false,
is_one_yuan: false,
},
CandidateEligibility {
date: date3,
symbol: "000001.SZ".to_string(),
is_st: false,
is_new_listing: false,
is_paused: false,
allow_buy: true,
allow_sell: true,
is_kcb: false,
is_one_yuan: false,
},
],
vec![
BenchmarkSnapshot {
date: date1,
benchmark: "000300.SH".to_string(),
open: 100.0,
close: 100.0,
prev_close: 99.0,
volume: 1_000_000,
},
BenchmarkSnapshot {
date: date2,
benchmark: "000300.SH".to_string(),
open: 101.0,
close: 101.0,
prev_close: 100.0,
volume: 1_100_000,
},
BenchmarkSnapshot {
date: date3,
benchmark: "000300.SH".to_string(),
open: 102.0,
close: 102.0,
prev_close: 101.0,
volume: 1_200_000,
},
],
)
.expect("dataset");
let log = Rc::new(RefCell::new(Vec::new()));
let process_log = Rc::new(RefCell::new(Vec::new()));
let strategy = ScheduledProbeStrategy { log, process_log };
let broker = BrokerSimulator::new_with_execution_price(
ChinaAShareCostModel::default(),
ChinaEquityRuleHooks::default(),
PriceField::DayOpen,
);
let mut engine = BacktestEngine::new(
data,
strategy,
broker,
BacktestConfig {
initial_cash: 100_000.0,
benchmark_code: "000300.SH".to_string(),
start_date: Some(date1),
end_date: Some(date3),
decision_lag_trading_days: 0,
execution_price_field: PriceField::DayOpen,
},
);
let external_log = Rc::new(RefCell::new(Vec::new()));
engine.add_process_listener(ProcessEventKind::PreScheduled, {
let external_log = external_log.clone();
move |event| {
external_log
.borrow_mut()
.push(format!("{:?}:{}", event.kind, event.detail));
}
});
engine.add_process_listener(ProcessEventKind::PostScheduled, {
let external_log = external_log.clone();
move |event| {
external_log
.borrow_mut()
.push(format!("{:?}:{}", event.kind, event.detail));
}
});
engine.run().expect("backtest run");
let external_log = external_log.borrow();
assert!(
external_log
.iter()
.any(|item| { item == "PreScheduled:scheduled:daily_auction:open_auction:pre" })
);
assert!(
external_log
.iter()
.any(|item| { item == "PostScheduled:scheduled:first_trading_day_on_day:on_day:post" })
);
}