共享因子候选索引内存

This commit is contained in:
boris
2026-06-17 10:05:55 +08:00
parent 596d64280b
commit 828b55c747
+45 -16
View File
@@ -1,6 +1,7 @@
use std::collections::{BTreeMap, HashMap, HashSet}; use std::collections::{BTreeMap, HashMap, HashSet};
use std::fs; use std::fs;
use std::path::Path; use std::path::Path;
use std::sync::Arc;
use chrono::{NaiveDate, NaiveDateTime}; use chrono::{NaiveDate, NaiveDateTime};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@@ -955,12 +956,12 @@ pub struct DataSet {
calendar: TradingCalendar, calendar: TradingCalendar,
market_by_date: BTreeMap<NaiveDate, Vec<DailyMarketSnapshot>>, market_by_date: BTreeMap<NaiveDate, Vec<DailyMarketSnapshot>>,
market_index: HashMap<(NaiveDate, String), DailyMarketSnapshot>, market_index: HashMap<(NaiveDate, String), DailyMarketSnapshot>,
factor_by_date: BTreeMap<NaiveDate, Vec<DailyFactorSnapshot>>, factor_by_date: BTreeMap<NaiveDate, Vec<Arc<DailyFactorSnapshot>>>,
factor_index: HashMap<(NaiveDate, String), DailyFactorSnapshot>, factor_index: HashMap<(NaiveDate, String), Arc<DailyFactorSnapshot>>,
factor_text_by_date: BTreeMap<NaiveDate, Vec<FactorTextValue>>, factor_text_by_date: BTreeMap<NaiveDate, Vec<FactorTextValue>>,
factor_text_index: HashMap<(NaiveDate, String, String), FactorTextValue>, factor_text_index: HashMap<(NaiveDate, String, String), FactorTextValue>,
candidate_by_date: BTreeMap<NaiveDate, Vec<CandidateEligibility>>, candidate_by_date: BTreeMap<NaiveDate, Vec<Arc<CandidateEligibility>>>,
candidate_index: HashMap<(NaiveDate, String), CandidateEligibility>, candidate_index: HashMap<(NaiveDate, String), Arc<CandidateEligibility>>,
corporate_actions_by_date: BTreeMap<NaiveDate, Vec<CorporateAction>>, corporate_actions_by_date: BTreeMap<NaiveDate, Vec<CorporateAction>>,
execution_quotes_by_date: HashMap<NaiveDate, HashMap<String, Vec<IntradayExecutionQuote>>>, execution_quotes_by_date: HashMap<NaiveDate, HashMap<String, Vec<IntradayExecutionQuote>>>,
order_book_depth_index: HashMap<(NaiveDate, String), Vec<IntradayOrderBookDepthLevel>>, order_book_depth_index: HashMap<(NaiveDate, String), Vec<IntradayOrderBookDepthLevel>>,
@@ -1205,7 +1206,11 @@ impl DataSet {
) -> Result<Self, DataSetError> { ) -> Result<Self, DataSetError> {
let benchmark_code = collect_benchmark_code(&benchmarks)?; let benchmark_code = collect_benchmark_code(&benchmarks)?;
let calendar = TradingCalendar::new(benchmarks.iter().map(|item| item.date).collect()); let calendar = TradingCalendar::new(benchmarks.iter().map(|item| item.date).collect());
let factors = normalize_factor_snapshots(factors); let factors = normalize_factor_snapshots(factors)
.into_iter()
.map(Arc::new)
.collect::<Vec<_>>();
let candidates = candidates.into_iter().map(Arc::new).collect::<Vec<_>>();
let instruments = instruments let instruments = instruments
.into_iter() .into_iter()
@@ -1218,7 +1223,7 @@ impl DataSet {
.map(|item| ((item.date, item.symbol.clone()), item)) .map(|item| ((item.date, item.symbol.clone()), item))
.collect::<HashMap<_, _>>(); .collect::<HashMap<_, _>>();
let factor_by_date = group_by_date(factors.clone(), |item| item.date); let factor_by_date = group_arc_by_date(&factors, |item| item.date);
let factor_index = factors let factor_index = factors
.into_iter() .into_iter()
.map(|item| ((item.date, item.symbol.clone()), item)) .map(|item| ((item.date, item.symbol.clone()), item))
@@ -1240,7 +1245,7 @@ impl DataSet {
.map(|item| ((item.date, item.symbol.clone(), item.field.clone()), item)) .map(|item| ((item.date, item.symbol.clone(), item.field.clone()), item))
.collect::<HashMap<_, _>>(); .collect::<HashMap<_, _>>();
let candidate_by_date = group_by_date(candidates.clone(), |item| item.date); let candidate_by_date = group_arc_by_date(&candidates, |item| item.date);
let candidate_index = candidates let candidate_index = candidates
.into_iter() .into_iter()
.map(|item| ((item.date, item.symbol.clone()), item)) .map(|item| ((item.date, item.symbol.clone()), item))
@@ -1329,11 +1334,15 @@ impl DataSet {
} }
pub fn factor(&self, date: NaiveDate, symbol: &str) -> Option<&DailyFactorSnapshot> { pub fn factor(&self, date: NaiveDate, symbol: &str) -> Option<&DailyFactorSnapshot> {
self.factor_index.get(&(date, symbol.to_string())) self.factor_index
.get(&(date, symbol.to_string()))
.map(Arc::as_ref)
} }
pub fn candidate(&self, date: NaiveDate, symbol: &str) -> Option<&CandidateEligibility> { pub fn candidate(&self, date: NaiveDate, symbol: &str) -> Option<&CandidateEligibility> {
self.candidate_index.get(&(date, symbol.to_string())) self.candidate_index
.get(&(date, symbol.to_string()))
.map(Arc::as_ref)
} }
pub fn benchmark(&self, date: NaiveDate) -> Option<&BenchmarkSnapshot> { pub fn benchmark(&self, date: NaiveDate) -> Option<&BenchmarkSnapshot> {
@@ -2089,7 +2098,7 @@ impl DataSet {
pub fn factor_snapshots_on(&self, date: NaiveDate) -> Vec<&DailyFactorSnapshot> { pub fn factor_snapshots_on(&self, date: NaiveDate) -> Vec<&DailyFactorSnapshot> {
self.factor_by_date self.factor_by_date
.get(&date) .get(&date)
.map(|rows| rows.iter().collect()) .map(|rows| rows.iter().map(Arc::as_ref).collect())
.unwrap_or_default() .unwrap_or_default()
} }
@@ -2110,7 +2119,7 @@ impl DataSet {
pub fn candidate_snapshots_on(&self, date: NaiveDate) -> Vec<&CandidateEligibility> { pub fn candidate_snapshots_on(&self, date: NaiveDate) -> Vec<&CandidateEligibility> {
self.candidate_by_date self.candidate_by_date
.get(&date) .get(&date)
.map(|rows| rows.iter().collect()) .map(|rows| rows.iter().map(Arc::as_ref).collect())
.unwrap_or_default() .unwrap_or_default()
} }
@@ -2123,11 +2132,15 @@ impl DataSet {
date, date,
benchmark, benchmark,
market: self.market_by_date.get(&date).cloned().unwrap_or_default(), market: self.market_by_date.get(&date).cloned().unwrap_or_default(),
factors: self.factor_by_date.get(&date).cloned().unwrap_or_default(), factors: self
.factor_by_date
.get(&date)
.map(|rows| rows.iter().map(|row| row.as_ref().clone()).collect())
.unwrap_or_default(),
candidates: self candidates: self
.candidate_by_date .candidate_by_date
.get(&date) .get(&date)
.cloned() .map(|rows| rows.iter().map(|row| row.as_ref().clone()).collect())
.unwrap_or_default(), .unwrap_or_default(),
corporate_actions: self corporate_actions: self
.corporate_actions_by_date .corporate_actions_by_date
@@ -2200,7 +2213,9 @@ impl DataSet {
.iter() .iter()
.map(|day| { .map(|day| {
evaluator( evaluator(
self.candidate_index.get(&(*day, symbol.to_string())), self.candidate_index
.get(&(*day, symbol.to_string()))
.map(Arc::as_ref),
self.market_index.get(&(*day, symbol.to_string())), self.market_index.get(&(*day, symbol.to_string())),
) )
}) })
@@ -3375,6 +3390,20 @@ where
grouped grouped
} }
fn group_arc_by_date<T, F>(rows: &[Arc<T>], mut date_of: F) -> BTreeMap<NaiveDate, Vec<Arc<T>>>
where
F: FnMut(&T) -> NaiveDate,
{
let mut grouped = BTreeMap::<NaiveDate, Vec<Arc<T>>>::new();
for row in rows {
grouped
.entry(date_of(row.as_ref()))
.or_default()
.push(Arc::clone(row));
}
grouped
}
fn collect_benchmark_code(benchmarks: &[BenchmarkSnapshot]) -> Result<String, DataSetError> { fn collect_benchmark_code(benchmarks: &[BenchmarkSnapshot]) -> Result<String, DataSetError> {
let mut codes = benchmarks let mut codes = benchmarks
.iter() .iter()
@@ -3523,8 +3552,8 @@ fn build_order_book_depth_index(
} }
fn build_eligible_universe( fn build_eligible_universe(
factor_by_date: &BTreeMap<NaiveDate, Vec<DailyFactorSnapshot>>, factor_by_date: &BTreeMap<NaiveDate, Vec<Arc<DailyFactorSnapshot>>>,
candidate_index: &HashMap<(NaiveDate, String), CandidateEligibility>, candidate_index: &HashMap<(NaiveDate, String), Arc<CandidateEligibility>>,
market_index: &HashMap<(NaiveDate, String), DailyMarketSnapshot>, market_index: &HashMap<(NaiveDate, String), DailyMarketSnapshot>,
instruments: &HashMap<String, Instrument>, instruments: &HashMap<String, Instrument>,
) -> BTreeMap<NaiveDate, Vec<EligibleUniverseSnapshot>> { ) -> BTreeMap<NaiveDate, Vec<EligibleUniverseSnapshot>> {