Files
fidc-backtest-engine/crates/fidc-core/src/engine.rs
2026-04-23 20:14:05 -07:00

2052 lines
79 KiB
Rust

use std::collections::BTreeSet;
use chrono::NaiveDate;
use serde::Serialize;
use thiserror::Error;
use crate::broker::{BrokerExecutionReport, BrokerSimulator};
use crate::cost::CostModel;
use crate::data::{BenchmarkSnapshot, DataSet, DataSetError, PriceField};
use crate::event_bus::ProcessEventBus;
use crate::events::{
AccountEvent, FillEvent, OrderEvent, OrderSide, OrderStatus, PositionEvent, ProcessEvent,
ProcessEventKind,
};
use crate::metrics::{BacktestMetrics, compute_backtest_metrics};
use crate::portfolio::{CashReceivable, HoldingSummary, PortfolioState};
use crate::rules::EquityRuleHooks;
use crate::scheduler::{ScheduleRule, ScheduleStage, Scheduler, default_stage_time};
use crate::strategy::{Strategy, StrategyContext};
#[derive(Debug, Error)]
pub enum BacktestError {
#[error(transparent)]
Data(#[from] DataSetError),
#[error("missing {field} price for {symbol} on {date}")]
MissingPrice {
date: NaiveDate,
symbol: String,
field: &'static str,
},
#[error("benchmark snapshot missing for {date}")]
MissingBenchmark { date: NaiveDate },
#[error("{0}")]
Execution(String),
}
#[derive(Debug, Clone)]
pub struct BacktestConfig {
pub initial_cash: f64,
pub benchmark_code: String,
pub start_date: Option<NaiveDate>,
pub end_date: Option<NaiveDate>,
pub decision_lag_trading_days: usize,
pub execution_price_field: PriceField,
}
#[derive(Debug, Clone, Serialize)]
pub struct DailyEquityPoint {
#[serde(with = "date_format")]
pub date: NaiveDate,
pub cash: f64,
pub market_value: f64,
pub total_equity: f64,
pub benchmark_close: f64,
pub notes: String,
pub diagnostics: String,
}
#[derive(Debug, Clone)]
pub struct BacktestResult {
pub strategy_name: String,
pub equity_curve: Vec<DailyEquityPoint>,
pub benchmark_series: Vec<BenchmarkSnapshot>,
pub order_events: Vec<OrderEvent>,
pub fills: Vec<FillEvent>,
pub position_events: Vec<PositionEvent>,
pub account_events: Vec<AccountEvent>,
pub process_events: Vec<ProcessEvent>,
pub holdings_summary: Vec<HoldingSummary>,
pub daily_holdings: Vec<HoldingSummary>,
pub metrics: BacktestMetrics,
}
#[derive(Debug, Clone, Serialize)]
pub struct BacktestDayProgress {
#[serde(with = "date_format")]
pub date: NaiveDate,
pub cash: f64,
pub market_value: f64,
pub total_equity: f64,
pub unit_nav: f64,
pub total_return: f64,
pub benchmark_close: f64,
pub daily_fill_count: usize,
pub cumulative_trade_count: usize,
pub holding_count: usize,
pub notes: String,
pub diagnostics: String,
pub orders: Vec<OrderEvent>,
pub fills: Vec<FillEvent>,
pub holdings: Vec<HoldingSummary>,
pub process_events: Vec<ProcessEvent>,
}
pub struct BacktestEngine<S, C, R> {
data: DataSet,
strategy: S,
broker: BrokerSimulator<C, R>,
config: BacktestConfig,
dividend_reinvestment: bool,
process_event_bus: ProcessEventBus,
dynamic_universe: Option<BTreeSet<String>>,
subscriptions: BTreeSet<String>,
}
impl<S, C, R> BacktestEngine<S, C, R> {
pub fn new(
data: DataSet,
strategy: S,
broker: BrokerSimulator<C, R>,
config: BacktestConfig,
) -> Self {
Self {
data,
strategy,
broker,
config,
dividend_reinvestment: false,
process_event_bus: ProcessEventBus::new(),
dynamic_universe: None,
subscriptions: BTreeSet::new(),
}
}
pub fn with_dividend_reinvestment(mut self, enabled: bool) -> Self {
self.dividend_reinvestment = enabled;
self
}
pub fn process_event_bus_mut(&mut self) -> &mut ProcessEventBus {
&mut self.process_event_bus
}
pub fn add_process_listener<F>(&mut self, kind: ProcessEventKind, listener: F)
where
F: FnMut(&ProcessEvent) + 'static,
{
self.process_event_bus.add_listener(kind, listener);
}
pub fn add_any_process_listener<F>(&mut self, listener: F)
where
F: FnMut(&ProcessEvent) + 'static,
{
self.process_event_bus.add_any_listener(listener);
}
}
impl<S, C, R> BacktestEngine<S, C, R>
where
S: Strategy,
C: CostModel,
R: EquityRuleHooks,
{
fn apply_strategy_directives(
&mut self,
execution_date: NaiveDate,
decision_date: NaiveDate,
decision_index: usize,
portfolio: &mut PortfolioState,
open_orders: &[crate::strategy::OpenOrderView],
process_events: &mut Vec<ProcessEvent>,
decision: &mut crate::strategy::StrategyDecision,
directive_report: &mut BrokerExecutionReport,
) -> Result<(), BacktestError> {
if decision.order_intents.is_empty() {
return Ok(());
}
let mut retained = Vec::with_capacity(decision.order_intents.len());
for intent in decision.order_intents.drain(..) {
match intent {
crate::strategy::OrderIntent::UpdateUniverse { symbols, reason } => {
let symbol_count = symbols.len();
self.dynamic_universe = Some(symbols.clone());
decision
.diagnostics
.push(format!("dynamic_universe_updated count={symbol_count}"));
publish_custom_process_event(
&mut self.strategy,
&mut self.process_event_bus,
execution_date,
decision_date,
decision_index,
&self.data,
portfolio,
open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
process_events,
ProcessEvent {
date: execution_date,
kind: ProcessEventKind::UniverseUpdated,
order_id: None,
symbol: (symbol_count == 1)
.then(|| symbols.iter().next().cloned())
.flatten(),
side: None,
detail: format!(
"reason={reason} count={symbol_count} symbols={}",
symbols.iter().cloned().collect::<Vec<_>>().join(",")
),
},
)?;
}
crate::strategy::OrderIntent::Subscribe { symbols, reason } => {
let mut added = Vec::new();
for symbol in symbols {
if self.subscriptions.insert(symbol.clone()) {
added.push(symbol);
}
}
if !added.is_empty() {
decision.diagnostics.push(format!(
"subscriptions_added count={} total={}",
added.len(),
self.subscriptions.len()
));
publish_custom_process_event(
&mut self.strategy,
&mut self.process_event_bus,
execution_date,
decision_date,
decision_index,
&self.data,
portfolio,
open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
process_events,
ProcessEvent {
date: execution_date,
kind: ProcessEventKind::UniverseSubscribed,
order_id: None,
symbol: (added.len() == 1).then(|| added[0].clone()),
side: None,
detail: format!(
"reason={reason} count={} symbols={}",
added.len(),
added.join(",")
),
},
)?;
}
}
crate::strategy::OrderIntent::Unsubscribe { symbols, reason } => {
let mut removed = Vec::new();
for symbol in symbols {
if self.subscriptions.remove(&symbol) {
removed.push(symbol);
}
}
if !removed.is_empty() {
decision.diagnostics.push(format!(
"subscriptions_removed count={} total={}",
removed.len(),
self.subscriptions.len()
));
publish_custom_process_event(
&mut self.strategy,
&mut self.process_event_bus,
execution_date,
decision_date,
decision_index,
&self.data,
portfolio,
open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
process_events,
ProcessEvent {
date: execution_date,
kind: ProcessEventKind::UniverseUnsubscribed,
order_id: None,
symbol: (removed.len() == 1).then(|| removed[0].clone()),
side: None,
detail: format!(
"reason={reason} count={} symbols={}",
removed.len(),
removed.join(",")
),
},
)?;
}
}
crate::strategy::OrderIntent::DepositWithdraw {
amount,
receiving_days,
reason,
} => {
let cash_before = portfolio.cash();
if receiving_days == 0 {
portfolio
.deposit_withdraw(amount)
.map_err(BacktestError::Execution)?;
directive_report.account_events.push(AccountEvent {
date: execution_date,
cash_before,
cash_after: portfolio.cash(),
total_equity: portfolio.total_equity(),
note: format!("deposit_withdraw amount={amount:.2} reason={reason}"),
});
} else {
let payable_date = self
.data
.next_trading_date(execution_date, receiving_days)
.ok_or_else(|| {
BacktestError::Execution(format!(
"no trading date for deposit_withdraw receiving_days={receiving_days} from {execution_date}"
))
})?;
portfolio
.schedule_deposit_withdraw(payable_date, amount, reason.clone())
.map_err(BacktestError::Execution)?;
directive_report.account_events.push(AccountEvent {
date: execution_date,
cash_before,
cash_after: portfolio.cash(),
total_equity: portfolio.total_equity(),
note: format!(
"deposit_withdraw_scheduled amount={amount:.2} payable_date={payable_date} reason={reason}"
),
});
}
decision.diagnostics.push(format!(
"account_deposit_withdraw amount={amount:.2} receiving_days={receiving_days}"
));
publish_custom_process_event(
&mut self.strategy,
&mut self.process_event_bus,
execution_date,
decision_date,
decision_index,
&self.data,
&*portfolio,
open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
process_events,
ProcessEvent {
date: execution_date,
kind: ProcessEventKind::AccountDepositWithdraw,
order_id: None,
symbol: None,
side: None,
detail: format!(
"reason={reason} amount={amount:.2} receiving_days={receiving_days} cash_before={cash_before:.2} cash_after={:.2}",
portfolio.cash()
),
},
)?;
}
crate::strategy::OrderIntent::FinanceRepay { amount, reason } => {
let cash_before = portfolio.cash();
let liabilities_before = portfolio.cash_liabilities();
portfolio
.finance_repay(amount)
.map_err(BacktestError::Execution)?;
directive_report.account_events.push(AccountEvent {
date: execution_date,
cash_before,
cash_after: portfolio.cash(),
total_equity: portfolio.total_equity(),
note: format!(
"finance_repay amount={amount:.2} liabilities_before={liabilities_before:.2} liabilities_after={:.2} reason={reason}",
portfolio.cash_liabilities()
),
});
decision.diagnostics.push(format!(
"account_finance_repay amount={amount:.2} liabilities={:.2}",
portfolio.cash_liabilities()
));
publish_custom_process_event(
&mut self.strategy,
&mut self.process_event_bus,
execution_date,
decision_date,
decision_index,
&self.data,
&*portfolio,
open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
process_events,
ProcessEvent {
date: execution_date,
kind: ProcessEventKind::AccountFinanceRepay,
order_id: None,
symbol: None,
side: None,
detail: format!(
"reason={reason} amount={amount:.2} cash_before={cash_before:.2} cash_after={:.2} liabilities_before={liabilities_before:.2} liabilities_after={:.2}",
portfolio.cash(),
portfolio.cash_liabilities()
),
},
)?;
}
other => retained.push(other),
}
}
decision.order_intents = retained;
Ok(())
}
pub fn run(&mut self) -> Result<BacktestResult, BacktestError> {
self.run_with_progress(|_| {})
}
pub fn run_with_progress<F>(
&mut self,
mut on_progress: F,
) -> Result<BacktestResult, BacktestError>
where
F: FnMut(&BacktestDayProgress),
{
let mut portfolio = PortfolioState::new(self.config.initial_cash);
let scheduler_calendar = self.data.calendar().clone();
let scheduler = Scheduler::new(&scheduler_calendar);
let execution_dates = self
.data
.calendar()
.iter()
.filter(|date| {
self.config
.start_date
.map(|start| *date >= start)
.unwrap_or(true)
})
.filter(|date| self.config.end_date.map(|end| *date <= end).unwrap_or(true))
.filter(|date| {
!self.data.factor_snapshots_on(*date).is_empty()
&& !self.data.candidate_snapshots_on(*date).is_empty()
})
.collect::<Vec<_>>();
let mut result = BacktestResult {
strategy_name: self.strategy.name().to_string(),
benchmark_series: self
.data
.benchmark_series()
.into_iter()
.filter(|row| {
self.config
.start_date
.map(|start| row.date >= start)
.unwrap_or(true)
})
.filter(|row| {
self.config
.end_date
.map(|end| row.date <= end)
.unwrap_or(true)
})
.collect(),
order_events: Vec::new(),
fills: Vec::new(),
position_events: Vec::new(),
account_events: Vec::new(),
process_events: Vec::new(),
equity_curve: Vec::new(),
holdings_summary: Vec::new(),
daily_holdings: Vec::new(),
metrics: BacktestMetrics::default(),
};
for (execution_idx, execution_date) in execution_dates.iter().copied().enumerate() {
let mut corporate_action_notes = Vec::new();
portfolio.begin_trading_day();
let pending_cash_flow_report = self.settle_pending_cash_flows(
execution_date,
&mut portfolio,
&mut corporate_action_notes,
);
self.extend_result(&mut result, pending_cash_flow_report);
let receivable_report = self.settle_cash_receivables(
execution_date,
&mut portfolio,
&mut corporate_action_notes,
)?;
self.extend_result(&mut result, receivable_report);
let corporate_action_report = self.apply_corporate_actions(
execution_date,
&mut portfolio,
&mut corporate_action_notes,
)?;
self.extend_result(&mut result, corporate_action_report);
let delisting_report = self.settle_delisted_positions(
execution_date,
&mut portfolio,
&mut corporate_action_notes,
)?;
self.extend_result(&mut result, delisting_report);
let decision_slot = execution_idx
.checked_sub(self.config.decision_lag_trading_days)
.map(|decision_idx| (decision_idx, execution_dates[decision_idx]));
let (decision_index, decision_date) =
decision_slot.unwrap_or((execution_idx, execution_date));
let mut process_events = Vec::new();
let mut directive_report = BrokerExecutionReport::default();
let pre_open_orders = self.broker.open_order_views();
let schedule_rules = self.strategy.schedule_rules();
publish_phase_event(
&mut self.strategy,
&mut self.process_event_bus,
execution_date,
decision_date,
decision_index,
&self.data,
&portfolio,
&pre_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
execution_date,
ProcessEventKind::PreBeforeTrading,
"before_trading:pre",
)?;
self.strategy.before_trading(&StrategyContext {
execution_date,
decision_date,
decision_index,
data: &self.data,
portfolio: &portfolio,
open_orders: &pre_open_orders,
dynamic_universe: self.dynamic_universe.as_ref(),
subscriptions: &self.subscriptions,
process_events: &process_events,
active_process_event: None,
active_datetime: stage_datetime(
execution_date,
default_stage_time(ScheduleStage::BeforeTrading),
),
order_events: result.order_events.as_slice(),
fills: result.fills.as_slice(),
})?;
publish_phase_event(
&mut self.strategy,
&mut self.process_event_bus,
execution_date,
decision_date,
decision_index,
&self.data,
&portfolio,
&pre_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
execution_date,
ProcessEventKind::BeforeTrading,
"before_trading",
)?;
let mut before_trading_decision = collect_scheduled_decisions(
&mut self.strategy,
&scheduler,
execution_date,
ScheduleStage::BeforeTrading,
&schedule_rules,
decision_date,
decision_index,
&self.data,
&portfolio,
&pre_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
&mut self.process_event_bus,
default_stage_time(ScheduleStage::BeforeTrading),
result.order_events.as_slice(),
result.fills.as_slice(),
)?;
self.apply_strategy_directives(
execution_date,
decision_date,
decision_index,
&mut portfolio,
&pre_open_orders,
&mut process_events,
&mut before_trading_decision,
&mut directive_report,
)?;
publish_phase_event(
&mut self.strategy,
&mut self.process_event_bus,
execution_date,
decision_date,
decision_index,
&self.data,
&portfolio,
&pre_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
execution_date,
ProcessEventKind::PostBeforeTrading,
"before_trading:post",
)?;
publish_phase_event(
&mut self.strategy,
&mut self.process_event_bus,
execution_date,
decision_date,
decision_index,
&self.data,
&portfolio,
&pre_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
execution_date,
ProcessEventKind::PreOpenAuction,
"open_auction:pre",
)?;
let mut auction_decision = collect_scheduled_decisions(
&mut self.strategy,
&scheduler,
execution_date,
ScheduleStage::OpenAuction,
&schedule_rules,
decision_date,
decision_index,
&self.data,
&portfolio,
&pre_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
&mut self.process_event_bus,
default_stage_time(ScheduleStage::OpenAuction),
result.order_events.as_slice(),
result.fills.as_slice(),
)?;
auction_decision.merge_from(self.strategy.open_auction(&StrategyContext {
execution_date,
decision_date,
decision_index,
data: &self.data,
portfolio: &portfolio,
open_orders: &pre_open_orders,
dynamic_universe: self.dynamic_universe.as_ref(),
subscriptions: &self.subscriptions,
process_events: &process_events,
active_process_event: None,
active_datetime: stage_datetime(
execution_date,
default_stage_time(ScheduleStage::OpenAuction),
),
order_events: result.order_events.as_slice(),
fills: result.fills.as_slice(),
})?);
publish_phase_event(
&mut self.strategy,
&mut self.process_event_bus,
execution_date,
decision_date,
decision_index,
&self.data,
&portfolio,
&pre_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
execution_date,
ProcessEventKind::OpenAuction,
"open_auction",
)?;
self.apply_strategy_directives(
execution_date,
decision_date,
decision_index,
&mut portfolio,
&pre_open_orders,
&mut process_events,
&mut auction_decision,
&mut directive_report,
)?;
let mut report = self.broker.execute(
execution_date,
&mut portfolio,
&self.data,
&auction_decision,
)?;
let post_auction_open_orders = self.broker.open_order_views();
publish_process_events(
&mut self.strategy,
&mut self.process_event_bus,
execution_date,
decision_date,
decision_index,
&self.data,
&portfolio,
&post_auction_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
&mut report.process_events,
)?;
publish_phase_event(
&mut self.strategy,
&mut self.process_event_bus,
execution_date,
decision_date,
decision_index,
&self.data,
&portfolio,
&post_auction_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
execution_date,
ProcessEventKind::PostOpenAuction,
"open_auction:post",
)?;
publish_phase_event(
&mut self.strategy,
&mut self.process_event_bus,
execution_date,
decision_date,
decision_index,
&self.data,
&portfolio,
&post_auction_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
execution_date,
ProcessEventKind::PreOnDay,
"on_day:pre",
)?;
let on_day_open_orders = self.broker.open_order_views();
let mut decision = decision_slot
.map(|(decision_idx, decision_date)| {
self.strategy.on_day(&StrategyContext {
execution_date,
decision_date,
decision_index: decision_idx,
data: &self.data,
portfolio: &portfolio,
open_orders: &on_day_open_orders,
dynamic_universe: self.dynamic_universe.as_ref(),
subscriptions: &self.subscriptions,
process_events: &process_events,
active_process_event: None,
active_datetime: stage_datetime(
execution_date,
default_stage_time(ScheduleStage::OnDay),
),
order_events: result.order_events.as_slice(),
fills: result.fills.as_slice(),
})
})
.transpose()?
.unwrap_or_default();
decision.merge_from(collect_scheduled_decisions(
&mut self.strategy,
&scheduler,
execution_date,
ScheduleStage::OnDay,
&schedule_rules,
decision_date,
decision_index,
&self.data,
&portfolio,
&on_day_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
&mut self.process_event_bus,
default_stage_time(ScheduleStage::OnDay),
result.order_events.as_slice(),
result.fills.as_slice(),
)?);
publish_phase_event(
&mut self.strategy,
&mut self.process_event_bus,
execution_date,
decision_date,
decision_index,
&self.data,
&portfolio,
&on_day_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
execution_date,
ProcessEventKind::OnDay,
"on_day",
)?;
let bar_open_orders = self.broker.open_order_views();
publish_phase_event(
&mut self.strategy,
&mut self.process_event_bus,
execution_date,
decision_date,
decision_index,
&self.data,
&portfolio,
&bar_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
execution_date,
ProcessEventKind::PreBar,
"bar:pre",
)?;
decision.merge_from(collect_scheduled_decisions(
&mut self.strategy,
&scheduler,
execution_date,
ScheduleStage::Bar,
&schedule_rules,
decision_date,
decision_index,
&self.data,
&portfolio,
&bar_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
&mut self.process_event_bus,
default_stage_time(ScheduleStage::Bar),
result.order_events.as_slice(),
result.fills.as_slice(),
)?);
decision.merge_from(self.strategy.on_bar(&StrategyContext {
execution_date,
decision_date,
decision_index,
data: &self.data,
portfolio: &portfolio,
open_orders: &bar_open_orders,
dynamic_universe: self.dynamic_universe.as_ref(),
subscriptions: &self.subscriptions,
process_events: &process_events,
active_process_event: None,
active_datetime: stage_datetime(
execution_date,
default_stage_time(ScheduleStage::Bar),
),
order_events: result.order_events.as_slice(),
fills: result.fills.as_slice(),
})?);
publish_phase_event(
&mut self.strategy,
&mut self.process_event_bus,
execution_date,
decision_date,
decision_index,
&self.data,
&portfolio,
&bar_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
execution_date,
ProcessEventKind::Bar,
"bar",
)?;
self.apply_strategy_directives(
execution_date,
decision_date,
decision_index,
&mut portfolio,
&on_day_open_orders,
&mut process_events,
&mut decision,
&mut directive_report,
)?;
let mut intraday_report =
self.broker
.execute(execution_date, &mut portfolio, &self.data, &decision)?;
let post_intraday_open_orders = self.broker.open_order_views();
publish_process_events(
&mut self.strategy,
&mut self.process_event_bus,
execution_date,
decision_date,
decision_index,
&self.data,
&portfolio,
&post_intraday_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
&mut intraday_report.process_events,
)?;
report.order_events.extend(intraday_report.order_events);
report.fill_events.extend(intraday_report.fill_events);
report
.position_events
.extend(intraday_report.position_events);
report.account_events.extend(intraday_report.account_events);
report.diagnostics.extend(intraday_report.diagnostics);
publish_phase_event(
&mut self.strategy,
&mut self.process_event_bus,
execution_date,
decision_date,
decision_index,
&self.data,
&portfolio,
&post_intraday_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
execution_date,
ProcessEventKind::PostOnDay,
"on_day:post",
)?;
publish_phase_event(
&mut self.strategy,
&mut self.process_event_bus,
execution_date,
decision_date,
decision_index,
&self.data,
&portfolio,
&post_intraday_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
execution_date,
ProcessEventKind::PostBar,
"bar:post",
)?;
if should_run_tick_events(&schedule_rules, &self.subscriptions) {
let filter_by_subscription = !self.subscriptions.is_empty();
let tick_quotes = self
.data
.execution_quotes_on_date(execution_date)
.into_iter()
.filter(|quote| {
!filter_by_subscription || self.subscriptions.contains(&quote.symbol)
})
.collect::<Vec<_>>();
for quote in tick_quotes {
let tick_time = quote.timestamp.time();
let tick_open_orders = self.broker.open_order_views();
publish_phase_event(
&mut self.strategy,
&mut self.process_event_bus,
execution_date,
decision_date,
decision_index,
&self.data,
&portfolio,
&tick_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
execution_date,
ProcessEventKind::PreTick,
format!("tick:{}:{}:pre", quote.symbol, quote.timestamp),
)?;
let mut tick_decision = collect_scheduled_decisions(
&mut self.strategy,
&scheduler,
execution_date,
ScheduleStage::Tick,
&schedule_rules,
decision_date,
decision_index,
&self.data,
&portfolio,
&tick_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
&mut self.process_event_bus,
Some(tick_time),
result.order_events.as_slice(),
result.fills.as_slice(),
)?;
tick_decision.merge_from(self.strategy.on_tick(
&StrategyContext {
execution_date,
decision_date,
decision_index,
data: &self.data,
portfolio: &portfolio,
open_orders: &tick_open_orders,
dynamic_universe: self.dynamic_universe.as_ref(),
subscriptions: &self.subscriptions,
process_events: &process_events,
active_process_event: None,
active_datetime: Some(quote.timestamp),
order_events: result.order_events.as_slice(),
fills: result.fills.as_slice(),
},
&quote,
)?);
publish_phase_event(
&mut self.strategy,
&mut self.process_event_bus,
execution_date,
decision_date,
decision_index,
&self.data,
&portfolio,
&tick_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
execution_date,
ProcessEventKind::Tick,
format!("tick:{}:{}", quote.symbol, quote.timestamp),
)?;
self.apply_strategy_directives(
execution_date,
decision_date,
decision_index,
&mut portfolio,
&tick_open_orders,
&mut process_events,
&mut tick_decision,
&mut directive_report,
)?;
let mut tick_report = self.broker.execute_between(
execution_date,
&mut portfolio,
&self.data,
&tick_decision,
Some(tick_time),
Some(tick_time),
)?;
let post_tick_open_orders = self.broker.open_order_views();
publish_process_events(
&mut self.strategy,
&mut self.process_event_bus,
execution_date,
decision_date,
decision_index,
&self.data,
&portfolio,
&post_tick_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
&mut tick_report.process_events,
)?;
merge_broker_report(&mut report, tick_report);
publish_phase_event(
&mut self.strategy,
&mut self.process_event_bus,
execution_date,
decision_date,
decision_index,
&self.data,
&portfolio,
&post_tick_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
execution_date,
ProcessEventKind::PostTick,
format!("tick:{}:{}:post", quote.symbol, quote.timestamp),
)?;
}
}
portfolio.update_prices(execution_date, &self.data, PriceField::Close)?;
let post_trade_open_orders = self.broker.open_order_views();
let visible_order_events = result
.order_events
.iter()
.cloned()
.chain(report.order_events.iter().cloned())
.collect::<Vec<_>>();
let visible_fills = result
.fills
.iter()
.cloned()
.chain(report.fill_events.iter().cloned())
.collect::<Vec<_>>();
publish_phase_event(
&mut self.strategy,
&mut self.process_event_bus,
execution_date,
decision_date,
decision_index,
&self.data,
&portfolio,
&post_trade_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
execution_date,
ProcessEventKind::PreAfterTrading,
"after_trading:pre",
)?;
self.strategy.after_trading(&StrategyContext {
execution_date,
decision_date,
decision_index,
data: &self.data,
portfolio: &portfolio,
open_orders: &post_trade_open_orders,
dynamic_universe: self.dynamic_universe.as_ref(),
subscriptions: &self.subscriptions,
process_events: &process_events,
active_process_event: None,
active_datetime: stage_datetime(
execution_date,
default_stage_time(ScheduleStage::AfterTrading),
),
order_events: visible_order_events.as_slice(),
fills: visible_fills.as_slice(),
})?;
publish_phase_event(
&mut self.strategy,
&mut self.process_event_bus,
execution_date,
decision_date,
decision_index,
&self.data,
&portfolio,
&post_trade_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
execution_date,
ProcessEventKind::AfterTrading,
"after_trading",
)?;
let mut after_trading_decision = collect_scheduled_decisions(
&mut self.strategy,
&scheduler,
execution_date,
ScheduleStage::AfterTrading,
&schedule_rules,
decision_date,
decision_index,
&self.data,
&portfolio,
&post_trade_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
&mut self.process_event_bus,
default_stage_time(ScheduleStage::AfterTrading),
visible_order_events.as_slice(),
visible_fills.as_slice(),
)?;
self.apply_strategy_directives(
execution_date,
decision_date,
decision_index,
&mut portfolio,
&post_trade_open_orders,
&mut process_events,
&mut after_trading_decision,
&mut directive_report,
)?;
let mut close_report = self.broker.after_trading(execution_date);
publish_process_events(
&mut self.strategy,
&mut self.process_event_bus,
execution_date,
decision_date,
decision_index,
&self.data,
&portfolio,
&post_trade_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
&mut close_report.process_events,
)?;
report.order_events.extend(close_report.order_events);
report.fill_events.extend(close_report.fill_events);
report.position_events.extend(close_report.position_events);
report.account_events.extend(close_report.account_events);
report.diagnostics.extend(close_report.diagnostics);
let post_close_open_orders = self.broker.open_order_views();
let visible_order_events_after_close = result
.order_events
.iter()
.cloned()
.chain(report.order_events.iter().cloned())
.collect::<Vec<_>>();
let visible_fills_after_close = result
.fills
.iter()
.cloned()
.chain(report.fill_events.iter().cloned())
.collect::<Vec<_>>();
publish_phase_event(
&mut self.strategy,
&mut self.process_event_bus,
execution_date,
decision_date,
decision_index,
&self.data,
&portfolio,
&post_close_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
execution_date,
ProcessEventKind::PostAfterTrading,
"after_trading:post",
)?;
publish_phase_event(
&mut self.strategy,
&mut self.process_event_bus,
execution_date,
decision_date,
decision_index,
&self.data,
&portfolio,
&post_close_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
execution_date,
ProcessEventKind::PreSettlement,
"settlement:pre",
)?;
self.strategy.on_settlement(&StrategyContext {
execution_date,
decision_date,
decision_index,
data: &self.data,
portfolio: &portfolio,
open_orders: &post_close_open_orders,
dynamic_universe: self.dynamic_universe.as_ref(),
subscriptions: &self.subscriptions,
process_events: &process_events,
active_process_event: None,
active_datetime: stage_datetime(
execution_date,
default_stage_time(ScheduleStage::Settlement),
),
order_events: visible_order_events_after_close.as_slice(),
fills: visible_fills_after_close.as_slice(),
})?;
publish_phase_event(
&mut self.strategy,
&mut self.process_event_bus,
execution_date,
decision_date,
decision_index,
&self.data,
&portfolio,
&post_close_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
execution_date,
ProcessEventKind::Settlement,
"settlement",
)?;
let mut settlement_decision = collect_scheduled_decisions(
&mut self.strategy,
&scheduler,
execution_date,
ScheduleStage::Settlement,
&schedule_rules,
decision_date,
decision_index,
&self.data,
&portfolio,
&post_close_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
&mut self.process_event_bus,
default_stage_time(ScheduleStage::Settlement),
visible_order_events_after_close.as_slice(),
visible_fills_after_close.as_slice(),
)?;
self.apply_strategy_directives(
execution_date,
decision_date,
decision_index,
&mut portfolio,
&post_close_open_orders,
&mut process_events,
&mut settlement_decision,
&mut directive_report,
)?;
publish_phase_event(
&mut self.strategy,
&mut self.process_event_bus,
execution_date,
decision_date,
decision_index,
&self.data,
&portfolio,
&post_close_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
execution_date,
ProcessEventKind::PostSettlement,
"settlement:post",
)?;
merge_broker_report(&mut report, directive_report);
let daily_fill_count = report.fill_events.len();
let day_orders = report.order_events.clone();
let day_fills = report.fill_events.clone();
let broker_diagnostics = report.diagnostics.clone();
self.extend_result(&mut result, report);
let benchmark =
self.data
.benchmark(execution_date)
.ok_or(BacktestError::MissingBenchmark {
date: execution_date,
})?;
let notes = corporate_action_notes
.into_iter()
.chain(decision.notes.into_iter())
.collect::<Vec<_>>()
.join(" | ");
let diagnostics = decision
.diagnostics
.into_iter()
.chain(broker_diagnostics.into_iter())
.collect::<Vec<_>>()
.join(" | ");
let holdings_for_day = portfolio.holdings_summary(execution_date);
let day_process_events = process_events.clone();
result.equity_curve.push(DailyEquityPoint {
date: execution_date,
cash: portfolio.cash(),
market_value: portfolio.market_value(),
total_equity: portfolio.total_equity(),
benchmark_close: benchmark.close,
notes,
diagnostics,
});
result.daily_holdings.extend(holdings_for_day.clone());
let latest = result
.equity_curve
.last()
.expect("equity point pushed for progress event");
on_progress(&BacktestDayProgress {
date: execution_date,
cash: latest.cash,
market_value: latest.market_value,
total_equity: latest.total_equity,
unit_nav: if self.config.initial_cash.abs() < f64::EPSILON {
0.0
} else {
latest.total_equity / self.config.initial_cash
},
total_return: if self.config.initial_cash.abs() < f64::EPSILON {
0.0
} else {
(latest.total_equity / self.config.initial_cash) - 1.0
},
benchmark_close: latest.benchmark_close,
daily_fill_count,
cumulative_trade_count: result.fills.len(),
holding_count: holdings_for_day.len(),
notes: latest.notes.clone(),
diagnostics: latest.diagnostics.clone(),
orders: day_orders,
fills: day_fills,
holdings: holdings_for_day,
process_events: day_process_events,
});
result.process_events.extend(process_events);
}
if let Some(last_date) = execution_dates.last().copied() {
result.holdings_summary = portfolio.holdings_summary(last_date);
}
result.metrics = compute_backtest_metrics(
&result.equity_curve,
&result.fills,
&result.daily_holdings,
self.config.initial_cash,
);
Ok(result)
}
fn extend_result(
&self,
result: &mut BacktestResult,
report: BrokerExecutionReport,
) -> BrokerExecutionReport {
result.order_events.extend(report.order_events.clone());
result.fills.extend(report.fill_events.clone());
result
.position_events
.extend(report.position_events.clone());
result.account_events.extend(report.account_events.clone());
report
}
fn apply_corporate_actions(
&self,
date: NaiveDate,
portfolio: &mut PortfolioState,
notes: &mut Vec<String>,
) -> Result<BrokerExecutionReport, BacktestError> {
let mut report = BrokerExecutionReport::default();
for action in self.data.corporate_actions_on(date) {
if !action.has_effect() {
continue;
}
let Some(existing_position) = portfolio.position(&action.symbol) else {
continue;
};
if existing_position.quantity == 0 {
continue;
}
if action.share_cash.abs() > f64::EPSILON {
let cash_before = portfolio.cash();
let (cash_delta, quantity_after, average_cost) = {
let position = portfolio
.position_mut_if_exists(&action.symbol)
.expect("position exists for dividend action");
let cash_delta = position.apply_cash_dividend(action.share_cash);
(cash_delta, position.quantity, position.average_cost)
};
if cash_delta.abs() > f64::EPSILON {
let payable_date = action.payable_date.unwrap_or(date);
let immediate_cash = payable_date <= date;
let note = if immediate_cash {
portfolio.apply_cash_delta(cash_delta);
format!(
"cash_dividend {} share_cash={:.6} quantity={} cash={:.2}",
action.symbol, action.share_cash, quantity_after, cash_delta
)
} else {
portfolio.add_cash_receivable(CashReceivable {
symbol: action.symbol.clone(),
ex_date: date,
payable_date,
amount: cash_delta,
reason: format!("cash_dividend {:.6}", action.share_cash),
});
format!(
"cash_dividend_receivable {} share_cash={:.6} quantity={} payable_date={} cash={:.2}",
action.symbol,
action.share_cash,
quantity_after,
payable_date,
cash_delta
)
};
notes.push(note.clone());
report.account_events.push(AccountEvent {
date,
cash_before,
cash_after: portfolio.cash(),
total_equity: portfolio.total_equity(),
note,
});
report.position_events.push(PositionEvent {
date,
symbol: action.symbol.clone(),
delta_quantity: 0,
quantity_after,
average_cost,
realized_pnl_delta: 0.0,
reason: format!("cash_dividend {:.6}", action.share_cash),
});
}
}
let split_ratio = action.split_ratio();
if (split_ratio - 1.0).abs() > f64::EPSILON {
let (delta_quantity, quantity_after, average_cost) = {
let position = portfolio
.position_mut_if_exists(&action.symbol)
.expect("position exists for split action");
let delta_quantity = position.apply_split_ratio(split_ratio);
(delta_quantity, position.quantity, position.average_cost)
};
if delta_quantity != 0 {
let note = format!(
"stock_split {} ratio={:.6} delta_qty={}",
action.symbol, split_ratio, delta_quantity
);
notes.push(note);
report.position_events.push(PositionEvent {
date,
symbol: action.symbol.clone(),
delta_quantity,
quantity_after,
average_cost,
realized_pnl_delta: 0.0,
reason: format!("stock_split {:.6}", split_ratio),
});
}
}
if action.has_successor_conversion() {
let successor_symbol = action
.successor_symbol
.as_deref()
.expect("successor symbol checked");
let Some(outcome) = portfolio.apply_successor_conversion(
&action.symbol,
successor_symbol,
action.successor_ratio_value(),
action.successor_cash_value(),
) else {
continue;
};
let reason = format!(
"successor_conversion {}->{} ratio={:.6} cash_per_share={:.6}",
outcome.old_symbol,
outcome.new_symbol,
action.successor_ratio_value(),
action.successor_cash_value()
);
notes.push(reason.clone());
report.position_events.push(PositionEvent {
date,
symbol: outcome.old_symbol.clone(),
delta_quantity: -(outcome.old_quantity as i32),
quantity_after: 0,
average_cost: 0.0,
realized_pnl_delta: 0.0,
reason: reason.clone(),
});
report.position_events.push(PositionEvent {
date,
symbol: outcome.new_symbol.clone(),
delta_quantity: outcome.new_quantity_delta,
quantity_after: outcome.new_quantity_after,
average_cost: outcome.new_average_cost_after,
realized_pnl_delta: 0.0,
reason: reason.clone(),
});
if outcome.cash_delta.abs() > f64::EPSILON {
let cash_before = portfolio.cash();
portfolio.apply_cash_delta(outcome.cash_delta);
report.account_events.push(AccountEvent {
date,
cash_before,
cash_after: portfolio.cash(),
total_equity: portfolio.total_equity(),
note: format!("{} cash={:.2}", reason, outcome.cash_delta),
});
}
}
}
portfolio.prune_flat_positions();
Ok(report)
}
fn settle_cash_receivables(
&self,
date: NaiveDate,
portfolio: &mut PortfolioState,
notes: &mut Vec<String>,
) -> Result<BrokerExecutionReport, BacktestError> {
let mut report = BrokerExecutionReport::default();
let settled = portfolio.settle_cash_receivables(date);
for receivable in settled {
let mut note = format!(
"cash_receivable_settled {} ex_date={} payable_date={} cash={:.2}",
receivable.symbol, receivable.ex_date, receivable.payable_date, receivable.amount
);
let cash_before = portfolio.cash() - receivable.amount;
if self.dividend_reinvestment
&& receivable.reason.starts_with("cash_dividend")
&& receivable.amount > 0.0
{
let reinvest_price = portfolio
.position(&receivable.symbol)
.map(|position| position.last_price)
.filter(|price| price.is_finite() && *price > 0.0)
.or_else(|| {
self.data
.calendar()
.previous_day(date)
.and_then(|prev_date| {
self.data.price_on_or_before(
prev_date,
&receivable.symbol,
PriceField::Close,
)
})
});
let round_lot = self
.data
.instrument(&receivable.symbol)
.map(|instrument| instrument.round_lot.max(1))
.unwrap_or(100);
if let Some(price) = reinvest_price {
let raw_quantity = (receivable.amount / price).floor() as u32;
let reinvest_quantity = (raw_quantity / round_lot) * round_lot;
if reinvest_quantity > 0 {
let reinvest_cash = reinvest_quantity as f64 * price;
let residual_cash = receivable.amount - reinvest_cash;
portfolio.apply_cash_delta(-reinvest_cash);
portfolio.position_mut(&receivable.symbol).buy(
date,
reinvest_quantity,
price,
);
note = format!(
"cash_receivable_reinvested {} ex_date={} payable_date={} cash={:.2} reinvest_qty={} reinvest_price={:.4} residual_cash={:.2}",
receivable.symbol,
receivable.ex_date,
receivable.payable_date,
receivable.amount,
reinvest_quantity,
price,
residual_cash
);
report.fill_events.push(FillEvent {
date,
order_id: None,
symbol: receivable.symbol.clone(),
side: OrderSide::Buy,
quantity: reinvest_quantity,
price,
gross_amount: reinvest_cash,
commission: 0.0,
stamp_tax: 0.0,
net_cash_flow: -reinvest_cash,
reason: "dividend_reinvestment".to_string(),
});
report.position_events.push(PositionEvent {
date,
symbol: receivable.symbol.clone(),
delta_quantity: reinvest_quantity as i32,
quantity_after: portfolio
.position(&receivable.symbol)
.map(|position| position.quantity)
.unwrap_or(0),
average_cost: portfolio
.position(&receivable.symbol)
.map(|position| position.average_cost)
.unwrap_or(0.0),
realized_pnl_delta: 0.0,
reason: "dividend_reinvestment".to_string(),
});
report.process_events.push(ProcessEvent {
date,
kind: ProcessEventKind::Trade,
order_id: None,
symbol: Some(receivable.symbol.clone()),
side: Some(OrderSide::Buy),
detail: format!(
"dividend_reinvestment quantity={} price={}",
reinvest_quantity, price
),
});
}
}
}
notes.push(note.clone());
report.account_events.push(AccountEvent {
date,
cash_before,
cash_after: portfolio.cash(),
total_equity: portfolio.total_equity(),
note,
});
}
Ok(report)
}
fn settle_pending_cash_flows(
&self,
date: NaiveDate,
portfolio: &mut PortfolioState,
notes: &mut Vec<String>,
) -> BrokerExecutionReport {
let mut report = BrokerExecutionReport::default();
for flow in portfolio.settle_pending_cash_flows(date) {
let cash_before = portfolio.cash() - flow.amount;
let note = format!(
"deposit_withdraw_settled amount={:.2} payable_date={} reason={}",
flow.amount, flow.payable_date, flow.reason
);
notes.push(note.clone());
report.account_events.push(AccountEvent {
date,
cash_before,
cash_after: portfolio.cash(),
total_equity: portfolio.total_equity(),
note,
});
}
report
}
fn settle_delisted_positions(
&self,
date: NaiveDate,
portfolio: &mut PortfolioState,
notes: &mut Vec<String>,
) -> Result<BrokerExecutionReport, BacktestError> {
let mut report = BrokerExecutionReport::default();
let symbols = portfolio.positions().keys().cloned().collect::<Vec<_>>();
for symbol in symbols {
let Some(position) = portfolio.position(&symbol) else {
continue;
};
if position.quantity == 0 {
continue;
}
let Some(instrument) = self.data.instrument(&symbol) else {
continue;
};
let should_settle = instrument.is_delisted_before(date)
|| (instrument.status.eq_ignore_ascii_case("delisted")
&& instrument.delisted_at.is_none()
&& self.data.market(date, &symbol).is_none());
if !should_settle {
continue;
}
let quantity = position.quantity;
let fallback_reference_price = if position.last_price > 0.0 {
position.last_price
} else {
position.average_cost
};
let effective_delisted_at = instrument
.delisted_at
.or_else(|| self.data.calendar().previous_day(date))
.unwrap_or(date);
let settlement_price = self
.data
.price_on_or_before(effective_delisted_at, &symbol, PriceField::Close)
.or_else(|| {
self.data
.price_on_or_before(date, &symbol, PriceField::Close)
})
.filter(|price| price.is_finite() && *price > 0.0)
.unwrap_or(fallback_reference_price);
if !settlement_price.is_finite() || settlement_price <= 0.0 {
return Err(BacktestError::Execution(format!(
"missing delisting settlement price for {} on {}",
symbol, date
)));
}
let cash_before = portfolio.cash();
let gross_amount = settlement_price * quantity as f64;
let realized_pnl_delta = {
let position = portfolio
.position_mut_if_exists(&symbol)
.expect("position exists for delisting settlement");
position
.sell(quantity, settlement_price)
.map_err(BacktestError::Execution)?
};
portfolio.apply_cash_delta(gross_amount);
portfolio.prune_flat_positions();
let reason = format!(
"delisted_cash_settlement effective_date={} status={}",
effective_delisted_at, instrument.status
);
notes.push(reason.clone());
report.order_events.push(OrderEvent {
date,
order_id: None,
symbol: symbol.clone(),
side: OrderSide::Sell,
requested_quantity: quantity,
filled_quantity: quantity,
status: OrderStatus::Filled,
reason: reason.clone(),
});
report.fill_events.push(FillEvent {
date,
order_id: None,
symbol: symbol.clone(),
side: OrderSide::Sell,
quantity,
price: settlement_price,
gross_amount,
commission: 0.0,
stamp_tax: 0.0,
net_cash_flow: gross_amount,
reason: reason.clone(),
});
report.position_events.push(PositionEvent {
date,
symbol: symbol.clone(),
delta_quantity: -(quantity as i32),
quantity_after: 0,
average_cost: 0.0,
realized_pnl_delta,
reason: reason.clone(),
});
report.account_events.push(AccountEvent {
date,
cash_before,
cash_after: portfolio.cash(),
total_equity: portfolio.total_equity(),
note: reason,
});
}
Ok(report)
}
}
fn collect_scheduled_decisions<S: Strategy>(
strategy: &mut S,
scheduler: &Scheduler<'_>,
execution_date: NaiveDate,
stage: ScheduleStage,
rules: &[ScheduleRule],
decision_date: NaiveDate,
decision_index: usize,
data: &crate::data::DataSet,
portfolio: &PortfolioState,
open_orders: &[crate::strategy::OpenOrderView],
dynamic_universe: Option<&BTreeSet<String>>,
subscriptions: &BTreeSet<String>,
process_events: &mut Vec<ProcessEvent>,
process_event_bus: &mut ProcessEventBus,
current_time: Option<chrono::NaiveTime>,
order_events: &[OrderEvent],
fills: &[FillEvent],
) -> Result<crate::strategy::StrategyDecision, BacktestError> {
let mut combined = crate::strategy::StrategyDecision::default();
for rule in scheduler.triggered_rules_at(execution_date, stage, current_time, rules) {
publish_phase_event(
strategy,
process_event_bus,
execution_date,
decision_date,
decision_index,
data,
portfolio,
open_orders,
dynamic_universe,
subscriptions,
process_events,
execution_date,
ProcessEventKind::PreScheduled,
format!("scheduled:{}:{}:pre", rule.name, stage_label(stage)),
)?;
combined.merge_from(strategy.on_scheduled(
&StrategyContext {
execution_date,
decision_date,
decision_index,
data,
portfolio,
open_orders,
dynamic_universe,
subscriptions,
process_events: process_events.as_slice(),
active_process_event: None,
active_datetime: stage_datetime(execution_date, current_time),
order_events,
fills,
},
rule,
)?);
publish_phase_event(
strategy,
process_event_bus,
execution_date,
decision_date,
decision_index,
data,
portfolio,
open_orders,
dynamic_universe,
subscriptions,
process_events,
execution_date,
ProcessEventKind::PostScheduled,
format!("scheduled:{}:{}:post", rule.name, stage_label(stage)),
)?;
}
Ok(combined)
}
fn publish_phase_event<S: Strategy>(
strategy: &mut S,
process_event_bus: &mut ProcessEventBus,
execution_date: NaiveDate,
decision_date: NaiveDate,
decision_index: usize,
data: &crate::data::DataSet,
portfolio: &PortfolioState,
open_orders: &[crate::strategy::OpenOrderView],
dynamic_universe: Option<&BTreeSet<String>>,
subscriptions: &BTreeSet<String>,
events: &mut Vec<ProcessEvent>,
date: NaiveDate,
kind: ProcessEventKind,
detail: impl Into<String>,
) -> Result<(), BacktestError> {
let event = ProcessEvent {
date,
kind,
order_id: None,
symbol: None,
side: None,
detail: detail.into(),
};
process_event_bus.publish(&event);
let process_events = events.as_slice();
let event_ctx = StrategyContext {
execution_date,
decision_date,
decision_index,
data,
portfolio,
open_orders,
dynamic_universe,
subscriptions,
process_events,
active_process_event: Some(&event),
active_datetime: None,
order_events: &[],
fills: &[],
};
strategy.on_process_event(&event_ctx, &event)?;
events.push(event);
Ok(())
}
fn publish_process_events<S: Strategy>(
strategy: &mut S,
process_event_bus: &mut ProcessEventBus,
execution_date: NaiveDate,
decision_date: NaiveDate,
decision_index: usize,
data: &crate::data::DataSet,
portfolio: &PortfolioState,
open_orders: &[crate::strategy::OpenOrderView],
dynamic_universe: Option<&BTreeSet<String>>,
subscriptions: &BTreeSet<String>,
target: &mut Vec<ProcessEvent>,
incoming: &mut Vec<ProcessEvent>,
) -> Result<(), BacktestError> {
for event in incoming.drain(..) {
process_event_bus.publish(&event);
let process_events = target.as_slice();
let event_ctx = StrategyContext {
execution_date,
decision_date,
decision_index,
data,
portfolio,
open_orders,
dynamic_universe,
subscriptions,
process_events,
active_process_event: Some(&event),
active_datetime: None,
order_events: &[],
fills: &[],
};
strategy.on_process_event(&event_ctx, &event)?;
target.push(event);
}
Ok(())
}
fn publish_custom_process_event<S: Strategy>(
strategy: &mut S,
process_event_bus: &mut ProcessEventBus,
execution_date: NaiveDate,
decision_date: NaiveDate,
decision_index: usize,
data: &crate::data::DataSet,
portfolio: &PortfolioState,
open_orders: &[crate::strategy::OpenOrderView],
dynamic_universe: Option<&BTreeSet<String>>,
subscriptions: &BTreeSet<String>,
target: &mut Vec<ProcessEvent>,
event: ProcessEvent,
) -> Result<(), BacktestError> {
process_event_bus.publish(&event);
let process_events = target.as_slice();
let event_ctx = StrategyContext {
execution_date,
decision_date,
decision_index,
data,
portfolio,
open_orders,
dynamic_universe,
subscriptions,
process_events,
active_process_event: Some(&event),
active_datetime: None,
order_events: &[],
fills: &[],
};
strategy.on_process_event(&event_ctx, &event)?;
target.push(event);
Ok(())
}
fn stage_label(stage: ScheduleStage) -> &'static str {
match stage {
ScheduleStage::BeforeTrading => "before_trading",
ScheduleStage::OpenAuction => "open_auction",
ScheduleStage::Bar => "bar",
ScheduleStage::Tick => "tick",
ScheduleStage::OnDay => "on_day",
ScheduleStage::AfterTrading => "after_trading",
ScheduleStage::Settlement => "settlement",
}
}
fn stage_datetime(
date: NaiveDate,
time: Option<chrono::NaiveTime>,
) -> Option<chrono::NaiveDateTime> {
time.map(|value| date.and_time(value))
}
fn should_run_tick_events(rules: &[ScheduleRule], subscriptions: &BTreeSet<String>) -> bool {
!subscriptions.is_empty() || rules.iter().any(|rule| rule.stage == ScheduleStage::Tick)
}
fn merge_broker_report(target: &mut BrokerExecutionReport, incoming: BrokerExecutionReport) {
target.order_events.extend(incoming.order_events);
target.fill_events.extend(incoming.fill_events);
target.position_events.extend(incoming.position_events);
target.account_events.extend(incoming.account_events);
target.process_events.extend(incoming.process_events);
target.diagnostics.extend(incoming.diagnostics);
}
mod date_format {
use chrono::NaiveDate;
use serde::Serializer;
const FORMAT: &str = "%Y-%m-%d";
pub fn serialize<S>(date: &NaiveDate, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(&date.format(FORMAT).to_string())
}
}