use std::collections::{BTreeMap, BTreeSet}; use chrono::{Datelike, NaiveDate}; use serde::Serialize; use thiserror::Error; use crate::broker::{BrokerExecutionReport, BrokerSimulator, MatchingType}; use crate::cost::CostModel; use crate::data::{BenchmarkSnapshot, DataSet, DataSetError, PriceField}; use crate::event_bus::{BacktestProcessMod, ProcessEventBus}; use crate::events::{ AccountEvent, FillEvent, OrderEvent, OrderSide, OrderStatus, PositionEvent, ProcessEvent, ProcessEventKind, }; use crate::futures::{ FuturesAccountState, FuturesExecutionReport, FuturesOrderIntent, FuturesPositionEffect, FuturesTransactionCostModel, }; use crate::metrics::{BacktestMetrics, compute_backtest_metrics}; use crate::portfolio::{CashReceivable, HoldingSummary, PortfolioState}; use crate::rules::EquityRuleHooks; use crate::scheduler::{ScheduleRule, ScheduleStage, Scheduler, default_stage_time}; use crate::strategy::{Strategy, StrategyContext}; #[derive(Debug, Error)] pub enum BacktestError { #[error(transparent)] Data(#[from] DataSetError), #[error("missing {field} price for {symbol} on {date}")] MissingPrice { date: NaiveDate, symbol: String, field: &'static str, }, #[error("benchmark snapshot missing for {date}")] MissingBenchmark { date: NaiveDate }, #[error("{0}")] Execution(String), } #[derive(Debug, Clone)] pub struct BacktestConfig { pub initial_cash: f64, pub benchmark_code: String, pub start_date: Option, pub end_date: Option, pub decision_lag_trading_days: usize, pub execution_price_field: PriceField, } #[derive(Debug, Clone, Serialize)] pub struct DailyEquityPoint { #[serde(with = "date_format")] pub date: NaiveDate, pub cash: f64, pub market_value: f64, pub total_equity: f64, pub benchmark_close: f64, pub notes: String, pub diagnostics: String, } #[derive(Debug, Clone)] pub struct BacktestResult { pub strategy_name: String, pub equity_curve: Vec, pub benchmark_series: Vec, pub order_events: Vec, pub fills: Vec, pub position_events: Vec, pub account_events: Vec, pub process_events: Vec, pub holdings_summary: Vec, pub daily_holdings: Vec, pub metrics: BacktestMetrics, } #[derive(Debug, Clone, Serialize)] pub struct AnalyzerTradeRow { #[serde(with = "date_format")] pub date: NaiveDate, pub order_id: Option, pub symbol: String, pub side: OrderSide, pub quantity: u32, pub price: f64, pub gross_amount: f64, pub transaction_cost: f64, pub net_cash_flow: f64, pub reason: String, } #[derive(Debug, Clone, Serialize)] pub struct AnalyzerPositionRow { #[serde(with = "date_format")] pub date: NaiveDate, pub symbol: String, pub quantity: u32, pub market_value: f64, pub weight: f64, pub average_cost: f64, pub realized_pnl: f64, pub unrealized_pnl: f64, pub transaction_cost: f64, } #[derive(Debug, Clone, Serialize)] pub struct AnalyzerMonthlyReturnRow { pub year: i32, pub month: u32, pub portfolio_return: f64, pub benchmark_return: f64, pub excess_return: f64, } #[derive(Debug, Clone, Serialize)] pub struct AnalyzerRiskSummary { pub total_return: f64, pub annual_return: f64, pub benchmark_cumulative_return: f64, pub excess_cumulative_return: f64, pub alpha: f64, pub beta: f64, pub sharpe: f64, pub sortino: f64, pub information_ratio: f64, pub tracking_error: f64, pub volatility: f64, pub max_drawdown: f64, pub max_drawdown_duration_days: usize, pub win_rate: f64, pub excess_win_rate: f64, } #[derive(Debug, Clone, Serialize)] pub struct AnalyzerReport { pub strategy_name: String, pub trades: Vec, pub positions: Vec, pub monthly_returns: Vec, pub risk_summary: AnalyzerRiskSummary, pub equity_curve: Vec, pub benchmark_series: Vec, pub metrics: BacktestMetrics, } impl BacktestResult { pub fn analyzer_report(&self) -> AnalyzerReport { AnalyzerReport { strategy_name: self.strategy_name.clone(), trades: self .fills .iter() .map(|fill| AnalyzerTradeRow { date: fill.date, order_id: fill.order_id, symbol: fill.symbol.clone(), side: fill.side, quantity: fill.quantity, price: fill.price, gross_amount: fill.gross_amount, transaction_cost: fill.commission + fill.stamp_tax, net_cash_flow: fill.net_cash_flow, reason: fill.reason.clone(), }) .collect(), positions: self .daily_holdings .iter() .map(|holding| AnalyzerPositionRow { date: holding.date, symbol: holding.symbol.clone(), quantity: holding.quantity, market_value: holding.market_value, weight: holding.value_percent, average_cost: holding.average_cost, realized_pnl: holding.realized_pnl, unrealized_pnl: holding.unrealized_pnl, transaction_cost: holding.transaction_cost, }) .collect(), monthly_returns: self.analyzer_monthly_returns(), risk_summary: self.analyzer_risk_summary(), equity_curve: self.equity_curve.clone(), benchmark_series: self.benchmark_series.clone(), metrics: self.metrics.clone(), } } pub fn analyzer_report_json(&self) -> Result { serde_json::to_string_pretty(&self.analyzer_report()) } pub fn analyzer_monthly_returns(&self) -> Vec { let mut month_points = BTreeMap::<(i32, u32), (f64, f64, f64, f64)>::new(); for point in &self.equity_curve { let key = (point.date.year(), point.date.month()); month_points .entry(key) .and_modify(|(_, _, end_equity, end_benchmark)| { *end_equity = point.total_equity; *end_benchmark = point.benchmark_close; }) .or_insert(( point.total_equity, point.benchmark_close, point.total_equity, point.benchmark_close, )); } month_points .into_iter() .map( |((year, month), (start_equity, start_benchmark, end_equity, end_benchmark))| { let portfolio_return = analyzer_ratio_change(start_equity, end_equity); let benchmark_return = analyzer_ratio_change(start_benchmark, end_benchmark); AnalyzerMonthlyReturnRow { year, month, portfolio_return, benchmark_return, excess_return: portfolio_return - benchmark_return, } }, ) .collect() } pub fn analyzer_risk_summary(&self) -> AnalyzerRiskSummary { AnalyzerRiskSummary { total_return: self.metrics.total_return, annual_return: self.metrics.annual_return, benchmark_cumulative_return: self.metrics.benchmark_cumulative_return, excess_cumulative_return: self.metrics.excess_cumulative_return, alpha: self.metrics.alpha, beta: self.metrics.beta, sharpe: self.metrics.sharpe, sortino: self.metrics.sortino, information_ratio: self.metrics.information_ratio, tracking_error: self.metrics.tracking_error, volatility: self.metrics.volatility, max_drawdown: self.metrics.max_drawdown, max_drawdown_duration_days: self.metrics.max_drawdown_duration_days, win_rate: self.metrics.win_rate, excess_win_rate: self.metrics.excess_win_rate, } } } #[derive(Debug, Clone, Serialize)] pub struct BacktestDayProgress { #[serde(with = "date_format")] pub date: NaiveDate, pub cash: f64, pub market_value: f64, pub total_equity: f64, pub unit_nav: f64, pub total_return: f64, pub benchmark_close: f64, pub daily_fill_count: usize, pub cumulative_trade_count: usize, pub holding_count: usize, pub notes: String, pub diagnostics: String, pub orders: Vec, pub fills: Vec, pub holdings: Vec, pub process_events: Vec, } #[derive(Debug, Clone)] struct FuturesOpenOrder { order_id: u64, intent: FuturesOrderIntent, requested_quantity: u32, filled_quantity: u32, remaining_quantity: u32, limit_price: f64, reason: String, } pub struct BacktestEngine { data: DataSet, strategy: S, broker: BrokerSimulator, config: BacktestConfig, dividend_reinvestment: bool, process_event_bus: ProcessEventBus, dynamic_universe: Option>, subscriptions: BTreeSet, futures_account: Option, next_futures_order_id: u64, futures_open_orders: Vec, futures_expirations: BTreeMap>, futures_settlement_price_mode: String, futures_cost_model: FuturesTransactionCostModel, } impl BacktestEngine { pub fn new( data: DataSet, strategy: S, broker: BrokerSimulator, config: BacktestConfig, ) -> Self { Self { data, strategy, broker, config, dividend_reinvestment: false, process_event_bus: ProcessEventBus::new(), dynamic_universe: None, subscriptions: BTreeSet::new(), futures_account: None, next_futures_order_id: 9_000_000_000, futures_open_orders: Vec::new(), futures_expirations: BTreeMap::new(), futures_settlement_price_mode: "close".to_string(), futures_cost_model: FuturesTransactionCostModel::default(), } } pub fn with_dividend_reinvestment(mut self, enabled: bool) -> Self { self.dividend_reinvestment = enabled; self } pub fn with_futures_account(mut self, account: FuturesAccountState) -> Self { self.futures_account = Some(account); self } pub fn with_futures_initial_cash(self, initial_cash: f64) -> Self { self.with_futures_account(FuturesAccountState::new(initial_cash)) } pub fn futures_account(&self) -> Option<&FuturesAccountState> { self.futures_account.as_ref() } pub fn futures_account_mut(&mut self) -> Option<&mut FuturesAccountState> { self.futures_account.as_mut() } pub fn with_futures_expiration( mut self, date: NaiveDate, symbol: impl Into, settlement_price: f64, ) -> Self { self.futures_expirations .entry(date) .or_default() .insert(symbol.into(), settlement_price); self } pub fn with_futures_expirations( mut self, expirations: BTreeMap>, ) -> Self { self.futures_expirations = expirations; self } pub fn with_futures_settlement_price_mode(mut self, mode: impl Into) -> Self { self.futures_settlement_price_mode = mode.into(); self } pub fn with_futures_transaction_cost_model( mut self, cost_model: FuturesTransactionCostModel, ) -> Self { self.futures_cost_model = cost_model; self } pub fn process_event_bus_mut(&mut self) -> &mut ProcessEventBus { &mut self.process_event_bus } pub fn add_process_listener(&mut self, kind: ProcessEventKind, listener: F) where F: FnMut(&ProcessEvent) + 'static, { self.process_event_bus.add_listener(kind, listener); } pub fn add_any_process_listener(&mut self, listener: F) where F: FnMut(&ProcessEvent) + 'static, { self.process_event_bus.add_any_listener(listener); } pub fn install_process_mod(&mut self, module: &mut M) where M: BacktestProcessMod, { self.process_event_bus.install_mod(module); } } impl BacktestEngine where S: Strategy, C: CostModel, R: EquityRuleHooks, { fn apply_strategy_directives( &mut self, execution_date: NaiveDate, decision_date: NaiveDate, decision_index: usize, portfolio: &mut PortfolioState, open_orders: &[crate::strategy::OpenOrderView], process_events: &mut Vec, decision: &mut crate::strategy::StrategyDecision, directive_report: &mut BrokerExecutionReport, ) -> Result<(), BacktestError> { if decision.order_intents.is_empty() { return Ok(()); } let mut retained = Vec::with_capacity(decision.order_intents.len()); for intent in decision.order_intents.drain(..) { match intent { crate::strategy::OrderIntent::UpdateUniverse { symbols, reason } => { let symbol_count = symbols.len(); self.dynamic_universe = Some(symbols.clone()); decision .diagnostics .push(format!("dynamic_universe_updated count={symbol_count}")); publish_custom_process_event( &mut self.strategy, &mut self.process_event_bus, execution_date, decision_date, decision_index, &self.data, portfolio, self.futures_account.as_ref(), open_orders, self.dynamic_universe.as_ref(), &self.subscriptions, process_events, ProcessEvent { date: execution_date, kind: ProcessEventKind::UniverseUpdated, order_id: None, symbol: (symbol_count == 1) .then(|| symbols.iter().next().cloned()) .flatten(), side: None, detail: format!( "reason={reason} count={symbol_count} symbols={}", symbols.iter().cloned().collect::>().join(",") ), }, )?; } crate::strategy::OrderIntent::Subscribe { symbols, reason } => { let mut added = Vec::new(); for symbol in symbols { if self.subscriptions.insert(symbol.clone()) { added.push(symbol); } } if !added.is_empty() { decision.diagnostics.push(format!( "subscriptions_added count={} total={}", added.len(), self.subscriptions.len() )); publish_custom_process_event( &mut self.strategy, &mut self.process_event_bus, execution_date, decision_date, decision_index, &self.data, portfolio, self.futures_account.as_ref(), open_orders, self.dynamic_universe.as_ref(), &self.subscriptions, process_events, ProcessEvent { date: execution_date, kind: ProcessEventKind::UniverseSubscribed, order_id: None, symbol: (added.len() == 1).then(|| added[0].clone()), side: None, detail: format!( "reason={reason} count={} symbols={}", added.len(), added.join(",") ), }, )?; } } crate::strategy::OrderIntent::Unsubscribe { symbols, reason } => { let mut removed = Vec::new(); for symbol in symbols { if self.subscriptions.remove(&symbol) { removed.push(symbol); } } if !removed.is_empty() { decision.diagnostics.push(format!( "subscriptions_removed count={} total={}", removed.len(), self.subscriptions.len() )); publish_custom_process_event( &mut self.strategy, &mut self.process_event_bus, execution_date, decision_date, decision_index, &self.data, portfolio, self.futures_account.as_ref(), open_orders, self.dynamic_universe.as_ref(), &self.subscriptions, process_events, ProcessEvent { date: execution_date, kind: ProcessEventKind::UniverseUnsubscribed, order_id: None, symbol: (removed.len() == 1).then(|| removed[0].clone()), side: None, detail: format!( "reason={reason} count={} symbols={}", removed.len(), removed.join(",") ), }, )?; } } crate::strategy::OrderIntent::DepositWithdraw { amount, receiving_days, reason, } => { let cash_before = portfolio.cash(); if receiving_days == 0 { portfolio .deposit_withdraw(amount) .map_err(BacktestError::Execution)?; directive_report.account_events.push(AccountEvent { date: execution_date, cash_before, cash_after: portfolio.cash(), total_equity: portfolio.total_equity(), note: format!("deposit_withdraw amount={amount:.2} reason={reason}"), }); } else { let payable_date = self .data .next_trading_date(execution_date, receiving_days) .ok_or_else(|| { BacktestError::Execution(format!( "no trading date for deposit_withdraw receiving_days={receiving_days} from {execution_date}" )) })?; portfolio .schedule_deposit_withdraw(payable_date, amount, reason.clone()) .map_err(BacktestError::Execution)?; directive_report.account_events.push(AccountEvent { date: execution_date, cash_before, cash_after: portfolio.cash(), total_equity: portfolio.total_equity(), note: format!( "deposit_withdraw_scheduled amount={amount:.2} payable_date={payable_date} reason={reason}" ), }); } decision.diagnostics.push(format!( "account_deposit_withdraw amount={amount:.2} receiving_days={receiving_days}" )); publish_custom_process_event( &mut self.strategy, &mut self.process_event_bus, execution_date, decision_date, decision_index, &self.data, &*portfolio, self.futures_account.as_ref(), open_orders, self.dynamic_universe.as_ref(), &self.subscriptions, process_events, ProcessEvent { date: execution_date, kind: ProcessEventKind::AccountDepositWithdraw, order_id: None, symbol: None, side: None, detail: format!( "reason={reason} amount={amount:.2} receiving_days={receiving_days} cash_before={cash_before:.2} cash_after={:.2}", portfolio.cash() ), }, )?; } crate::strategy::OrderIntent::FinanceRepay { amount, reason } => { let cash_before = portfolio.cash(); let liabilities_before = portfolio.cash_liabilities(); portfolio .finance_repay(amount) .map_err(BacktestError::Execution)?; directive_report.account_events.push(AccountEvent { date: execution_date, cash_before, cash_after: portfolio.cash(), total_equity: portfolio.total_equity(), note: format!( "finance_repay amount={amount:.2} liabilities_before={liabilities_before:.2} liabilities_after={:.2} reason={reason}", portfolio.cash_liabilities() ), }); decision.diagnostics.push(format!( "account_finance_repay amount={amount:.2} liabilities={:.2}", portfolio.cash_liabilities() )); publish_custom_process_event( &mut self.strategy, &mut self.process_event_bus, execution_date, decision_date, decision_index, &self.data, &*portfolio, self.futures_account.as_ref(), open_orders, self.dynamic_universe.as_ref(), &self.subscriptions, process_events, ProcessEvent { date: execution_date, kind: ProcessEventKind::AccountFinanceRepay, order_id: None, symbol: None, side: None, detail: format!( "reason={reason} amount={amount:.2} cash_before={cash_before:.2} cash_after={:.2} liabilities_before={liabilities_before:.2} liabilities_after={:.2}", portfolio.cash(), portfolio.cash_liabilities() ), }, )?; } crate::strategy::OrderIntent::SetManagementFeeRate { rate, reason } => { portfolio .set_management_fee_rate(rate) .map_err(BacktestError::Execution)?; decision .diagnostics .push(format!("account_management_fee_rate rate={rate:.6}")); publish_custom_process_event( &mut self.strategy, &mut self.process_event_bus, execution_date, decision_date, decision_index, &self.data, &*portfolio, self.futures_account.as_ref(), open_orders, self.dynamic_universe.as_ref(), &self.subscriptions, process_events, ProcessEvent { date: execution_date, kind: ProcessEventKind::AccountManagementFee, order_id: None, symbol: None, side: None, detail: format!( "reason={reason} rate={rate:.6} management_fees={:.2}", portfolio.management_fees() ), }, )?; } crate::strategy::OrderIntent::CancelOrder { order_id, reason } => { let report = self.cancel_futures_open_order(execution_date, order_id, &reason); if report.order_events.is_empty() && report.process_events.is_empty() { retained .push(crate::strategy::OrderIntent::CancelOrder { order_id, reason }); } else { merge_futures_report(directive_report, report); } } crate::strategy::OrderIntent::CancelSymbol { symbol, reason } => { let report = self.cancel_futures_open_orders_for_symbol( execution_date, &symbol, &reason, ); if report.order_events.is_empty() && report.process_events.is_empty() { retained .push(crate::strategy::OrderIntent::CancelSymbol { symbol, reason }); } else { merge_futures_report(directive_report, report); } } crate::strategy::OrderIntent::CancelAll { reason } => { let report = self.cancel_all_futures_open_orders(execution_date, &reason); let has_stock_open_orders = !self.broker.open_order_views().is_empty(); if has_stock_open_orders || report.order_events.is_empty() { retained.push(crate::strategy::OrderIntent::CancelAll { reason: reason.clone(), }); } merge_futures_report(directive_report, report); } crate::strategy::OrderIntent::Futures { intent } => { let order_id = self.next_futures_order_id; self.next_futures_order_id += 1; let report = self.submit_futures_order(execution_date, order_id, intent, false); decision.diagnostics.push(format!( "futures_order order_id={order_id} events={}", report.order_events.len() )); merge_futures_report(directive_report, report); } other => retained.push(other), } } decision.order_intents = retained; Ok(()) } fn open_order_views(&self) -> Vec { let mut views = self.broker.open_order_views(); views.extend( self.futures_open_orders .iter() .map(|order| crate::strategy::OpenOrderView { order_id: order.order_id, symbol: order.intent.symbol.clone(), side: order.intent.side(), requested_quantity: order.requested_quantity, filled_quantity: order.filled_quantity, remaining_quantity: order.remaining_quantity, unfilled_quantity: order.remaining_quantity, status: OrderStatus::Pending, avg_price: 0.0, transaction_cost: 0.0, limit_price: order.limit_price, reason: order.reason.clone(), }), ); views.sort_by_key(|order| order.order_id); views } fn aggregate_initial_cash(&self) -> f64 { self.config.initial_cash + self .futures_account .as_ref() .map(FuturesAccountState::starting_cash) .unwrap_or(0.0) } fn aggregate_cash(&self, portfolio: &PortfolioState) -> f64 { portfolio.cash() + self .futures_account .as_ref() .map(FuturesAccountState::cash) .unwrap_or(0.0) } fn aggregate_market_value(&self, portfolio: &PortfolioState) -> f64 { portfolio.market_value() + self .futures_account .as_ref() .map(FuturesAccountState::position_equity) .unwrap_or(0.0) } fn aggregate_total_equity(&self, portfolio: &PortfolioState) -> f64 { portfolio.total_equity() + self .futures_account .as_ref() .map(FuturesAccountState::total_value) .unwrap_or(0.0) } fn submit_futures_order( &mut self, date: NaiveDate, order_id: u64, intent: FuturesOrderIntent, from_pending: bool, ) -> FuturesExecutionReport { let Some(_) = self.futures_account.as_ref() else { return self.reject_futures_order( date, order_id, intent, "futures account is not enabled".to_string(), ); }; if let Some(reason) = self.validate_futures_submission(&intent) { return self.reject_futures_order(date, order_id, intent, reason); } let original_requested = intent.quantity; let mut intent = self.resolve_futures_trading_parameters(date, intent); let fill = self.resolve_futures_fill(date, &intent); let Some((execution_price, fill_quantity)) = fill else { if intent.allow_pending || intent.limit_price.is_some() { return self.queue_futures_order( date, order_id, intent, original_requested, 0, from_pending, "limit not matched or no executable futures price", ); } return self.reject_futures_order( date, order_id, intent, "missing executable futures price".to_string(), ); }; if fill_quantity == 0 { if intent.allow_pending || intent.limit_price.is_some() { return self.queue_futures_order( date, order_id, intent, original_requested, 0, from_pending, "futures liquidity unavailable", ); } return self.reject_futures_order( date, order_id, intent, "futures liquidity unavailable".to_string(), ); } let remaining = original_requested.saturating_sub(fill_quantity); intent.price = execution_price; intent.quantity = fill_quantity; intent = self.resolve_futures_transaction_cost(date, intent); let mut report = self .futures_account .as_mut() .expect("checked futures account") .execute_order(date, Some(order_id), intent.clone()); if remaining > 0 && (intent.allow_pending || intent.limit_price.is_some()) { for event in &mut report.order_events { if event.order_id == Some(order_id) { event.requested_quantity = original_requested; event.filled_quantity = fill_quantity; event.status = OrderStatus::PartiallyFilled; } } let mut remaining_intent = intent.clone(); remaining_intent.quantity = remaining; remaining_intent.transaction_cost = 0.0; let queued = self.queue_futures_order( date, order_id, remaining_intent, original_requested, fill_quantity, true, "partial fill remaining quantity pending", ); report.order_events.extend(queued.order_events); report.process_events.extend(queued.process_events); report.diagnostics.extend(queued.diagnostics); } else if remaining > 0 { for event in &mut report.order_events { if event.order_id == Some(order_id) { event.requested_quantity = original_requested; event.filled_quantity = fill_quantity; event.status = OrderStatus::PartiallyFilled; event.reason.push_str(": remaining quantity canceled"); } } } report } fn process_futures_open_orders(&mut self, date: NaiveDate) -> BrokerExecutionReport { let pending = std::mem::take(&mut self.futures_open_orders); let mut combined = BrokerExecutionReport::default(); for mut order in pending { order.intent.quantity = order.remaining_quantity; let report = self.submit_futures_order(date, order.order_id, order.intent, true); merge_futures_report(&mut combined, report); } combined } fn queue_futures_order( &mut self, date: NaiveDate, order_id: u64, intent: FuturesOrderIntent, requested_quantity: u32, filled_quantity: u32, _from_pending: bool, reason: &str, ) -> FuturesExecutionReport { let mut report = FuturesExecutionReport::default(); let side = intent.side(); let limit_price = intent.limit_price.unwrap_or(intent.price); self.futures_open_orders.push(FuturesOpenOrder { order_id, requested_quantity, filled_quantity, remaining_quantity: intent.quantity, limit_price, reason: format!("{}: {reason}", intent.reason), intent, }); report.order_events.push(OrderEvent { date, order_id: Some(order_id), symbol: self .futures_open_orders .last() .map(|order| order.intent.symbol.clone()) .unwrap_or_default(), side, requested_quantity, filled_quantity, status: OrderStatus::Pending, reason: reason.to_string(), }); report.process_events.push(ProcessEvent { date, kind: ProcessEventKind::OrderCreationPass, order_id: Some(order_id), symbol: self .futures_open_orders .last() .map(|order| order.intent.symbol.clone()), side: Some(side), detail: format!("futures pending limit_price={limit_price:.6} reason={reason}"), }); report } fn reject_futures_order( &self, date: NaiveDate, order_id: u64, intent: FuturesOrderIntent, reason: String, ) -> FuturesExecutionReport { let side = intent.side(); let mut report = FuturesExecutionReport::default(); report.order_events.push(OrderEvent { date, order_id: Some(order_id), symbol: intent.symbol.clone(), side, requested_quantity: intent.quantity, filled_quantity: 0, status: OrderStatus::Rejected, reason: format!( "{}: {reason} direction={} effect={}", intent.reason, intent.direction.as_str(), intent.effect.as_str() ), }); report.process_events.push(ProcessEvent { date, kind: ProcessEventKind::OrderCreationReject, order_id: Some(order_id), symbol: Some(intent.symbol), side: Some(side), detail: reason, }); report } fn validate_futures_submission(&self, intent: &FuturesOrderIntent) -> Option { if intent.quantity == 0 { return Some("zero futures quantity".to_string()); } if let Some(limit_price) = intent.limit_price { if !limit_price.is_finite() || limit_price <= 0.0 { return Some("invalid futures limit price".to_string()); } for order in &self.futures_open_orders { if order.intent.symbol != intent.symbol || order.intent.side() == intent.side() { continue; } let existing_limit = order.limit_price; let crosses = match intent.side() { OrderSide::Buy => limit_price >= existing_limit, OrderSide::Sell => limit_price <= existing_limit, }; if crosses { return Some(format!( "self-trade risk with futures open order {}", order.order_id )); } } } None } fn resolve_futures_trading_parameters( &self, date: NaiveDate, mut intent: FuturesOrderIntent, ) -> FuturesOrderIntent { if let Some(params) = self.data.futures_trading_parameter(date, &intent.symbol) { intent.spec = params.spec(); } intent } fn resolve_futures_transaction_cost( &self, date: NaiveDate, mut intent: FuturesOrderIntent, ) -> FuturesOrderIntent { if intent.transaction_cost > 0.0 { return intent; } if let Some(params) = self.data.futures_trading_parameter(date, &intent.symbol) { let close_today_quantity = self.futures_close_today_quantity(&intent); intent.transaction_cost = self.futures_cost_model.calculate( params, intent.effect, intent.price, intent.quantity, close_today_quantity, ); } intent } fn futures_close_today_quantity(&self, intent: &FuturesOrderIntent) -> u32 { match intent.effect { FuturesPositionEffect::Open | FuturesPositionEffect::CloseYesterday => 0, FuturesPositionEffect::CloseToday => intent.quantity, FuturesPositionEffect::Close => self .futures_account .as_ref() .and_then(|account| account.position(&intent.symbol, intent.direction)) .map(|position| intent.quantity.saturating_sub(position.old_quantity)) .unwrap_or(0), } } fn resolve_futures_fill( &self, date: NaiveDate, intent: &FuturesOrderIntent, ) -> Option<(f64, u32)> { if self.broker.execution_price_field() == PriceField::Last { if let Some(fill) = self.resolve_futures_intraday_fill(date, intent) { return Some(fill); } } if let Some(snapshot) = self.data.market(date, &intent.symbol) { if snapshot.paused { return None; } let price = match self.broker.execution_price_field() { PriceField::DayOpen => snapshot.day_open, PriceField::Open => snapshot.open, PriceField::Close => snapshot.close, PriceField::Last => match intent.side() { OrderSide::Buy => snapshot.buy_price(PriceField::Last), OrderSide::Sell => snapshot.sell_price(PriceField::Last), }, }; if !self.futures_price_can_trade(snapshot, intent.side(), price, intent.limit_price) { return None; } return Some((price, intent.quantity)); } if intent.price.is_finite() && intent.price > 0.0 { if futures_limit_satisfied(intent.side(), intent.price, intent.limit_price) { return Some((intent.price, intent.quantity)); } } None } fn resolve_futures_intraday_fill( &self, date: NaiveDate, intent: &FuturesOrderIntent, ) -> Option<(f64, u32)> { let snapshot = self.data.market(date, &intent.symbol); let quotes = self.data.execution_quotes_on(date, &intent.symbol); for quote in quotes { let price = match self.broker.matching_type() { MatchingType::NextTickBestOwn => match intent.side() { OrderSide::Buy => { if quote.bid1.is_finite() && quote.bid1 > 0.0 { quote.bid1 } else { quote.last_price } } OrderSide::Sell => { if quote.ask1.is_finite() && quote.ask1 > 0.0 { quote.ask1 } else { quote.last_price } } }, MatchingType::NextTickBestCounterparty | MatchingType::CounterpartyOffer => { match intent.side() { OrderSide::Buy => quote.buy_price().unwrap_or(quote.last_price), OrderSide::Sell => quote.sell_price().unwrap_or(quote.last_price), } } _ => quote.last_price, }; if let Some(snapshot) = snapshot { if !self.futures_price_can_trade(snapshot, intent.side(), price, intent.limit_price) { continue; } } else if !futures_limit_satisfied(intent.side(), price, intent.limit_price) { continue; } let top_level_quantity = match intent.side() { OrderSide::Buy => quote.ask1_volume, OrderSide::Sell => quote.bid1_volume, } .max(quote.volume_delta) .min(u32::MAX as u64) as u32; let fill_quantity = if top_level_quantity == 0 { intent.quantity } else { intent.quantity.min(top_level_quantity) }; if price.is_finite() && price > 0.0 && fill_quantity > 0 { return Some((price, fill_quantity)); } } None } fn futures_price_can_trade( &self, snapshot: &crate::data::DailyMarketSnapshot, side: OrderSide, price: f64, limit_price: Option, ) -> bool { if !price.is_finite() || price <= 0.0 { return false; } if !futures_limit_satisfied(side, price, limit_price) { return false; } match side { OrderSide::Buy => !snapshot.is_at_upper_limit_price(price), OrderSide::Sell => !snapshot.is_at_lower_limit_price(price), } } fn cancel_futures_open_order( &mut self, date: NaiveDate, order_id: u64, reason: &str, ) -> FuturesExecutionReport { let Some(index) = self .futures_open_orders .iter() .position(|order| order.order_id == order_id) else { return FuturesExecutionReport::default(); }; let order = self.futures_open_orders.remove(index); futures_cancel_report(date, order, reason) } fn cancel_futures_open_orders_for_symbol( &mut self, date: NaiveDate, symbol: &str, reason: &str, ) -> FuturesExecutionReport { let mut report = FuturesExecutionReport::default(); let mut retained = Vec::with_capacity(self.futures_open_orders.len()); let mut canceled = Vec::new(); for order in self.futures_open_orders.drain(..) { if order.intent.symbol == symbol { canceled.push(order); } else { retained.push(order); } } self.futures_open_orders = retained; for order in canceled { merge_futures_execution_report(&mut report, futures_cancel_report(date, order, reason)); } report } fn cancel_all_futures_open_orders( &mut self, date: NaiveDate, reason: &str, ) -> FuturesExecutionReport { let mut report = FuturesExecutionReport::default(); for order in std::mem::take(&mut self.futures_open_orders) { merge_futures_execution_report(&mut report, futures_cancel_report(date, order, reason)); } report } pub fn run(&mut self) -> Result { self.run_with_progress(|_| {}) } pub fn run_with_progress( &mut self, mut on_progress: F, ) -> Result where F: FnMut(&BacktestDayProgress), { let mut portfolio = PortfolioState::new(self.config.initial_cash); let scheduler_calendar = self.data.calendar().clone(); let scheduler = Scheduler::new(&scheduler_calendar); let execution_dates = self .data .calendar() .iter() .filter(|date| { self.config .start_date .map(|start| *date >= start) .unwrap_or(true) }) .filter(|date| self.config.end_date.map(|end| *date <= end).unwrap_or(true)) .filter(|date| { !self.data.factor_snapshots_on(*date).is_empty() && !self.data.candidate_snapshots_on(*date).is_empty() }) .collect::>(); let mut result = BacktestResult { strategy_name: self.strategy.name().to_string(), benchmark_series: self .data .benchmark_series() .into_iter() .filter(|row| { self.config .start_date .map(|start| row.date >= start) .unwrap_or(true) }) .filter(|row| { self.config .end_date .map(|end| row.date <= end) .unwrap_or(true) }) .collect(), order_events: Vec::new(), fills: Vec::new(), position_events: Vec::new(), account_events: Vec::new(), process_events: Vec::new(), equity_curve: Vec::new(), holdings_summary: Vec::new(), daily_holdings: Vec::new(), metrics: BacktestMetrics::default(), }; for (execution_idx, execution_date) in execution_dates.iter().copied().enumerate() { let mut corporate_action_notes = Vec::new(); portfolio.begin_trading_day(); if let Some(account) = self.futures_account.as_mut() { account.begin_trading_day(); } let pending_cash_flow_report = self.settle_pending_cash_flows( execution_date, &mut portfolio, &mut corporate_action_notes, ); self.extend_result(&mut result, pending_cash_flow_report); let receivable_report = self.settle_cash_receivables( execution_date, &mut portfolio, &mut corporate_action_notes, )?; self.extend_result(&mut result, receivable_report); let corporate_action_report = self.apply_corporate_actions( execution_date, &mut portfolio, &mut corporate_action_notes, )?; self.extend_result(&mut result, corporate_action_report); let delisting_report = self.settle_delisted_positions( execution_date, &mut portfolio, &mut corporate_action_notes, )?; self.extend_result(&mut result, delisting_report); let futures_open_order_report = self.process_futures_open_orders(execution_date); self.extend_result(&mut result, futures_open_order_report); let decision_slot = execution_idx .checked_sub(self.config.decision_lag_trading_days) .map(|decision_idx| (decision_idx, execution_dates[decision_idx])); let (decision_index, decision_date) = decision_slot.unwrap_or((execution_idx, execution_date)); let mut process_events = Vec::new(); let mut directive_report = BrokerExecutionReport::default(); let pre_open_orders = self.open_order_views(); let schedule_rules = self.strategy.schedule_rules(); publish_phase_event( &mut self.strategy, &mut self.process_event_bus, execution_date, decision_date, decision_index, &self.data, &portfolio, self.futures_account.as_ref(), &pre_open_orders, self.dynamic_universe.as_ref(), &self.subscriptions, &mut process_events, execution_date, ProcessEventKind::PreBeforeTrading, "before_trading:pre", )?; self.strategy.before_trading(&StrategyContext { execution_date, decision_date, decision_index, data: &self.data, portfolio: &portfolio, futures_account: self.futures_account.as_ref(), open_orders: &pre_open_orders, dynamic_universe: self.dynamic_universe.as_ref(), subscriptions: &self.subscriptions, process_events: &process_events, active_process_event: None, active_datetime: stage_datetime( execution_date, default_stage_time(ScheduleStage::BeforeTrading), ), order_events: result.order_events.as_slice(), fills: result.fills.as_slice(), })?; publish_phase_event( &mut self.strategy, &mut self.process_event_bus, execution_date, decision_date, decision_index, &self.data, &portfolio, self.futures_account.as_ref(), &pre_open_orders, self.dynamic_universe.as_ref(), &self.subscriptions, &mut process_events, execution_date, ProcessEventKind::BeforeTrading, "before_trading", )?; let mut before_trading_decision = collect_scheduled_decisions( &mut self.strategy, &scheduler, execution_date, ScheduleStage::BeforeTrading, &schedule_rules, decision_date, decision_index, &self.data, &portfolio, self.futures_account.as_ref(), &pre_open_orders, self.dynamic_universe.as_ref(), &self.subscriptions, &mut process_events, &mut self.process_event_bus, default_stage_time(ScheduleStage::BeforeTrading), result.order_events.as_slice(), result.fills.as_slice(), )?; self.apply_strategy_directives( execution_date, decision_date, decision_index, &mut portfolio, &pre_open_orders, &mut process_events, &mut before_trading_decision, &mut directive_report, )?; publish_phase_event( &mut self.strategy, &mut self.process_event_bus, execution_date, decision_date, decision_index, &self.data, &portfolio, self.futures_account.as_ref(), &pre_open_orders, self.dynamic_universe.as_ref(), &self.subscriptions, &mut process_events, execution_date, ProcessEventKind::PostBeforeTrading, "before_trading:post", )?; publish_phase_event( &mut self.strategy, &mut self.process_event_bus, execution_date, decision_date, decision_index, &self.data, &portfolio, self.futures_account.as_ref(), &pre_open_orders, self.dynamic_universe.as_ref(), &self.subscriptions, &mut process_events, execution_date, ProcessEventKind::PreOpenAuction, "open_auction:pre", )?; let mut auction_decision = collect_scheduled_decisions( &mut self.strategy, &scheduler, execution_date, ScheduleStage::OpenAuction, &schedule_rules, decision_date, decision_index, &self.data, &portfolio, self.futures_account.as_ref(), &pre_open_orders, self.dynamic_universe.as_ref(), &self.subscriptions, &mut process_events, &mut self.process_event_bus, default_stage_time(ScheduleStage::OpenAuction), result.order_events.as_slice(), result.fills.as_slice(), )?; auction_decision.merge_from(self.strategy.open_auction(&StrategyContext { execution_date, decision_date, decision_index, data: &self.data, portfolio: &portfolio, futures_account: self.futures_account.as_ref(), open_orders: &pre_open_orders, dynamic_universe: self.dynamic_universe.as_ref(), subscriptions: &self.subscriptions, process_events: &process_events, active_process_event: None, active_datetime: stage_datetime( execution_date, default_stage_time(ScheduleStage::OpenAuction), ), order_events: result.order_events.as_slice(), fills: result.fills.as_slice(), })?); publish_phase_event( &mut self.strategy, &mut self.process_event_bus, execution_date, decision_date, decision_index, &self.data, &portfolio, self.futures_account.as_ref(), &pre_open_orders, self.dynamic_universe.as_ref(), &self.subscriptions, &mut process_events, execution_date, ProcessEventKind::OpenAuction, "open_auction", )?; self.apply_strategy_directives( execution_date, decision_date, decision_index, &mut portfolio, &pre_open_orders, &mut process_events, &mut auction_decision, &mut directive_report, )?; let mut report = self.broker.execute( execution_date, &mut portfolio, &self.data, &auction_decision, )?; let post_auction_open_orders = self.open_order_views(); publish_process_events( &mut self.strategy, &mut self.process_event_bus, execution_date, decision_date, decision_index, &self.data, &portfolio, self.futures_account.as_ref(), &post_auction_open_orders, self.dynamic_universe.as_ref(), &self.subscriptions, &mut process_events, &mut report.process_events, )?; publish_phase_event( &mut self.strategy, &mut self.process_event_bus, execution_date, decision_date, decision_index, &self.data, &portfolio, self.futures_account.as_ref(), &post_auction_open_orders, self.dynamic_universe.as_ref(), &self.subscriptions, &mut process_events, execution_date, ProcessEventKind::PostOpenAuction, "open_auction:post", )?; publish_phase_event( &mut self.strategy, &mut self.process_event_bus, execution_date, decision_date, decision_index, &self.data, &portfolio, self.futures_account.as_ref(), &post_auction_open_orders, self.dynamic_universe.as_ref(), &self.subscriptions, &mut process_events, execution_date, ProcessEventKind::PreOnDay, "on_day:pre", )?; let on_day_open_orders = self.open_order_views(); let mut decision = decision_slot .map(|(decision_idx, decision_date)| { self.strategy.on_day(&StrategyContext { execution_date, decision_date, decision_index: decision_idx, data: &self.data, portfolio: &portfolio, futures_account: self.futures_account.as_ref(), open_orders: &on_day_open_orders, dynamic_universe: self.dynamic_universe.as_ref(), subscriptions: &self.subscriptions, process_events: &process_events, active_process_event: None, active_datetime: stage_datetime( execution_date, default_stage_time(ScheduleStage::OnDay), ), order_events: result.order_events.as_slice(), fills: result.fills.as_slice(), }) }) .transpose()? .unwrap_or_default(); decision.merge_from(collect_scheduled_decisions( &mut self.strategy, &scheduler, execution_date, ScheduleStage::OnDay, &schedule_rules, decision_date, decision_index, &self.data, &portfolio, self.futures_account.as_ref(), &on_day_open_orders, self.dynamic_universe.as_ref(), &self.subscriptions, &mut process_events, &mut self.process_event_bus, default_stage_time(ScheduleStage::OnDay), result.order_events.as_slice(), result.fills.as_slice(), )?); publish_phase_event( &mut self.strategy, &mut self.process_event_bus, execution_date, decision_date, decision_index, &self.data, &portfolio, self.futures_account.as_ref(), &on_day_open_orders, self.dynamic_universe.as_ref(), &self.subscriptions, &mut process_events, execution_date, ProcessEventKind::OnDay, "on_day", )?; let bar_open_orders = self.open_order_views(); publish_phase_event( &mut self.strategy, &mut self.process_event_bus, execution_date, decision_date, decision_index, &self.data, &portfolio, self.futures_account.as_ref(), &bar_open_orders, self.dynamic_universe.as_ref(), &self.subscriptions, &mut process_events, execution_date, ProcessEventKind::PreBar, "bar:pre", )?; decision.merge_from(collect_scheduled_decisions( &mut self.strategy, &scheduler, execution_date, ScheduleStage::Bar, &schedule_rules, decision_date, decision_index, &self.data, &portfolio, self.futures_account.as_ref(), &bar_open_orders, self.dynamic_universe.as_ref(), &self.subscriptions, &mut process_events, &mut self.process_event_bus, default_stage_time(ScheduleStage::Bar), result.order_events.as_slice(), result.fills.as_slice(), )?); decision.merge_from(self.strategy.on_bar(&StrategyContext { execution_date, decision_date, decision_index, data: &self.data, portfolio: &portfolio, futures_account: self.futures_account.as_ref(), open_orders: &bar_open_orders, dynamic_universe: self.dynamic_universe.as_ref(), subscriptions: &self.subscriptions, process_events: &process_events, active_process_event: None, active_datetime: stage_datetime( execution_date, default_stage_time(ScheduleStage::Bar), ), order_events: result.order_events.as_slice(), fills: result.fills.as_slice(), })?); publish_phase_event( &mut self.strategy, &mut self.process_event_bus, execution_date, decision_date, decision_index, &self.data, &portfolio, self.futures_account.as_ref(), &bar_open_orders, self.dynamic_universe.as_ref(), &self.subscriptions, &mut process_events, execution_date, ProcessEventKind::Bar, "bar", )?; self.apply_strategy_directives( execution_date, decision_date, decision_index, &mut portfolio, &on_day_open_orders, &mut process_events, &mut decision, &mut directive_report, )?; let mut intraday_report = self.broker .execute(execution_date, &mut portfolio, &self.data, &decision)?; let post_intraday_open_orders = self.open_order_views(); publish_process_events( &mut self.strategy, &mut self.process_event_bus, execution_date, decision_date, decision_index, &self.data, &portfolio, self.futures_account.as_ref(), &post_intraday_open_orders, self.dynamic_universe.as_ref(), &self.subscriptions, &mut process_events, &mut intraday_report.process_events, )?; report.order_events.extend(intraday_report.order_events); report.fill_events.extend(intraday_report.fill_events); report .position_events .extend(intraday_report.position_events); report.account_events.extend(intraday_report.account_events); report.diagnostics.extend(intraday_report.diagnostics); publish_phase_event( &mut self.strategy, &mut self.process_event_bus, execution_date, decision_date, decision_index, &self.data, &portfolio, self.futures_account.as_ref(), &post_intraday_open_orders, self.dynamic_universe.as_ref(), &self.subscriptions, &mut process_events, execution_date, ProcessEventKind::PostOnDay, "on_day:post", )?; publish_phase_event( &mut self.strategy, &mut self.process_event_bus, execution_date, decision_date, decision_index, &self.data, &portfolio, self.futures_account.as_ref(), &post_intraday_open_orders, self.dynamic_universe.as_ref(), &self.subscriptions, &mut process_events, execution_date, ProcessEventKind::PostBar, "bar:post", )?; if should_run_tick_events(&schedule_rules, &self.subscriptions) { let filter_by_subscription = !self.subscriptions.is_empty(); let tick_quotes = self .data .execution_quotes_on_date(execution_date) .into_iter() .filter(|quote| { !filter_by_subscription || self.subscriptions.contains("e.symbol) }) .collect::>(); for quote in tick_quotes { let tick_time = quote.timestamp.time(); let tick_open_orders = self.open_order_views(); publish_phase_event( &mut self.strategy, &mut self.process_event_bus, execution_date, decision_date, decision_index, &self.data, &portfolio, self.futures_account.as_ref(), &tick_open_orders, self.dynamic_universe.as_ref(), &self.subscriptions, &mut process_events, execution_date, ProcessEventKind::PreTick, format!("tick:{}:{}:pre", quote.symbol, quote.timestamp), )?; let mut tick_decision = collect_scheduled_decisions( &mut self.strategy, &scheduler, execution_date, ScheduleStage::Tick, &schedule_rules, decision_date, decision_index, &self.data, &portfolio, self.futures_account.as_ref(), &tick_open_orders, self.dynamic_universe.as_ref(), &self.subscriptions, &mut process_events, &mut self.process_event_bus, Some(tick_time), result.order_events.as_slice(), result.fills.as_slice(), )?; tick_decision.merge_from(self.strategy.on_tick( &StrategyContext { execution_date, decision_date, decision_index, data: &self.data, portfolio: &portfolio, futures_account: self.futures_account.as_ref(), open_orders: &tick_open_orders, dynamic_universe: self.dynamic_universe.as_ref(), subscriptions: &self.subscriptions, process_events: &process_events, active_process_event: None, active_datetime: Some(quote.timestamp), order_events: result.order_events.as_slice(), fills: result.fills.as_slice(), }, "e, )?); publish_phase_event( &mut self.strategy, &mut self.process_event_bus, execution_date, decision_date, decision_index, &self.data, &portfolio, self.futures_account.as_ref(), &tick_open_orders, self.dynamic_universe.as_ref(), &self.subscriptions, &mut process_events, execution_date, ProcessEventKind::Tick, format!("tick:{}:{}", quote.symbol, quote.timestamp), )?; self.apply_strategy_directives( execution_date, decision_date, decision_index, &mut portfolio, &tick_open_orders, &mut process_events, &mut tick_decision, &mut directive_report, )?; let mut tick_report = self.broker.execute_between( execution_date, &mut portfolio, &self.data, &tick_decision, Some(tick_time), Some(tick_time), )?; let post_tick_open_orders = self.open_order_views(); publish_process_events( &mut self.strategy, &mut self.process_event_bus, execution_date, decision_date, decision_index, &self.data, &portfolio, self.futures_account.as_ref(), &post_tick_open_orders, self.dynamic_universe.as_ref(), &self.subscriptions, &mut process_events, &mut tick_report.process_events, )?; merge_broker_report(&mut report, tick_report); publish_phase_event( &mut self.strategy, &mut self.process_event_bus, execution_date, decision_date, decision_index, &self.data, &portfolio, self.futures_account.as_ref(), &post_tick_open_orders, self.dynamic_universe.as_ref(), &self.subscriptions, &mut process_events, execution_date, ProcessEventKind::PostTick, format!("tick:{}:{}:post", quote.symbol, quote.timestamp), )?; } } portfolio.update_prices(execution_date, &self.data, PriceField::Close)?; let post_trade_open_orders = self.open_order_views(); let visible_order_events = result .order_events .iter() .cloned() .chain(report.order_events.iter().cloned()) .collect::>(); let visible_fills = result .fills .iter() .cloned() .chain(report.fill_events.iter().cloned()) .collect::>(); publish_phase_event( &mut self.strategy, &mut self.process_event_bus, execution_date, decision_date, decision_index, &self.data, &portfolio, self.futures_account.as_ref(), &post_trade_open_orders, self.dynamic_universe.as_ref(), &self.subscriptions, &mut process_events, execution_date, ProcessEventKind::PreAfterTrading, "after_trading:pre", )?; self.strategy.after_trading(&StrategyContext { execution_date, decision_date, decision_index, data: &self.data, portfolio: &portfolio, futures_account: self.futures_account.as_ref(), open_orders: &post_trade_open_orders, dynamic_universe: self.dynamic_universe.as_ref(), subscriptions: &self.subscriptions, process_events: &process_events, active_process_event: None, active_datetime: stage_datetime( execution_date, default_stage_time(ScheduleStage::AfterTrading), ), order_events: visible_order_events.as_slice(), fills: visible_fills.as_slice(), })?; publish_phase_event( &mut self.strategy, &mut self.process_event_bus, execution_date, decision_date, decision_index, &self.data, &portfolio, self.futures_account.as_ref(), &post_trade_open_orders, self.dynamic_universe.as_ref(), &self.subscriptions, &mut process_events, execution_date, ProcessEventKind::AfterTrading, "after_trading", )?; let mut after_trading_decision = collect_scheduled_decisions( &mut self.strategy, &scheduler, execution_date, ScheduleStage::AfterTrading, &schedule_rules, decision_date, decision_index, &self.data, &portfolio, self.futures_account.as_ref(), &post_trade_open_orders, self.dynamic_universe.as_ref(), &self.subscriptions, &mut process_events, &mut self.process_event_bus, default_stage_time(ScheduleStage::AfterTrading), visible_order_events.as_slice(), visible_fills.as_slice(), )?; self.apply_strategy_directives( execution_date, decision_date, decision_index, &mut portfolio, &post_trade_open_orders, &mut process_events, &mut after_trading_decision, &mut directive_report, )?; let mut close_report = self.broker.after_trading(execution_date); publish_process_events( &mut self.strategy, &mut self.process_event_bus, execution_date, decision_date, decision_index, &self.data, &portfolio, self.futures_account.as_ref(), &post_trade_open_orders, self.dynamic_universe.as_ref(), &self.subscriptions, &mut process_events, &mut close_report.process_events, )?; report.order_events.extend(close_report.order_events); report.fill_events.extend(close_report.fill_events); report.position_events.extend(close_report.position_events); report.account_events.extend(close_report.account_events); report.diagnostics.extend(close_report.diagnostics); let post_close_open_orders = self.open_order_views(); let visible_order_events_after_close = result .order_events .iter() .cloned() .chain(report.order_events.iter().cloned()) .collect::>(); let visible_fills_after_close = result .fills .iter() .cloned() .chain(report.fill_events.iter().cloned()) .collect::>(); publish_phase_event( &mut self.strategy, &mut self.process_event_bus, execution_date, decision_date, decision_index, &self.data, &portfolio, self.futures_account.as_ref(), &post_close_open_orders, self.dynamic_universe.as_ref(), &self.subscriptions, &mut process_events, execution_date, ProcessEventKind::PostAfterTrading, "after_trading:post", )?; publish_phase_event( &mut self.strategy, &mut self.process_event_bus, execution_date, decision_date, decision_index, &self.data, &portfolio, self.futures_account.as_ref(), &post_close_open_orders, self.dynamic_universe.as_ref(), &self.subscriptions, &mut process_events, execution_date, ProcessEventKind::PreSettlement, "settlement:pre", )?; self.strategy.on_settlement(&StrategyContext { execution_date, decision_date, decision_index, data: &self.data, portfolio: &portfolio, futures_account: self.futures_account.as_ref(), open_orders: &post_close_open_orders, dynamic_universe: self.dynamic_universe.as_ref(), subscriptions: &self.subscriptions, process_events: &process_events, active_process_event: None, active_datetime: stage_datetime( execution_date, default_stage_time(ScheduleStage::Settlement), ), order_events: visible_order_events_after_close.as_slice(), fills: visible_fills_after_close.as_slice(), })?; publish_phase_event( &mut self.strategy, &mut self.process_event_bus, execution_date, decision_date, decision_index, &self.data, &portfolio, self.futures_account.as_ref(), &post_close_open_orders, self.dynamic_universe.as_ref(), &self.subscriptions, &mut process_events, execution_date, ProcessEventKind::Settlement, "settlement", )?; let mut settlement_decision = collect_scheduled_decisions( &mut self.strategy, &scheduler, execution_date, ScheduleStage::Settlement, &schedule_rules, decision_date, decision_index, &self.data, &portfolio, self.futures_account.as_ref(), &post_close_open_orders, self.dynamic_universe.as_ref(), &self.subscriptions, &mut process_events, &mut self.process_event_bus, default_stage_time(ScheduleStage::Settlement), visible_order_events_after_close.as_slice(), visible_fills_after_close.as_slice(), )?; self.apply_strategy_directives( execution_date, decision_date, decision_index, &mut portfolio, &post_close_open_orders, &mut process_events, &mut settlement_decision, &mut directive_report, )?; let futures_daily_settlement_report = self.settle_futures_daily(execution_date); merge_broker_report(&mut directive_report, futures_daily_settlement_report); let futures_expiration_report = self.settle_futures_expirations(execution_date); merge_broker_report(&mut directive_report, futures_expiration_report); let dynamic_universe_snapshot = self.dynamic_universe.clone(); let subscriptions_snapshot = self.subscriptions.clone(); let management_fee_report = self.apply_management_fee( execution_date, decision_date, decision_index, &mut portfolio, &post_close_open_orders, dynamic_universe_snapshot.as_ref(), &subscriptions_snapshot, &mut process_events, visible_order_events_after_close.as_slice(), visible_fills_after_close.as_slice(), )?; merge_broker_report(&mut directive_report, management_fee_report); publish_phase_event( &mut self.strategy, &mut self.process_event_bus, execution_date, decision_date, decision_index, &self.data, &portfolio, self.futures_account.as_ref(), &post_close_open_orders, self.dynamic_universe.as_ref(), &self.subscriptions, &mut process_events, execution_date, ProcessEventKind::PostSettlement, "settlement:post", )?; merge_broker_report(&mut report, directive_report); let daily_fill_count = report.fill_events.len(); let day_orders = report.order_events.clone(); let day_fills = report.fill_events.clone(); let broker_diagnostics = report.diagnostics.clone(); self.extend_result(&mut result, report); let benchmark = self.data .benchmark(execution_date) .ok_or(BacktestError::MissingBenchmark { date: execution_date, })?; let notes = corporate_action_notes .into_iter() .chain(decision.notes.into_iter()) .collect::>() .join(" | "); let diagnostics = decision .diagnostics .into_iter() .chain(broker_diagnostics.into_iter()) .collect::>() .join(" | "); let holdings_for_day = portfolio.holdings_summary(execution_date); let day_process_events = process_events.clone(); let aggregate_initial_cash = self.aggregate_initial_cash(); let aggregate_cash = self.aggregate_cash(&portfolio); let aggregate_market_value = self.aggregate_market_value(&portfolio); let aggregate_total_equity = self.aggregate_total_equity(&portfolio); result.equity_curve.push(DailyEquityPoint { date: execution_date, cash: aggregate_cash, market_value: aggregate_market_value, total_equity: aggregate_total_equity, benchmark_close: benchmark.close, notes, diagnostics, }); result.daily_holdings.extend(holdings_for_day.clone()); let latest = result .equity_curve .last() .expect("equity point pushed for progress event"); on_progress(&BacktestDayProgress { date: execution_date, cash: latest.cash, market_value: latest.market_value, total_equity: latest.total_equity, unit_nav: if aggregate_initial_cash.abs() < f64::EPSILON { 0.0 } else { latest.total_equity / aggregate_initial_cash }, total_return: if aggregate_initial_cash.abs() < f64::EPSILON { 0.0 } else { (latest.total_equity / aggregate_initial_cash) - 1.0 }, benchmark_close: latest.benchmark_close, daily_fill_count, cumulative_trade_count: result.fills.len(), holding_count: holdings_for_day.len(), notes: latest.notes.clone(), diagnostics: latest.diagnostics.clone(), orders: day_orders, fills: day_fills, holdings: holdings_for_day, process_events: day_process_events, }); result.process_events.extend(process_events); } if let Some(last_date) = execution_dates.last().copied() { result.holdings_summary = portfolio.holdings_summary(last_date); } result.metrics = compute_backtest_metrics( &result.equity_curve, &result.fills, &result.daily_holdings, self.aggregate_initial_cash(), ); Ok(result) } fn extend_result( &self, result: &mut BacktestResult, report: BrokerExecutionReport, ) -> BrokerExecutionReport { result.order_events.extend(report.order_events.clone()); result.fills.extend(report.fill_events.clone()); result .position_events .extend(report.position_events.clone()); result.account_events.extend(report.account_events.clone()); result.process_events.extend(report.process_events.clone()); report } fn apply_corporate_actions( &self, date: NaiveDate, portfolio: &mut PortfolioState, notes: &mut Vec, ) -> Result { let mut report = BrokerExecutionReport::default(); for action in self.data.corporate_actions_on(date) { if !action.has_effect() { continue; } let Some(existing_position) = portfolio.position(&action.symbol) else { continue; }; if existing_position.quantity == 0 { continue; } if action.share_cash.abs() > f64::EPSILON { let cash_before = portfolio.cash(); let (cash_delta, quantity_after, average_cost) = { let position = portfolio .position_mut_if_exists(&action.symbol) .expect("position exists for dividend action"); let cash_delta = position.apply_cash_dividend(action.share_cash); (cash_delta, position.quantity, position.average_cost) }; if cash_delta.abs() > f64::EPSILON { let payable_date = action.payable_date.unwrap_or(date); let immediate_cash = payable_date <= date; let note = if immediate_cash { portfolio.apply_cash_delta(cash_delta); format!( "cash_dividend {} share_cash={:.6} quantity={} cash={:.2}", action.symbol, action.share_cash, quantity_after, cash_delta ) } else { portfolio.add_cash_receivable(CashReceivable { symbol: action.symbol.clone(), ex_date: date, payable_date, amount: cash_delta, reason: format!("cash_dividend {:.6}", action.share_cash), }); format!( "cash_dividend_receivable {} share_cash={:.6} quantity={} payable_date={} cash={:.2}", action.symbol, action.share_cash, quantity_after, payable_date, cash_delta ) }; notes.push(note.clone()); report.account_events.push(AccountEvent { date, cash_before, cash_after: portfolio.cash(), total_equity: portfolio.total_equity(), note, }); report.position_events.push(PositionEvent { date, symbol: action.symbol.clone(), delta_quantity: 0, quantity_after, average_cost, realized_pnl_delta: 0.0, reason: format!("cash_dividend {:.6}", action.share_cash), }); } } let split_ratio = action.split_ratio(); if (split_ratio - 1.0).abs() > f64::EPSILON { let (delta_quantity, quantity_after, average_cost) = { let position = portfolio .position_mut_if_exists(&action.symbol) .expect("position exists for split action"); let delta_quantity = position.apply_split_ratio(split_ratio); (delta_quantity, position.quantity, position.average_cost) }; if delta_quantity != 0 { let note = format!( "stock_split {} ratio={:.6} delta_qty={}", action.symbol, split_ratio, delta_quantity ); notes.push(note); report.position_events.push(PositionEvent { date, symbol: action.symbol.clone(), delta_quantity, quantity_after, average_cost, realized_pnl_delta: 0.0, reason: format!("stock_split {:.6}", split_ratio), }); } } if action.has_successor_conversion() { let successor_symbol = action .successor_symbol .as_deref() .expect("successor symbol checked"); let Some(outcome) = portfolio.apply_successor_conversion( &action.symbol, successor_symbol, action.successor_ratio_value(), action.successor_cash_value(), ) else { continue; }; let reason = format!( "successor_conversion {}->{} ratio={:.6} cash_per_share={:.6}", outcome.old_symbol, outcome.new_symbol, action.successor_ratio_value(), action.successor_cash_value() ); notes.push(reason.clone()); report.position_events.push(PositionEvent { date, symbol: outcome.old_symbol.clone(), delta_quantity: -(outcome.old_quantity as i32), quantity_after: 0, average_cost: 0.0, realized_pnl_delta: 0.0, reason: reason.clone(), }); report.position_events.push(PositionEvent { date, symbol: outcome.new_symbol.clone(), delta_quantity: outcome.new_quantity_delta, quantity_after: outcome.new_quantity_after, average_cost: outcome.new_average_cost_after, realized_pnl_delta: 0.0, reason: reason.clone(), }); if outcome.cash_delta.abs() > f64::EPSILON { let cash_before = portfolio.cash(); portfolio.apply_cash_delta(outcome.cash_delta); report.account_events.push(AccountEvent { date, cash_before, cash_after: portfolio.cash(), total_equity: portfolio.total_equity(), note: format!("{} cash={:.2}", reason, outcome.cash_delta), }); } } } portfolio.prune_flat_positions(); Ok(report) } fn settle_cash_receivables( &self, date: NaiveDate, portfolio: &mut PortfolioState, notes: &mut Vec, ) -> Result { let mut report = BrokerExecutionReport::default(); let settled = portfolio.settle_cash_receivables(date); for receivable in settled { let mut note = format!( "cash_receivable_settled {} ex_date={} payable_date={} cash={:.2}", receivable.symbol, receivable.ex_date, receivable.payable_date, receivable.amount ); let cash_before = portfolio.cash() - receivable.amount; if self.dividend_reinvestment && receivable.reason.starts_with("cash_dividend") && receivable.amount > 0.0 { let reinvest_price = portfolio .position(&receivable.symbol) .map(|position| position.last_price) .filter(|price| price.is_finite() && *price > 0.0) .or_else(|| { self.data .calendar() .previous_day(date) .and_then(|prev_date| { self.data.price_on_or_before( prev_date, &receivable.symbol, PriceField::Close, ) }) }); let round_lot = self .data .instrument(&receivable.symbol) .map(|instrument| instrument.round_lot.max(1)) .unwrap_or(100); if let Some(price) = reinvest_price { let raw_quantity = (receivable.amount / price).floor() as u32; let reinvest_quantity = (raw_quantity / round_lot) * round_lot; if reinvest_quantity > 0 { let reinvest_cash = reinvest_quantity as f64 * price; let residual_cash = receivable.amount - reinvest_cash; portfolio.apply_cash_delta(-reinvest_cash); portfolio.position_mut(&receivable.symbol).buy( date, reinvest_quantity, price, ); note = format!( "cash_receivable_reinvested {} ex_date={} payable_date={} cash={:.2} reinvest_qty={} reinvest_price={:.4} residual_cash={:.2}", receivable.symbol, receivable.ex_date, receivable.payable_date, receivable.amount, reinvest_quantity, price, residual_cash ); report.fill_events.push(FillEvent { date, order_id: None, symbol: receivable.symbol.clone(), side: OrderSide::Buy, quantity: reinvest_quantity, price, gross_amount: reinvest_cash, commission: 0.0, stamp_tax: 0.0, net_cash_flow: -reinvest_cash, reason: "dividend_reinvestment".to_string(), }); report.position_events.push(PositionEvent { date, symbol: receivable.symbol.clone(), delta_quantity: reinvest_quantity as i32, quantity_after: portfolio .position(&receivable.symbol) .map(|position| position.quantity) .unwrap_or(0), average_cost: portfolio .position(&receivable.symbol) .map(|position| position.average_cost) .unwrap_or(0.0), realized_pnl_delta: 0.0, reason: "dividend_reinvestment".to_string(), }); report.process_events.push(ProcessEvent { date, kind: ProcessEventKind::Trade, order_id: None, symbol: Some(receivable.symbol.clone()), side: Some(OrderSide::Buy), detail: format!( "dividend_reinvestment quantity={} price={}", reinvest_quantity, price ), }); } } } notes.push(note.clone()); report.account_events.push(AccountEvent { date, cash_before, cash_after: portfolio.cash(), total_equity: portfolio.total_equity(), note, }); } Ok(report) } fn settle_pending_cash_flows( &self, date: NaiveDate, portfolio: &mut PortfolioState, notes: &mut Vec, ) -> BrokerExecutionReport { let mut report = BrokerExecutionReport::default(); for flow in portfolio.settle_pending_cash_flows(date) { let cash_before = portfolio.cash() - flow.amount; let note = format!( "deposit_withdraw_settled amount={:.2} payable_date={} reason={}", flow.amount, flow.payable_date, flow.reason ); notes.push(note.clone()); report.account_events.push(AccountEvent { date, cash_before, cash_after: portfolio.cash(), total_equity: portfolio.total_equity(), note, }); } report } fn settle_futures_expirations(&mut self, date: NaiveDate) -> BrokerExecutionReport { let mut report = BrokerExecutionReport::default(); let Some(expirations) = self.futures_expirations.remove(&date) else { return report; }; let Some(account) = self.futures_account.as_mut() else { report.diagnostics.push(format!( "futures_expiration_skipped date={date} reason=no_future_account count={}", expirations.len() )); return report; }; for (symbol, settlement_price) in expirations { let futures_report = account.expire_contract(date, &symbol, settlement_price, "data_driven_expiration"); merge_futures_report(&mut report, futures_report); } report } fn settle_futures_daily(&mut self, date: NaiveDate) -> BrokerExecutionReport { let mut report = BrokerExecutionReport::default(); let Some(account) = self.futures_account.as_mut() else { return report; }; let settlement_prices = account .positions() .values() .filter_map(|position| { self.data .futures_settlement_price( date, &position.symbol, &self.futures_settlement_price_mode, ) .map(|price| (position.symbol.clone(), price)) }) .collect::>(); if settlement_prices.is_empty() { return report; } let cash_before = account.total_cash(); let cash_delta = account.settle(&settlement_prices); report.account_events.push(AccountEvent { date, cash_before, cash_after: account.total_cash(), total_equity: account.total_value(), note: format!( "futures_daily_settlement mode={} cash_delta={cash_delta:.2} symbols={}", self.futures_settlement_price_mode, settlement_prices .keys() .cloned() .collect::>() .join(",") ), }); report.process_events.push(ProcessEvent { date, kind: ProcessEventKind::Settlement, order_id: None, symbol: None, side: None, detail: format!( "futures_daily_settlement mode={} cash_delta={cash_delta:.2} count={}", self.futures_settlement_price_mode, settlement_prices.len() ), }); report } fn apply_management_fee( &mut self, execution_date: NaiveDate, decision_date: NaiveDate, decision_index: usize, portfolio: &mut PortfolioState, open_orders: &[crate::strategy::OpenOrderView], dynamic_universe: Option<&BTreeSet>, subscriptions: &BTreeSet, process_events: &mut Vec, order_events: &[OrderEvent], fills: &[FillEvent], ) -> Result { let rate = portfolio.management_fee_rate(); if rate <= 0.0 { return Ok(BrokerExecutionReport::default()); } let fee = self .strategy .management_fee( &StrategyContext { execution_date, decision_date, decision_index, data: &self.data, portfolio, futures_account: self.futures_account.as_ref(), open_orders, dynamic_universe, subscriptions, process_events: process_events.as_slice(), active_process_event: None, active_datetime: stage_datetime( execution_date, default_stage_time(ScheduleStage::Settlement), ), order_events, fills, }, rate, )? .unwrap_or_else(|| portfolio.default_management_fee()); if fee <= 0.0 { return Ok(BrokerExecutionReport::default()); } let cash_before = portfolio.cash(); portfolio .apply_management_fee(fee) .map_err(BacktestError::Execution)?; let mut report = BrokerExecutionReport::default(); report.account_events.push(AccountEvent { date: execution_date, cash_before, cash_after: portfolio.cash(), total_equity: portfolio.total_equity(), note: format!("management_fee rate={rate:.6} fee={fee:.2}"), }); publish_custom_process_event( &mut self.strategy, &mut self.process_event_bus, execution_date, decision_date, decision_index, &self.data, &*portfolio, self.futures_account.as_ref(), open_orders, dynamic_universe, subscriptions, process_events, ProcessEvent { date: execution_date, kind: ProcessEventKind::AccountManagementFee, order_id: None, symbol: None, side: None, detail: format!( "rate={rate:.6} fee={fee:.2} cash_before={cash_before:.2} cash_after={:.2} management_fees={:.2}", portfolio.cash(), portfolio.management_fees() ), }, )?; Ok(report) } fn settle_delisted_positions( &self, date: NaiveDate, portfolio: &mut PortfolioState, notes: &mut Vec, ) -> Result { let mut report = BrokerExecutionReport::default(); let symbols = portfolio.positions().keys().cloned().collect::>(); for symbol in symbols { let Some(position) = portfolio.position(&symbol) else { continue; }; if position.quantity == 0 { continue; } let Some(instrument) = self.data.instrument(&symbol) else { continue; }; let should_settle = instrument.is_delisted_before(date) || (instrument.status.eq_ignore_ascii_case("delisted") && instrument.delisted_at.is_none() && self.data.market(date, &symbol).is_none()); if !should_settle { continue; } let quantity = position.quantity; let fallback_reference_price = if position.last_price > 0.0 { position.last_price } else { position.average_cost }; let effective_delisted_at = instrument .delisted_at .or_else(|| self.data.calendar().previous_day(date)) .unwrap_or(date); let settlement_price = self .data .price_on_or_before(effective_delisted_at, &symbol, PriceField::Close) .or_else(|| { self.data .price_on_or_before(date, &symbol, PriceField::Close) }) .filter(|price| price.is_finite() && *price > 0.0) .unwrap_or(fallback_reference_price); if !settlement_price.is_finite() || settlement_price <= 0.0 { return Err(BacktestError::Execution(format!( "missing delisting settlement price for {} on {}", symbol, date ))); } let cash_before = portfolio.cash(); let gross_amount = settlement_price * quantity as f64; let realized_pnl_delta = { let position = portfolio .position_mut_if_exists(&symbol) .expect("position exists for delisting settlement"); position .sell(quantity, settlement_price) .map_err(BacktestError::Execution)? }; portfolio.apply_cash_delta(gross_amount); portfolio.prune_flat_positions(); let reason = format!( "delisted_cash_settlement effective_date={} status={}", effective_delisted_at, instrument.status ); notes.push(reason.clone()); report.order_events.push(OrderEvent { date, order_id: None, symbol: symbol.clone(), side: OrderSide::Sell, requested_quantity: quantity, filled_quantity: quantity, status: OrderStatus::Filled, reason: reason.clone(), }); report.fill_events.push(FillEvent { date, order_id: None, symbol: symbol.clone(), side: OrderSide::Sell, quantity, price: settlement_price, gross_amount, commission: 0.0, stamp_tax: 0.0, net_cash_flow: gross_amount, reason: reason.clone(), }); report.position_events.push(PositionEvent { date, symbol: symbol.clone(), delta_quantity: -(quantity as i32), quantity_after: 0, average_cost: 0.0, realized_pnl_delta, reason: reason.clone(), }); report.account_events.push(AccountEvent { date, cash_before, cash_after: portfolio.cash(), total_equity: portfolio.total_equity(), note: reason, }); } Ok(report) } } fn collect_scheduled_decisions( strategy: &mut S, scheduler: &Scheduler<'_>, execution_date: NaiveDate, stage: ScheduleStage, rules: &[ScheduleRule], decision_date: NaiveDate, decision_index: usize, data: &crate::data::DataSet, portfolio: &PortfolioState, futures_account: Option<&FuturesAccountState>, open_orders: &[crate::strategy::OpenOrderView], dynamic_universe: Option<&BTreeSet>, subscriptions: &BTreeSet, process_events: &mut Vec, process_event_bus: &mut ProcessEventBus, current_time: Option, order_events: &[OrderEvent], fills: &[FillEvent], ) -> Result { let mut combined = crate::strategy::StrategyDecision::default(); for rule in scheduler.triggered_rules_at(execution_date, stage, current_time, rules) { publish_phase_event( strategy, process_event_bus, execution_date, decision_date, decision_index, data, portfolio, futures_account, open_orders, dynamic_universe, subscriptions, process_events, execution_date, ProcessEventKind::PreScheduled, format!("scheduled:{}:{}:pre", rule.name, stage_label(stage)), )?; combined.merge_from(strategy.on_scheduled( &StrategyContext { execution_date, decision_date, decision_index, data, portfolio, futures_account, open_orders, dynamic_universe, subscriptions, process_events: process_events.as_slice(), active_process_event: None, active_datetime: stage_datetime(execution_date, current_time), order_events, fills, }, rule, )?); publish_phase_event( strategy, process_event_bus, execution_date, decision_date, decision_index, data, portfolio, futures_account, open_orders, dynamic_universe, subscriptions, process_events, execution_date, ProcessEventKind::PostScheduled, format!("scheduled:{}:{}:post", rule.name, stage_label(stage)), )?; } Ok(combined) } fn publish_phase_event( strategy: &mut S, process_event_bus: &mut ProcessEventBus, execution_date: NaiveDate, decision_date: NaiveDate, decision_index: usize, data: &crate::data::DataSet, portfolio: &PortfolioState, futures_account: Option<&FuturesAccountState>, open_orders: &[crate::strategy::OpenOrderView], dynamic_universe: Option<&BTreeSet>, subscriptions: &BTreeSet, events: &mut Vec, date: NaiveDate, kind: ProcessEventKind, detail: impl Into, ) -> Result<(), BacktestError> { let event = ProcessEvent { date, kind, order_id: None, symbol: None, side: None, detail: detail.into(), }; process_event_bus.publish(&event); let process_events = events.as_slice(); let event_ctx = StrategyContext { execution_date, decision_date, decision_index, data, portfolio, futures_account, open_orders, dynamic_universe, subscriptions, process_events, active_process_event: Some(&event), active_datetime: None, order_events: &[], fills: &[], }; strategy.on_process_event(&event_ctx, &event)?; events.push(event); Ok(()) } fn publish_process_events( strategy: &mut S, process_event_bus: &mut ProcessEventBus, execution_date: NaiveDate, decision_date: NaiveDate, decision_index: usize, data: &crate::data::DataSet, portfolio: &PortfolioState, futures_account: Option<&FuturesAccountState>, open_orders: &[crate::strategy::OpenOrderView], dynamic_universe: Option<&BTreeSet>, subscriptions: &BTreeSet, target: &mut Vec, incoming: &mut Vec, ) -> Result<(), BacktestError> { for event in incoming.drain(..) { process_event_bus.publish(&event); let process_events = target.as_slice(); let event_ctx = StrategyContext { execution_date, decision_date, decision_index, data, portfolio, futures_account, open_orders, dynamic_universe, subscriptions, process_events, active_process_event: Some(&event), active_datetime: None, order_events: &[], fills: &[], }; strategy.on_process_event(&event_ctx, &event)?; target.push(event); } Ok(()) } fn publish_custom_process_event( strategy: &mut S, process_event_bus: &mut ProcessEventBus, execution_date: NaiveDate, decision_date: NaiveDate, decision_index: usize, data: &crate::data::DataSet, portfolio: &PortfolioState, futures_account: Option<&FuturesAccountState>, open_orders: &[crate::strategy::OpenOrderView], dynamic_universe: Option<&BTreeSet>, subscriptions: &BTreeSet, target: &mut Vec, event: ProcessEvent, ) -> Result<(), BacktestError> { process_event_bus.publish(&event); let process_events = target.as_slice(); let event_ctx = StrategyContext { execution_date, decision_date, decision_index, data, portfolio, futures_account, open_orders, dynamic_universe, subscriptions, process_events, active_process_event: Some(&event), active_datetime: None, order_events: &[], fills: &[], }; strategy.on_process_event(&event_ctx, &event)?; target.push(event); Ok(()) } fn stage_label(stage: ScheduleStage) -> &'static str { match stage { ScheduleStage::BeforeTrading => "before_trading", ScheduleStage::OpenAuction => "open_auction", ScheduleStage::Bar => "bar", ScheduleStage::Tick => "tick", ScheduleStage::OnDay => "on_day", ScheduleStage::AfterTrading => "after_trading", ScheduleStage::Settlement => "settlement", } } fn stage_datetime( date: NaiveDate, time: Option, ) -> Option { time.map(|value| date.and_time(value)) } fn should_run_tick_events(rules: &[ScheduleRule], subscriptions: &BTreeSet) -> bool { !subscriptions.is_empty() || rules.iter().any(|rule| rule.stage == ScheduleStage::Tick) } fn merge_broker_report(target: &mut BrokerExecutionReport, incoming: BrokerExecutionReport) { target.order_events.extend(incoming.order_events); target.fill_events.extend(incoming.fill_events); target.position_events.extend(incoming.position_events); target.account_events.extend(incoming.account_events); target.process_events.extend(incoming.process_events); target.diagnostics.extend(incoming.diagnostics); } fn merge_futures_report(target: &mut BrokerExecutionReport, incoming: FuturesExecutionReport) { target.order_events.extend(incoming.order_events); target.fill_events.extend(incoming.fill_events); target.position_events.extend(incoming.position_events); target.account_events.extend(incoming.account_events); target.process_events.extend(incoming.process_events); target.diagnostics.extend(incoming.diagnostics); } fn merge_futures_execution_report( target: &mut FuturesExecutionReport, incoming: FuturesExecutionReport, ) { target.order_events.extend(incoming.order_events); target.fill_events.extend(incoming.fill_events); target.position_events.extend(incoming.position_events); target.account_events.extend(incoming.account_events); target.process_events.extend(incoming.process_events); target.diagnostics.extend(incoming.diagnostics); } fn analyzer_ratio_change(start: f64, end: f64) -> f64 { if start.abs() <= f64::EPSILON { 0.0 } else { end / start - 1.0 } } fn futures_limit_satisfied(side: OrderSide, price: f64, limit_price: Option) -> bool { let Some(limit_price) = limit_price else { return price.is_finite() && price > 0.0; }; if !price.is_finite() || price <= 0.0 || !limit_price.is_finite() || limit_price <= 0.0 { return false; } match side { OrderSide::Buy => price <= limit_price + 1e-9, OrderSide::Sell => price + 1e-9 >= limit_price, } } fn futures_cancel_report( date: NaiveDate, order: FuturesOpenOrder, reason: &str, ) -> FuturesExecutionReport { let mut report = FuturesExecutionReport::default(); let side = order.intent.side(); report.process_events.push(ProcessEvent { date, kind: ProcessEventKind::OrderPendingCancel, order_id: Some(order.order_id), symbol: Some(order.intent.symbol.clone()), side: Some(side), detail: format!("reason={reason}"), }); report.order_events.push(OrderEvent { date, order_id: Some(order.order_id), symbol: order.intent.symbol.clone(), side, requested_quantity: order.requested_quantity, filled_quantity: order.filled_quantity, status: OrderStatus::Canceled, reason: format!("{reason}: futures order canceled by user"), }); report.process_events.push(ProcessEvent { date, kind: ProcessEventKind::OrderCancellationPass, order_id: Some(order.order_id), symbol: Some(order.intent.symbol), side: Some(side), detail: format!( "requested_quantity={} filled_quantity={} remaining_quantity={}", order.requested_quantity, order.filled_quantity, order.remaining_quantity ), }); report } mod date_format { use chrono::NaiveDate; use serde::Serializer; const FORMAT: &str = "%Y-%m-%d"; pub fn serialize(date: &NaiveDate, serializer: S) -> Result where S: Serializer, { serializer.serialize_str(&date.format(FORMAT).to_string()) } }