Compare commits

..

8 Commits

Author SHA1 Message Date
boris 6c7f7130cf 修复平台策略金额买入预算 2026-06-17 19:35:19 +08:00
boris d8b6130428 修复平台策略执行行情投影判断 2026-06-17 19:08:19 +08:00
boris dae573e318 修复AiQuant补位买入预算口径 2026-06-17 18:21:35 +08:00
boris 674e4b0b14 修复非周期补买候选失败中断 2026-06-17 12:15:43 +08:00
boris 828b55c747 共享因子候选索引内存 2026-06-17 10:05:55 +08:00
boris 596d64280b 优化行情序列内存结构 2026-06-17 09:55:31 +08:00
boris 1683d875a0 修正平台策略延迟卖出预算口径 2026-06-17 09:04:50 +08:00
boris ed4658ccd0 修正平台策略选股和弱市调仓口径 2026-06-17 07:40:27 +08:00
2 changed files with 1405 additions and 237 deletions
+225 -84
View File
@@ -1,6 +1,7 @@
use std::collections::{BTreeMap, HashMap, HashSet};
use std::fs;
use std::path::Path;
use std::sync::Arc;
use chrono::{NaiveDate, NaiveDateTime};
use serde::{Deserialize, Serialize};
@@ -479,13 +480,27 @@ pub fn decision_free_float_cap_bn(
#[derive(Debug, Clone)]
struct SymbolPriceSeries {
snapshots: Vec<DailyMarketSnapshot>,
symbol: String,
dates: Vec<NaiveDate>,
timestamps: Vec<Option<String>>,
day_opens: Vec<f64>,
opens: Vec<f64>,
highs: Vec<f64>,
lows: Vec<f64>,
closes: Vec<f64>,
prev_closes: Vec<f64>,
last_prices: Vec<f64>,
volumes: Vec<f64>,
bid1s: Vec<f64>,
ask1s: Vec<f64>,
volumes: Vec<u64>,
tick_volumes: Vec<u64>,
bid1_volumes: Vec<u64>,
ask1_volumes: Vec<u64>,
trading_phases: Vec<Option<String>>,
paused: Vec<bool>,
upper_limits: Vec<f64>,
lower_limits: Vec<f64>,
price_ticks: Vec<f64>,
open_prefix: Vec<f64>,
close_prefix: Vec<f64>,
prev_close_prefix: Vec<f64>,
@@ -494,33 +509,68 @@ struct SymbolPriceSeries {
}
impl SymbolPriceSeries {
fn new(rows: &[DailyMarketSnapshot]) -> Self {
let mut sorted = rows.to_vec();
fn new(symbol: String, rows: &[DailyMarketSnapshot]) -> Self {
let mut sorted = rows.iter().collect::<Vec<_>>();
sorted.sort_by_key(|row| row.date);
let dates = sorted.iter().map(|row| row.date).collect::<Vec<_>>();
let timestamps = sorted
.iter()
.map(|row| row.timestamp.clone())
.collect::<Vec<_>>();
let day_opens = sorted.iter().map(|row| row.day_open).collect::<Vec<_>>();
let opens = sorted.iter().map(|row| row.open).collect::<Vec<_>>();
let highs = sorted.iter().map(|row| row.high).collect::<Vec<_>>();
let lows = sorted.iter().map(|row| row.low).collect::<Vec<_>>();
let closes = sorted.iter().map(|row| row.close).collect::<Vec<_>>();
let prev_closes = sorted.iter().map(|row| row.prev_close).collect::<Vec<_>>();
let last_prices = sorted.iter().map(|row| row.last_price).collect::<Vec<_>>();
let volumes = sorted
let bid1s = sorted.iter().map(|row| row.bid1).collect::<Vec<_>>();
let ask1s = sorted.iter().map(|row| row.ask1).collect::<Vec<_>>();
let volumes = sorted.iter().map(|row| row.volume).collect::<Vec<_>>();
let tick_volumes = sorted.iter().map(|row| row.tick_volume).collect::<Vec<_>>();
let bid1_volumes = sorted.iter().map(|row| row.bid1_volume).collect::<Vec<_>>();
let ask1_volumes = sorted.iter().map(|row| row.ask1_volume).collect::<Vec<_>>();
let trading_phases = sorted
.iter()
.map(|row| row.volume as f64)
.map(|row| row.trading_phase.clone())
.collect::<Vec<_>>();
let paused = sorted.iter().map(|row| row.paused).collect::<Vec<_>>();
let upper_limits = sorted.iter().map(|row| row.upper_limit).collect::<Vec<_>>();
let lower_limits = sorted.iter().map(|row| row.lower_limit).collect::<Vec<_>>();
let price_ticks = sorted.iter().map(|row| row.price_tick).collect::<Vec<_>>();
let open_prefix = prefix_sums(&opens);
let close_prefix = prefix_sums(&closes);
let prev_close_prefix = prefix_sums(&prev_closes);
let last_prefix = prefix_sums(&last_prices);
let volume_prefix = prefix_sums(&volumes);
let volume_values = volumes
.iter()
.map(|value| *value as f64)
.collect::<Vec<_>>();
let volume_prefix = prefix_sums(&volume_values);
Self {
snapshots: sorted,
symbol,
dates,
timestamps,
day_opens,
opens,
highs,
lows,
closes,
prev_closes,
last_prices,
bid1s,
ask1s,
volumes,
tick_volumes,
bid1_volumes,
ask1_volumes,
trading_phases,
paused,
upper_limits,
lower_limits,
price_ticks,
open_prefix,
close_prefix,
prev_close_prefix,
@@ -548,7 +598,7 @@ impl SymbolPriceSeries {
return Vec::new();
};
let start = end.saturating_sub(lookback);
self.values_for(field)[start..end].to_vec()
self.price_values_for(field)[start..end].to_vec()
}
fn trailing_snapshots(
@@ -569,7 +619,31 @@ impl SymbolPriceSeries {
return Vec::new();
};
let start = end.saturating_sub(lookback);
self.snapshots[start..end].to_vec()
(start..end).map(|index| self.snapshot_at(index)).collect()
}
fn trailing_numeric_values(
&self,
date: NaiveDate,
lookback: usize,
field: &str,
include_now: bool,
) -> Vec<f64> {
if lookback == 0 {
return Vec::new();
}
let end = if include_now {
self.end_index(date)
} else {
self.previous_completed_end_index(date)
};
let Some(end) = end else {
return Vec::new();
};
let start = end.saturating_sub(lookback);
(start..end)
.filter_map(|index| self.numeric_value_at(index, field))
.collect()
}
fn decision_price_on_or_before(&self, date: NaiveDate) -> Option<f64> {
@@ -656,7 +730,12 @@ impl SymbolPriceSeries {
return None;
}
let start = end - lookback;
Some(self.volumes[start..end].to_vec())
Some(
self.volumes[start..end]
.iter()
.map(|value| *value as f64)
.collect(),
)
}
fn end_index(&self, date: NaiveDate) -> Option<usize> {
@@ -667,9 +746,9 @@ impl SymbolPriceSeries {
}
}
fn values_for(&self, field: PriceField) -> &[f64] {
fn price_values_for(&self, field: PriceField) -> &[f64] {
match field {
PriceField::DayOpen => &self.opens,
PriceField::DayOpen => &self.day_opens,
PriceField::Open => &self.opens,
PriceField::Close => &self.closes,
PriceField::Last => &self.last_prices,
@@ -681,15 +760,7 @@ impl SymbolPriceSeries {
if end == 0 {
return None;
}
self.values_for(field).get(end - 1).copied()
}
fn snapshot_before(&self, date: NaiveDate) -> Option<&DailyMarketSnapshot> {
let end = self.previous_completed_end_index(date)?;
if end == 0 {
return None;
}
self.snapshots.get(end - 1)
self.price_values_for(field).get(end - 1).copied()
}
fn prefix_for(&self, field: PriceField) -> &[f64] {
@@ -700,6 +771,54 @@ impl SymbolPriceSeries {
PriceField::Last => &self.last_prefix,
}
}
fn snapshot_at(&self, index: usize) -> DailyMarketSnapshot {
DailyMarketSnapshot {
date: self.dates[index],
symbol: self.symbol.clone(),
timestamp: self.timestamps[index].clone(),
day_open: self.day_opens[index],
open: self.opens[index],
high: self.highs[index],
low: self.lows[index],
close: self.closes[index],
last_price: self.last_prices[index],
bid1: self.bid1s[index],
ask1: self.ask1s[index],
prev_close: self.prev_closes[index],
volume: self.volumes[index],
tick_volume: self.tick_volumes[index],
bid1_volume: self.bid1_volumes[index],
ask1_volume: self.ask1_volumes[index],
trading_phase: self.trading_phases[index].clone(),
paused: self.paused[index],
upper_limit: self.upper_limits[index],
lower_limit: self.lower_limits[index],
price_tick: self.price_ticks[index],
}
}
fn numeric_value_at(&self, index: usize, field: &str) -> Option<f64> {
match normalize_field(field).as_str() {
"day_open" | "dayopen" => Some(self.day_opens[index]),
"open" => Some(self.opens[index]),
"high" => Some(self.highs[index]),
"low" => Some(self.lows[index]),
"close" | "price" => Some(self.closes[index]),
"last" | "last_price" => Some(self.last_prices[index]),
"prev_close" | "pre_close" => Some(self.prev_closes[index]),
"volume" => Some(self.volumes[index] as f64),
"tick_volume" => Some(self.tick_volumes[index] as f64),
"bid1" => Some(self.bid1s[index]),
"ask1" => Some(self.ask1s[index]),
"bid1_volume" => Some(self.bid1_volumes[index] as f64),
"ask1_volume" => Some(self.ask1_volumes[index] as f64),
"upper_limit" => Some(self.upper_limits[index]),
"lower_limit" => Some(self.lower_limits[index]),
"price_tick" => Some(self.price_ticks[index]),
_ => None,
}
}
}
#[derive(Debug, Clone)]
@@ -837,12 +956,12 @@ pub struct DataSet {
calendar: TradingCalendar,
market_by_date: BTreeMap<NaiveDate, Vec<DailyMarketSnapshot>>,
market_index: HashMap<(NaiveDate, String), DailyMarketSnapshot>,
factor_by_date: BTreeMap<NaiveDate, Vec<DailyFactorSnapshot>>,
factor_index: HashMap<(NaiveDate, String), DailyFactorSnapshot>,
factor_by_date: BTreeMap<NaiveDate, Vec<Arc<DailyFactorSnapshot>>>,
factor_index: HashMap<(NaiveDate, String), Arc<DailyFactorSnapshot>>,
factor_text_by_date: BTreeMap<NaiveDate, Vec<FactorTextValue>>,
factor_text_index: HashMap<(NaiveDate, String, String), FactorTextValue>,
candidate_by_date: BTreeMap<NaiveDate, Vec<CandidateEligibility>>,
candidate_index: HashMap<(NaiveDate, String), CandidateEligibility>,
candidate_by_date: BTreeMap<NaiveDate, Vec<Arc<CandidateEligibility>>>,
candidate_index: HashMap<(NaiveDate, String), Arc<CandidateEligibility>>,
corporate_actions_by_date: BTreeMap<NaiveDate, Vec<CorporateAction>>,
execution_quotes_by_date: HashMap<NaiveDate, HashMap<String, Vec<IntradayExecutionQuote>>>,
order_book_depth_index: HashMap<(NaiveDate, String), Vec<IntradayOrderBookDepthLevel>>,
@@ -1087,7 +1206,11 @@ impl DataSet {
) -> Result<Self, DataSetError> {
let benchmark_code = collect_benchmark_code(&benchmarks)?;
let calendar = TradingCalendar::new(benchmarks.iter().map(|item| item.date).collect());
let factors = normalize_factor_snapshots(factors);
let factors = normalize_factor_snapshots(factors)
.into_iter()
.map(Arc::new)
.collect::<Vec<_>>();
let candidates = candidates.into_iter().map(Arc::new).collect::<Vec<_>>();
let instruments = instruments
.into_iter()
@@ -1100,7 +1223,7 @@ impl DataSet {
.map(|item| ((item.date, item.symbol.clone()), item))
.collect::<HashMap<_, _>>();
let factor_by_date = group_by_date(factors.clone(), |item| item.date);
let factor_by_date = group_arc_by_date(&factors, |item| item.date);
let factor_index = factors
.into_iter()
.map(|item| ((item.date, item.symbol.clone()), item))
@@ -1122,7 +1245,7 @@ impl DataSet {
.map(|item| ((item.date, item.symbol.clone(), item.field.clone()), item))
.collect::<HashMap<_, _>>();
let candidate_by_date = group_by_date(candidates.clone(), |item| item.date);
let candidate_by_date = group_arc_by_date(&candidates, |item| item.date);
let candidate_index = candidates
.into_iter()
.map(|item| ((item.date, item.symbol.clone()), item))
@@ -1211,11 +1334,15 @@ impl DataSet {
}
pub fn factor(&self, date: NaiveDate, symbol: &str) -> Option<&DailyFactorSnapshot> {
self.factor_index.get(&(date, symbol.to_string()))
self.factor_index
.get(&(date, symbol.to_string()))
.map(Arc::as_ref)
}
pub fn candidate(&self, date: NaiveDate, symbol: &str) -> Option<&CandidateEligibility> {
self.candidate_index.get(&(date, symbol.to_string()))
self.candidate_index
.get(&(date, symbol.to_string()))
.map(Arc::as_ref)
}
pub fn benchmark(&self, date: NaiveDate) -> Option<&BenchmarkSnapshot> {
@@ -1959,15 +2086,19 @@ impl DataSet {
}
pub fn market_before(&self, date: NaiveDate, symbol: &str) -> Option<&DailyMarketSnapshot> {
self.market_series_by_symbol
.get(symbol)
.and_then(|series| series.snapshot_before(date))
let series = self.market_series_by_symbol.get(symbol)?;
let end = series.previous_completed_end_index(date)?;
if end == 0 {
return None;
}
let previous_date = *series.dates.get(end - 1)?;
self.market_index.get(&(previous_date, symbol.to_string()))
}
pub fn factor_snapshots_on(&self, date: NaiveDate) -> Vec<&DailyFactorSnapshot> {
self.factor_by_date
.get(&date)
.map(|rows| rows.iter().collect())
.map(|rows| rows.iter().map(Arc::as_ref).collect())
.unwrap_or_default()
}
@@ -1988,7 +2119,7 @@ impl DataSet {
pub fn candidate_snapshots_on(&self, date: NaiveDate) -> Vec<&CandidateEligibility> {
self.candidate_by_date
.get(&date)
.map(|rows| rows.iter().collect())
.map(|rows| rows.iter().map(Arc::as_ref).collect())
.unwrap_or_default()
}
@@ -2001,11 +2132,15 @@ impl DataSet {
date,
benchmark,
market: self.market_by_date.get(&date).cloned().unwrap_or_default(),
factors: self.factor_by_date.get(&date).cloned().unwrap_or_default(),
factors: self
.factor_by_date
.get(&date)
.map(|rows| rows.iter().map(|row| row.as_ref().clone()).collect())
.unwrap_or_default(),
candidates: self
.candidate_by_date
.get(&date)
.cloned()
.map(|rows| rows.iter().map(|row| row.as_ref().clone()).collect())
.unwrap_or_default(),
corporate_actions: self
.corporate_actions_by_date
@@ -2034,10 +2169,10 @@ impl DataSet {
field: &str,
include_now: bool,
) -> Vec<f64> {
self.history_daily_snapshots(date, symbol, bar_count, include_now)
.into_iter()
.filter_map(|row| daily_market_numeric_value(&row, field))
.collect()
self.market_series_by_symbol
.get(symbol)
.map(|series| series.trailing_numeric_values(date, bar_count, field, include_now))
.unwrap_or_default()
}
fn history_intraday_values(
@@ -2078,7 +2213,9 @@ impl DataSet {
.iter()
.map(|day| {
evaluator(
self.candidate_index.get(&(*day, symbol.to_string())),
self.candidate_index
.get(&(*day, symbol.to_string()))
.map(Arc::as_ref),
self.market_index.get(&(*day, symbol.to_string())),
)
})
@@ -2761,28 +2898,6 @@ fn factor_numeric_value(snapshot: &DailyFactorSnapshot, field: &str) -> Option<f
}
}
fn daily_market_numeric_value(snapshot: &DailyMarketSnapshot, field: &str) -> Option<f64> {
match normalize_field(field).as_str() {
"day_open" | "dayopen" => Some(snapshot.day_open),
"open" => Some(snapshot.open),
"high" => Some(snapshot.high),
"low" => Some(snapshot.low),
"close" | "price" => Some(snapshot.close),
"last" | "last_price" => Some(snapshot.last_price),
"prev_close" | "pre_close" => Some(snapshot.prev_close),
"volume" => Some(snapshot.volume as f64),
"tick_volume" => Some(snapshot.tick_volume as f64),
"bid1" => Some(snapshot.bid1),
"ask1" => Some(snapshot.ask1),
"bid1_volume" => Some(snapshot.bid1_volume as f64),
"ask1_volume" => Some(snapshot.ask1_volume as f64),
"upper_limit" => Some(snapshot.upper_limit),
"lower_limit" => Some(snapshot.lower_limit),
"price_tick" => Some(snapshot.price_tick),
_ => None,
}
}
fn intraday_quote_numeric_value(snapshot: &IntradayExecutionQuote, field: &str) -> Option<f64> {
match normalize_field(field).as_str() {
"last" | "last_price" | "close" | "price" => Some(snapshot.last_price),
@@ -3275,6 +3390,20 @@ where
grouped
}
fn group_arc_by_date<T, F>(rows: &[Arc<T>], mut date_of: F) -> BTreeMap<NaiveDate, Vec<Arc<T>>>
where
F: FnMut(&T) -> NaiveDate,
{
let mut grouped = BTreeMap::<NaiveDate, Vec<Arc<T>>>::new();
for row in rows {
grouped
.entry(date_of(row.as_ref()))
.or_default()
.push(Arc::clone(row));
}
grouped
}
fn collect_benchmark_code(benchmarks: &[BenchmarkSnapshot]) -> Result<String, DataSetError> {
let mut codes = benchmarks
.iter()
@@ -3358,7 +3487,10 @@ fn build_market_series(
grouped
.into_iter()
.map(|(symbol, rows)| (symbol, SymbolPriceSeries::new(&rows)))
.map(|(symbol, rows)| {
let series = SymbolPriceSeries::new(symbol.clone(), &rows);
(symbol, series)
})
.collect()
}
@@ -3420,8 +3552,8 @@ fn build_order_book_depth_index(
}
fn build_eligible_universe(
factor_by_date: &BTreeMap<NaiveDate, Vec<DailyFactorSnapshot>>,
candidate_index: &HashMap<(NaiveDate, String), CandidateEligibility>,
factor_by_date: &BTreeMap<NaiveDate, Vec<Arc<DailyFactorSnapshot>>>,
candidate_index: &HashMap<(NaiveDate, String), Arc<CandidateEligibility>>,
market_index: &HashMap<(NaiveDate, String), DailyMarketSnapshot>,
instruments: &HashMap<String, Instrument>,
) -> BTreeMap<NaiveDate, Vec<EligibleUniverseSnapshot>> {
@@ -3581,11 +3713,14 @@ mod tests {
#[test]
fn decision_volume_average_uses_previous_completed_days_only() {
let series = SymbolPriceSeries::new(&[
market_row("2025-01-02", 10.0, 100),
market_row("2025-01-03", 11.0, 200),
market_row("2025-01-06", 12.0, 10_000),
]);
let series = SymbolPriceSeries::new(
"000001.SZ".to_string(),
&[
market_row("2025-01-02", 10.0, 100),
market_row("2025-01-03", 11.0, 200),
market_row("2025-01-06", 12.0, 10_000),
],
);
assert_eq!(
series.decision_close_moving_average(
@@ -3615,11 +3750,14 @@ mod tests {
let mut current = market_row("2025-01-06", 12.0, 10_000);
current.close = 9_999.0;
current.last_price = 9_999.0;
let series = SymbolPriceSeries::new(&[
market_row("2025-01-02", 10.0, 100),
market_row("2025-01-03", 11.0, 200),
current,
]);
let series = SymbolPriceSeries::new(
"000001.SZ".to_string(),
&[
market_row("2025-01-02", 10.0, 100),
market_row("2025-01-03", 11.0, 200),
current,
],
);
let decision_date = NaiveDate::parse_from_str("2025-01-06", "%Y-%m-%d").unwrap();
assert_eq!(
@@ -3636,12 +3774,15 @@ mod tests {
fn decision_volume_average_includes_paused_zero_volume_days() {
let mut paused = market_row("2025-01-03", 11.0, 0);
paused.paused = true;
let series = SymbolPriceSeries::new(&[
market_row("2025-01-02", 10.0, 100),
paused,
market_row("2025-01-06", 12.0, 300),
market_row("2025-01-07", 13.0, 10_000),
]);
let series = SymbolPriceSeries::new(
"000001.SZ".to_string(),
&[
market_row("2025-01-02", 10.0, 100),
paused,
market_row("2025-01-06", 12.0, 300),
market_row("2025-01-07", 13.0, 10_000),
],
);
assert_eq!(
series.decision_volume_moving_average(
File diff suppressed because it is too large Load Diff