Add bar and tick strategy lifecycle

This commit is contained in:
boris
2026-04-23 19:17:04 -07:00
parent 86e4db6272
commit 1760fc6cd1
9 changed files with 525 additions and 10 deletions

View File

@@ -640,6 +640,68 @@ where
ProcessEventKind::OnDay,
"on_day",
)?;
let bar_open_orders = self.broker.open_order_views();
publish_phase_event(
&mut self.strategy,
&mut self.process_event_bus,
execution_date,
decision_date,
decision_index,
&self.data,
&portfolio,
&bar_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
execution_date,
ProcessEventKind::PreBar,
"bar:pre",
)?;
decision.merge_from(collect_scheduled_decisions(
&mut self.strategy,
&scheduler,
execution_date,
ScheduleStage::Bar,
&schedule_rules,
decision_date,
decision_index,
&self.data,
&portfolio,
&bar_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
&mut self.process_event_bus,
default_stage_time(ScheduleStage::Bar),
)?);
decision.merge_from(self.strategy.on_bar(&StrategyContext {
execution_date,
decision_date,
decision_index,
data: &self.data,
portfolio: &portfolio,
open_orders: &bar_open_orders,
dynamic_universe: self.dynamic_universe.as_ref(),
subscriptions: &self.subscriptions,
process_events: &process_events,
active_process_event: None,
})?);
publish_phase_event(
&mut self.strategy,
&mut self.process_event_bus,
execution_date,
decision_date,
decision_index,
&self.data,
&portfolio,
&bar_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
execution_date,
ProcessEventKind::Bar,
"bar",
)?;
self.apply_strategy_directives(
execution_date,
decision_date,
@@ -691,6 +753,151 @@ where
ProcessEventKind::PostOnDay,
"on_day:post",
)?;
publish_phase_event(
&mut self.strategy,
&mut self.process_event_bus,
execution_date,
decision_date,
decision_index,
&self.data,
&portfolio,
&post_intraday_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
execution_date,
ProcessEventKind::PostBar,
"bar:post",
)?;
if should_run_tick_events(&schedule_rules, &self.subscriptions) {
let filter_by_subscription = !self.subscriptions.is_empty();
let tick_quotes = self
.data
.execution_quotes_on_date(execution_date)
.into_iter()
.filter(|quote| {
!filter_by_subscription || self.subscriptions.contains(&quote.symbol)
})
.collect::<Vec<_>>();
for quote in tick_quotes {
let tick_time = quote.timestamp.time();
let tick_open_orders = self.broker.open_order_views();
publish_phase_event(
&mut self.strategy,
&mut self.process_event_bus,
execution_date,
decision_date,
decision_index,
&self.data,
&portfolio,
&tick_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
execution_date,
ProcessEventKind::PreTick,
format!("tick:{}:{}:pre", quote.symbol, quote.timestamp),
)?;
let mut tick_decision = collect_scheduled_decisions(
&mut self.strategy,
&scheduler,
execution_date,
ScheduleStage::Tick,
&schedule_rules,
decision_date,
decision_index,
&self.data,
&portfolio,
&tick_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
&mut self.process_event_bus,
Some(tick_time),
)?;
tick_decision.merge_from(self.strategy.on_tick(
&StrategyContext {
execution_date,
decision_date,
decision_index,
data: &self.data,
portfolio: &portfolio,
open_orders: &tick_open_orders,
dynamic_universe: self.dynamic_universe.as_ref(),
subscriptions: &self.subscriptions,
process_events: &process_events,
active_process_event: None,
},
&quote,
)?);
publish_phase_event(
&mut self.strategy,
&mut self.process_event_bus,
execution_date,
decision_date,
decision_index,
&self.data,
&portfolio,
&tick_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
execution_date,
ProcessEventKind::Tick,
format!("tick:{}:{}", quote.symbol, quote.timestamp),
)?;
self.apply_strategy_directives(
execution_date,
decision_date,
decision_index,
&portfolio,
&tick_open_orders,
&mut process_events,
&mut tick_decision,
)?;
let mut tick_report = self.broker.execute_between(
execution_date,
&mut portfolio,
&self.data,
&tick_decision,
Some(tick_time),
Some(tick_time),
)?;
let post_tick_open_orders = self.broker.open_order_views();
publish_process_events(
&mut self.strategy,
&mut self.process_event_bus,
execution_date,
decision_date,
decision_index,
&self.data,
&portfolio,
&post_tick_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
&mut tick_report.process_events,
)?;
merge_broker_report(&mut report, tick_report);
publish_phase_event(
&mut self.strategy,
&mut self.process_event_bus,
execution_date,
decision_date,
decision_index,
&self.data,
&portfolio,
&post_tick_open_orders,
self.dynamic_universe.as_ref(),
&self.subscriptions,
&mut process_events,
execution_date,
ProcessEventKind::PostTick,
format!("tick:{}:{}:post", quote.symbol, quote.timestamp),
)?;
}
}
portfolio.update_prices(execution_date, &self.data, PriceField::Close)?;
@@ -1557,12 +1764,27 @@ fn stage_label(stage: ScheduleStage) -> &'static str {
match stage {
ScheduleStage::BeforeTrading => "before_trading",
ScheduleStage::OpenAuction => "open_auction",
ScheduleStage::Bar => "bar",
ScheduleStage::Tick => "tick",
ScheduleStage::OnDay => "on_day",
ScheduleStage::AfterTrading => "after_trading",
ScheduleStage::Settlement => "settlement",
}
}
fn should_run_tick_events(rules: &[ScheduleRule], subscriptions: &BTreeSet<String>) -> bool {
!subscriptions.is_empty() || rules.iter().any(|rule| rule.stage == ScheduleStage::Tick)
}
fn merge_broker_report(target: &mut BrokerExecutionReport, incoming: BrokerExecutionReport) {
target.order_events.extend(incoming.order_events);
target.fill_events.extend(incoming.fill_events);
target.position_events.extend(incoming.position_events);
target.account_events.extend(incoming.account_events);
target.process_events.extend(incoming.process_events);
target.diagnostics.extend(incoming.diagnostics);
}
mod date_format {
use chrono::NaiveDate;
use serde::Serializer;