优化行情序列内存结构

This commit is contained in:
boris
2026-06-17 09:55:31 +08:00
parent 1683d875a0
commit 596d64280b
+180 -68
View File
@@ -479,13 +479,27 @@ pub fn decision_free_float_cap_bn(
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
struct SymbolPriceSeries { struct SymbolPriceSeries {
snapshots: Vec<DailyMarketSnapshot>, symbol: String,
dates: Vec<NaiveDate>, dates: Vec<NaiveDate>,
timestamps: Vec<Option<String>>,
day_opens: Vec<f64>,
opens: Vec<f64>, opens: Vec<f64>,
highs: Vec<f64>,
lows: Vec<f64>,
closes: Vec<f64>, closes: Vec<f64>,
prev_closes: Vec<f64>, prev_closes: Vec<f64>,
last_prices: Vec<f64>, last_prices: Vec<f64>,
volumes: Vec<f64>, bid1s: Vec<f64>,
ask1s: Vec<f64>,
volumes: Vec<u64>,
tick_volumes: Vec<u64>,
bid1_volumes: Vec<u64>,
ask1_volumes: Vec<u64>,
trading_phases: Vec<Option<String>>,
paused: Vec<bool>,
upper_limits: Vec<f64>,
lower_limits: Vec<f64>,
price_ticks: Vec<f64>,
open_prefix: Vec<f64>, open_prefix: Vec<f64>,
close_prefix: Vec<f64>, close_prefix: Vec<f64>,
prev_close_prefix: Vec<f64>, prev_close_prefix: Vec<f64>,
@@ -494,33 +508,68 @@ struct SymbolPriceSeries {
} }
impl SymbolPriceSeries { impl SymbolPriceSeries {
fn new(rows: &[DailyMarketSnapshot]) -> Self { fn new(symbol: String, rows: &[DailyMarketSnapshot]) -> Self {
let mut sorted = rows.to_vec(); let mut sorted = rows.iter().collect::<Vec<_>>();
sorted.sort_by_key(|row| row.date); sorted.sort_by_key(|row| row.date);
let dates = sorted.iter().map(|row| row.date).collect::<Vec<_>>(); let dates = sorted.iter().map(|row| row.date).collect::<Vec<_>>();
let timestamps = sorted
.iter()
.map(|row| row.timestamp.clone())
.collect::<Vec<_>>();
let day_opens = sorted.iter().map(|row| row.day_open).collect::<Vec<_>>();
let opens = sorted.iter().map(|row| row.open).collect::<Vec<_>>(); let opens = sorted.iter().map(|row| row.open).collect::<Vec<_>>();
let highs = sorted.iter().map(|row| row.high).collect::<Vec<_>>();
let lows = sorted.iter().map(|row| row.low).collect::<Vec<_>>();
let closes = sorted.iter().map(|row| row.close).collect::<Vec<_>>(); let closes = sorted.iter().map(|row| row.close).collect::<Vec<_>>();
let prev_closes = sorted.iter().map(|row| row.prev_close).collect::<Vec<_>>(); let prev_closes = sorted.iter().map(|row| row.prev_close).collect::<Vec<_>>();
let last_prices = sorted.iter().map(|row| row.last_price).collect::<Vec<_>>(); let last_prices = sorted.iter().map(|row| row.last_price).collect::<Vec<_>>();
let volumes = sorted let bid1s = sorted.iter().map(|row| row.bid1).collect::<Vec<_>>();
let ask1s = sorted.iter().map(|row| row.ask1).collect::<Vec<_>>();
let volumes = sorted.iter().map(|row| row.volume).collect::<Vec<_>>();
let tick_volumes = sorted.iter().map(|row| row.tick_volume).collect::<Vec<_>>();
let bid1_volumes = sorted.iter().map(|row| row.bid1_volume).collect::<Vec<_>>();
let ask1_volumes = sorted.iter().map(|row| row.ask1_volume).collect::<Vec<_>>();
let trading_phases = sorted
.iter() .iter()
.map(|row| row.volume as f64) .map(|row| row.trading_phase.clone())
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let paused = sorted.iter().map(|row| row.paused).collect::<Vec<_>>();
let upper_limits = sorted.iter().map(|row| row.upper_limit).collect::<Vec<_>>();
let lower_limits = sorted.iter().map(|row| row.lower_limit).collect::<Vec<_>>();
let price_ticks = sorted.iter().map(|row| row.price_tick).collect::<Vec<_>>();
let open_prefix = prefix_sums(&opens); let open_prefix = prefix_sums(&opens);
let close_prefix = prefix_sums(&closes); let close_prefix = prefix_sums(&closes);
let prev_close_prefix = prefix_sums(&prev_closes); let prev_close_prefix = prefix_sums(&prev_closes);
let last_prefix = prefix_sums(&last_prices); let last_prefix = prefix_sums(&last_prices);
let volume_prefix = prefix_sums(&volumes); let volume_values = volumes
.iter()
.map(|value| *value as f64)
.collect::<Vec<_>>();
let volume_prefix = prefix_sums(&volume_values);
Self { Self {
snapshots: sorted, symbol,
dates, dates,
timestamps,
day_opens,
opens, opens,
highs,
lows,
closes, closes,
prev_closes, prev_closes,
last_prices, last_prices,
bid1s,
ask1s,
volumes, volumes,
tick_volumes,
bid1_volumes,
ask1_volumes,
trading_phases,
paused,
upper_limits,
lower_limits,
price_ticks,
open_prefix, open_prefix,
close_prefix, close_prefix,
prev_close_prefix, prev_close_prefix,
@@ -548,7 +597,7 @@ impl SymbolPriceSeries {
return Vec::new(); return Vec::new();
}; };
let start = end.saturating_sub(lookback); let start = end.saturating_sub(lookback);
self.values_for(field)[start..end].to_vec() self.price_values_for(field)[start..end].to_vec()
} }
fn trailing_snapshots( fn trailing_snapshots(
@@ -569,7 +618,31 @@ impl SymbolPriceSeries {
return Vec::new(); return Vec::new();
}; };
let start = end.saturating_sub(lookback); let start = end.saturating_sub(lookback);
self.snapshots[start..end].to_vec() (start..end).map(|index| self.snapshot_at(index)).collect()
}
fn trailing_numeric_values(
&self,
date: NaiveDate,
lookback: usize,
field: &str,
include_now: bool,
) -> Vec<f64> {
if lookback == 0 {
return Vec::new();
}
let end = if include_now {
self.end_index(date)
} else {
self.previous_completed_end_index(date)
};
let Some(end) = end else {
return Vec::new();
};
let start = end.saturating_sub(lookback);
(start..end)
.filter_map(|index| self.numeric_value_at(index, field))
.collect()
} }
fn decision_price_on_or_before(&self, date: NaiveDate) -> Option<f64> { fn decision_price_on_or_before(&self, date: NaiveDate) -> Option<f64> {
@@ -656,7 +729,12 @@ impl SymbolPriceSeries {
return None; return None;
} }
let start = end - lookback; let start = end - lookback;
Some(self.volumes[start..end].to_vec()) Some(
self.volumes[start..end]
.iter()
.map(|value| *value as f64)
.collect(),
)
} }
fn end_index(&self, date: NaiveDate) -> Option<usize> { fn end_index(&self, date: NaiveDate) -> Option<usize> {
@@ -667,9 +745,9 @@ impl SymbolPriceSeries {
} }
} }
fn values_for(&self, field: PriceField) -> &[f64] { fn price_values_for(&self, field: PriceField) -> &[f64] {
match field { match field {
PriceField::DayOpen => &self.opens, PriceField::DayOpen => &self.day_opens,
PriceField::Open => &self.opens, PriceField::Open => &self.opens,
PriceField::Close => &self.closes, PriceField::Close => &self.closes,
PriceField::Last => &self.last_prices, PriceField::Last => &self.last_prices,
@@ -681,15 +759,7 @@ impl SymbolPriceSeries {
if end == 0 { if end == 0 {
return None; return None;
} }
self.values_for(field).get(end - 1).copied() self.price_values_for(field).get(end - 1).copied()
}
fn snapshot_before(&self, date: NaiveDate) -> Option<&DailyMarketSnapshot> {
let end = self.previous_completed_end_index(date)?;
if end == 0 {
return None;
}
self.snapshots.get(end - 1)
} }
fn prefix_for(&self, field: PriceField) -> &[f64] { fn prefix_for(&self, field: PriceField) -> &[f64] {
@@ -700,6 +770,54 @@ impl SymbolPriceSeries {
PriceField::Last => &self.last_prefix, PriceField::Last => &self.last_prefix,
} }
} }
fn snapshot_at(&self, index: usize) -> DailyMarketSnapshot {
DailyMarketSnapshot {
date: self.dates[index],
symbol: self.symbol.clone(),
timestamp: self.timestamps[index].clone(),
day_open: self.day_opens[index],
open: self.opens[index],
high: self.highs[index],
low: self.lows[index],
close: self.closes[index],
last_price: self.last_prices[index],
bid1: self.bid1s[index],
ask1: self.ask1s[index],
prev_close: self.prev_closes[index],
volume: self.volumes[index],
tick_volume: self.tick_volumes[index],
bid1_volume: self.bid1_volumes[index],
ask1_volume: self.ask1_volumes[index],
trading_phase: self.trading_phases[index].clone(),
paused: self.paused[index],
upper_limit: self.upper_limits[index],
lower_limit: self.lower_limits[index],
price_tick: self.price_ticks[index],
}
}
fn numeric_value_at(&self, index: usize, field: &str) -> Option<f64> {
match normalize_field(field).as_str() {
"day_open" | "dayopen" => Some(self.day_opens[index]),
"open" => Some(self.opens[index]),
"high" => Some(self.highs[index]),
"low" => Some(self.lows[index]),
"close" | "price" => Some(self.closes[index]),
"last" | "last_price" => Some(self.last_prices[index]),
"prev_close" | "pre_close" => Some(self.prev_closes[index]),
"volume" => Some(self.volumes[index] as f64),
"tick_volume" => Some(self.tick_volumes[index] as f64),
"bid1" => Some(self.bid1s[index]),
"ask1" => Some(self.ask1s[index]),
"bid1_volume" => Some(self.bid1_volumes[index] as f64),
"ask1_volume" => Some(self.ask1_volumes[index] as f64),
"upper_limit" => Some(self.upper_limits[index]),
"lower_limit" => Some(self.lower_limits[index]),
"price_tick" => Some(self.price_ticks[index]),
_ => None,
}
}
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@@ -1959,9 +2077,13 @@ impl DataSet {
} }
pub fn market_before(&self, date: NaiveDate, symbol: &str) -> Option<&DailyMarketSnapshot> { pub fn market_before(&self, date: NaiveDate, symbol: &str) -> Option<&DailyMarketSnapshot> {
self.market_series_by_symbol let series = self.market_series_by_symbol.get(symbol)?;
.get(symbol) let end = series.previous_completed_end_index(date)?;
.and_then(|series| series.snapshot_before(date)) if end == 0 {
return None;
}
let previous_date = *series.dates.get(end - 1)?;
self.market_index.get(&(previous_date, symbol.to_string()))
} }
pub fn factor_snapshots_on(&self, date: NaiveDate) -> Vec<&DailyFactorSnapshot> { pub fn factor_snapshots_on(&self, date: NaiveDate) -> Vec<&DailyFactorSnapshot> {
@@ -2034,10 +2156,10 @@ impl DataSet {
field: &str, field: &str,
include_now: bool, include_now: bool,
) -> Vec<f64> { ) -> Vec<f64> {
self.history_daily_snapshots(date, symbol, bar_count, include_now) self.market_series_by_symbol
.into_iter() .get(symbol)
.filter_map(|row| daily_market_numeric_value(&row, field)) .map(|series| series.trailing_numeric_values(date, bar_count, field, include_now))
.collect() .unwrap_or_default()
} }
fn history_intraday_values( fn history_intraday_values(
@@ -2761,28 +2883,6 @@ fn factor_numeric_value(snapshot: &DailyFactorSnapshot, field: &str) -> Option<f
} }
} }
fn daily_market_numeric_value(snapshot: &DailyMarketSnapshot, field: &str) -> Option<f64> {
match normalize_field(field).as_str() {
"day_open" | "dayopen" => Some(snapshot.day_open),
"open" => Some(snapshot.open),
"high" => Some(snapshot.high),
"low" => Some(snapshot.low),
"close" | "price" => Some(snapshot.close),
"last" | "last_price" => Some(snapshot.last_price),
"prev_close" | "pre_close" => Some(snapshot.prev_close),
"volume" => Some(snapshot.volume as f64),
"tick_volume" => Some(snapshot.tick_volume as f64),
"bid1" => Some(snapshot.bid1),
"ask1" => Some(snapshot.ask1),
"bid1_volume" => Some(snapshot.bid1_volume as f64),
"ask1_volume" => Some(snapshot.ask1_volume as f64),
"upper_limit" => Some(snapshot.upper_limit),
"lower_limit" => Some(snapshot.lower_limit),
"price_tick" => Some(snapshot.price_tick),
_ => None,
}
}
fn intraday_quote_numeric_value(snapshot: &IntradayExecutionQuote, field: &str) -> Option<f64> { fn intraday_quote_numeric_value(snapshot: &IntradayExecutionQuote, field: &str) -> Option<f64> {
match normalize_field(field).as_str() { match normalize_field(field).as_str() {
"last" | "last_price" | "close" | "price" => Some(snapshot.last_price), "last" | "last_price" | "close" | "price" => Some(snapshot.last_price),
@@ -3358,7 +3458,10 @@ fn build_market_series(
grouped grouped
.into_iter() .into_iter()
.map(|(symbol, rows)| (symbol, SymbolPriceSeries::new(&rows))) .map(|(symbol, rows)| {
let series = SymbolPriceSeries::new(symbol.clone(), &rows);
(symbol, series)
})
.collect() .collect()
} }
@@ -3581,11 +3684,14 @@ mod tests {
#[test] #[test]
fn decision_volume_average_uses_previous_completed_days_only() { fn decision_volume_average_uses_previous_completed_days_only() {
let series = SymbolPriceSeries::new(&[ let series = SymbolPriceSeries::new(
market_row("2025-01-02", 10.0, 100), "000001.SZ".to_string(),
market_row("2025-01-03", 11.0, 200), &[
market_row("2025-01-06", 12.0, 10_000), market_row("2025-01-02", 10.0, 100),
]); market_row("2025-01-03", 11.0, 200),
market_row("2025-01-06", 12.0, 10_000),
],
);
assert_eq!( assert_eq!(
series.decision_close_moving_average( series.decision_close_moving_average(
@@ -3615,11 +3721,14 @@ mod tests {
let mut current = market_row("2025-01-06", 12.0, 10_000); let mut current = market_row("2025-01-06", 12.0, 10_000);
current.close = 9_999.0; current.close = 9_999.0;
current.last_price = 9_999.0; current.last_price = 9_999.0;
let series = SymbolPriceSeries::new(&[ let series = SymbolPriceSeries::new(
market_row("2025-01-02", 10.0, 100), "000001.SZ".to_string(),
market_row("2025-01-03", 11.0, 200), &[
current, market_row("2025-01-02", 10.0, 100),
]); market_row("2025-01-03", 11.0, 200),
current,
],
);
let decision_date = NaiveDate::parse_from_str("2025-01-06", "%Y-%m-%d").unwrap(); let decision_date = NaiveDate::parse_from_str("2025-01-06", "%Y-%m-%d").unwrap();
assert_eq!( assert_eq!(
@@ -3636,12 +3745,15 @@ mod tests {
fn decision_volume_average_includes_paused_zero_volume_days() { fn decision_volume_average_includes_paused_zero_volume_days() {
let mut paused = market_row("2025-01-03", 11.0, 0); let mut paused = market_row("2025-01-03", 11.0, 0);
paused.paused = true; paused.paused = true;
let series = SymbolPriceSeries::new(&[ let series = SymbolPriceSeries::new(
market_row("2025-01-02", 10.0, 100), "000001.SZ".to_string(),
paused, &[
market_row("2025-01-06", 12.0, 300), market_row("2025-01-02", 10.0, 100),
market_row("2025-01-07", 13.0, 10_000), paused,
]); market_row("2025-01-06", 12.0, 300),
market_row("2025-01-07", 13.0, 10_000),
],
);
assert_eq!( assert_eq!(
series.decision_volume_moving_average( series.decision_volume_moving_average(