// Source file: fidc-backtest-engine/crates/fidc-core/src/data.rs
// Snapshot taken 2026-04-23 19:43:36 -07:00 (1995 lines, 63 KiB, Rust).

use std::collections::{BTreeMap, HashMap};
use std::fs;
use std::path::Path;
use chrono::{NaiveDate, NaiveDateTime};
use serde::{Deserialize, Serialize};
use thiserror::Error;
use crate::calendar::TradingCalendar;
use crate::instrument::Instrument;
/// Serde adapter that reads and writes a `NaiveDate` as a `%Y-%m-%d` string.
///
/// Intended for use through `#[serde(with = "date_format")]`.
mod date_format {
    use chrono::NaiveDate;
    use serde::{self, Deserialize, Deserializer, Serializer};

    /// Textual layout used in both directions.
    const FORMAT: &str = "%Y-%m-%d";

    /// Renders `date` with `FORMAT` and hands the text to the serializer.
    pub fn serialize<S>(date: &NaiveDate, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let rendered = date.format(FORMAT).to_string();
        serializer.serialize_str(&rendered)
    }

    /// Reads a string and parses it with `FORMAT`; parse failures become custom serde errors.
    pub fn deserialize<'de, D>(deserializer: D) -> Result<NaiveDate, D::Error>
    where
        D: Deserializer<'de>,
    {
        String::deserialize(deserializer).and_then(|raw| {
            NaiveDate::parse_from_str(&raw, FORMAT).map_err(serde::de::Error::custom)
        })
    }
}
/// Serde adapter that reads and writes a `NaiveDateTime` as a `%Y-%m-%d %H:%M:%S` string.
///
/// Intended for use through `#[serde(with = "datetime_format")]`.
mod datetime_format {
    use chrono::NaiveDateTime;
    use serde::{self, Deserialize, Deserializer, Serializer};

    /// Textual layout used in both directions.
    const FORMAT: &str = "%Y-%m-%d %H:%M:%S";

    /// Renders `date` with `FORMAT` and hands the text to the serializer.
    pub fn serialize<S>(date: &NaiveDateTime, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let rendered = date.format(FORMAT).to_string();
        serializer.serialize_str(&rendered)
    }

    /// Reads a string and parses it with `FORMAT`; parse failures become custom serde errors.
    pub fn deserialize<'de, D>(deserializer: D) -> Result<NaiveDateTime, D::Error>
    where
        D: Deserializer<'de>,
    {
        String::deserialize(deserializer).and_then(|raw| {
            NaiveDateTime::parse_from_str(&raw, FORMAT).map_err(serde::de::Error::custom)
        })
    }
}
/// Error type returned by the fallible functions in this module.
#[derive(Debug, Error)]
pub enum DataSetError {
/// A file could not be read; carries the path and the underlying I/O error.
#[error("failed to read file {path}: {source}")]
Io {
path: String,
#[source]
source: std::io::Error,
},
/// A CSV row was rejected; `line` and `message` locate and describe the problem.
#[error("invalid csv row in {path} at line {line}: {message}")]
InvalidRow {
path: String,
line: usize,
message: String,
},
/// The benchmark input contained more than one benchmark code.
#[error("benchmark file contains multiple benchmark codes")]
MultipleBenchmarks,
/// A required per-symbol snapshot was absent; `kind` names the data family
/// (the `require_*` accessors use "market", "candidate" and "factor").
#[error("missing data for {kind} on {date} / {symbol}")]
MissingSnapshot {
kind: &'static str,
date: NaiveDate,
symbol: String,
},
/// No benchmark snapshot exists for the requested date.
#[error("benchmark snapshot missing for {date}")]
MissingBenchmark { date: NaiveDate },
}
/// Selects which price column of a snapshot or price series to read.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PriceField {
/// Maps to `DailyMarketSnapshot::day_open`.
DayOpen,
/// Maps to `DailyMarketSnapshot::open`.
Open,
/// Maps to `DailyMarketSnapshot::close`.
Close,
/// Maps to `DailyMarketSnapshot::last_price`.
Last,
}
/// One market-data row for a single symbol on a single date.
///
/// Serialized with serde; `date` uses the `%Y-%m-%d` text form from `date_format`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DailyMarketSnapshot {
#[serde(with = "date_format")]
pub date: NaiveDate,
pub symbol: String,
// Optional free-form timestamp text; it is not parsed in this struct.
pub timestamp: Option<String>,
pub day_open: f64,
pub open: f64,
pub high: f64,
pub low: f64,
pub close: f64,
pub last_price: f64,
// Best bid / ask prices; used by `sell_price` / `buy_price` when positive and finite.
pub bid1: f64,
pub ask1: f64,
pub prev_close: f64,
pub volume: u64,
pub tick_volume: u64,
// Sizes at the best bid / ask; exposed via `liquidity_for_sell` / `liquidity_for_buy`.
pub bid1_volume: u64,
pub ask1_volume: u64,
pub trading_phase: Option<String>,
pub paused: bool,
// Price limits; non-finite or non-positive values are treated as "no limit"
// by the `is_at_*_limit_price` checks.
pub upper_limit: f64,
pub lower_limit: f64,
// Minimum price increment; `effective_price_tick` falls back to 0.01 when unusable.
pub price_tick: f64,
}
impl DailyMarketSnapshot {
    /// Returns the stored value for the requested price column.
    pub fn price(&self, field: PriceField) -> f64 {
        match field {
            PriceField::Last => self.last_price,
            PriceField::Close => self.close,
            PriceField::Open => self.open,
            PriceField::DayOpen => self.day_open,
        }
    }

    /// Price to assume when buying: for `PriceField::Last` the best ask is used
    /// when it is finite and positive; otherwise this is `price(field)`.
    pub fn buy_price(&self, field: PriceField) -> f64 {
        let ask_usable = self.ask1.is_finite() && self.ask1 > 0.0;
        if matches!(field, PriceField::Last) && ask_usable {
            self.ask1
        } else {
            self.price(field)
        }
    }

    /// Price to assume when selling: for `PriceField::Last` the best bid is used
    /// when it is finite and positive; otherwise this is `price(field)`.
    pub fn sell_price(&self, field: PriceField) -> f64 {
        let bid_usable = self.bid1.is_finite() && self.bid1 > 0.0;
        if matches!(field, PriceField::Last) && bid_usable {
            self.bid1
        } else {
            self.price(field)
        }
    }

    /// Size available at the best ask.
    pub fn liquidity_for_buy(&self) -> u64 {
        self.ask1_volume
    }

    /// Size available at the best bid.
    pub fn liquidity_for_sell(&self) -> u64 {
        self.bid1_volume
    }

    /// Returns `price_tick` when it is finite and positive, else `0.01`.
    pub fn effective_price_tick(&self) -> f64 {
        let tick = self.price_tick;
        if !tick.is_finite() || tick <= 0.0 {
            return 0.01;
        }
        tick
    }

    /// True when `price >= upper_limit - tick + 1e-6`; always false when the
    /// upper limit is non-finite or non-positive.
    pub fn is_at_upper_limit_price(&self, price: f64) -> bool {
        let limit = self.upper_limit;
        let limit_usable = limit.is_finite() && limit > 0.0;
        limit_usable && price >= limit - self.effective_price_tick() + 1e-6
    }

    /// True when `price <= lower_limit + tick - 1e-6`; always false when the
    /// lower limit is non-finite or non-positive.
    pub fn is_at_lower_limit_price(&self, price: f64) -> bool {
        let limit = self.lower_limit;
        let limit_usable = limit.is_finite() && limit > 0.0;
        limit_usable && price <= limit + self.effective_price_tick() - 1e-6
    }
}
/// Factor values for a single symbol on a single date.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DailyFactorSnapshot {
#[serde(with = "date_format")]
pub date: NaiveDate,
pub symbol: String,
// NOTE(review): the `_bn` suffix suggests billions — confirm the unit with the data producer.
pub market_cap_bn: f64,
pub free_float_cap_bn: f64,
pub pe_ttm: f64,
pub turnover_ratio: Option<f64>,
pub effective_turnover_ratio: Option<f64>,
// Additional named factors; defaults to an empty map when absent from the input.
#[serde(default)]
pub extra_factors: BTreeMap<String, f64>,
}
/// One benchmark row: prices and volume for benchmark code `benchmark` on `date`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BenchmarkSnapshot {
#[serde(with = "date_format")]
pub date: NaiveDate,
pub benchmark: String,
pub open: f64,
pub close: f64,
pub prev_close: f64,
pub volume: u64,
}
/// Per-symbol, per-date flags consumed by `eligible_for_selection`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CandidateEligibility {
#[serde(with = "date_format")]
pub date: NaiveDate,
pub symbol: String,
// Exclusion flags: any one set to `true` makes the symbol ineligible.
pub is_st: bool,
pub is_new_listing: bool,
pub is_paused: bool,
// Permission flags: both must be `true` for the symbol to be eligible.
pub allow_buy: bool,
pub allow_sell: bool,
// Further exclusion flags, treated the same way as the ones above.
pub is_kcb: bool,
pub is_one_yuan: bool,
}
impl CandidateEligibility {
    /// True when no exclusion flag is set and both buying and selling are allowed.
    pub fn eligible_for_selection(&self) -> bool {
        let excluded = self.is_st
            || self.is_new_listing
            || self.is_paused
            || self.is_kcb
            || self.is_one_yuan;
        let tradable = self.allow_buy && self.allow_sell;
        !excluded && tradable
    }
}
/// A corporate-action record for `symbol` on `date`.
///
/// The accessor methods on this type interpret the fields: `split_ratio`,
/// `has_effect`, and the `successor_*` helpers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CorporateAction {
#[serde(with = "date_format")]
pub date: NaiveDate,
pub symbol: String,
// Optional; `None` when absent from the input.
#[serde(default, with = "optional_date_format")]
pub payable_date: Option<NaiveDate>,
pub share_cash: f64,
// `share_bonus` and `share_gift` feed `split_ratio` (negative values count as zero).
pub share_bonus: f64,
pub share_gift: f64,
pub issue_quantity: f64,
pub issue_price: f64,
pub reform: bool,
pub adjust_factor: Option<f64>,
// Successor fields describe a conversion into another symbol; all default to `None`.
#[serde(default)]
pub successor_symbol: Option<String>,
#[serde(default)]
pub successor_ratio: Option<f64>,
#[serde(default)]
pub successor_cash: Option<f64>,
}
/// A timestamped intraday quote for one symbol.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IntradayExecutionQuote {
#[serde(with = "date_format")]
pub date: NaiveDate,
pub symbol: String,
// Serialized as `%Y-%m-%d %H:%M:%S` via `datetime_format`.
#[serde(with = "datetime_format")]
pub timestamp: NaiveDateTime,
pub last_price: f64,
pub bid1: f64,
pub ask1: f64,
pub bid1_volume: u64,
pub ask1_volume: u64,
// Both deltas default to zero when absent from the input.
#[serde(default)]
pub volume_delta: u64,
#[serde(default)]
pub amount_delta: f64,
pub trading_phase: Option<String>,
}
impl IntradayExecutionQuote {
    /// Price to assume when buying: the best ask if it is finite and positive,
    /// else the last price under the same test, else `None`.
    pub fn buy_price(&self) -> Option<f64> {
        [self.ask1, self.last_price]
            .iter()
            .copied()
            .find(|price| price.is_finite() && *price > 0.0)
    }

    /// Price to assume when selling: the best bid if it is finite and positive,
    /// else the last price under the same test, else `None`.
    pub fn sell_price(&self) -> Option<f64> {
        [self.bid1, self.last_price]
            .iter()
            .copied()
            .find(|price| price.is_finite() && *price > 0.0)
    }
}
impl CorporateAction {
    /// `1 + bonus + gift`, with negative bonus/gift values counted as zero.
    pub fn split_ratio(&self) -> f64 {
        let bonus = self.share_bonus.max(0.0);
        let gift = self.share_gift.max(0.0);
        1.0 + bonus + gift
    }

    /// True when the action carries cash, a split, an issue, a reform flag,
    /// or a successor conversion.
    pub fn has_effect(&self) -> bool {
        let pays_cash = self.share_cash.abs() > f64::EPSILON;
        let splits = (self.split_ratio() - 1.0).abs() > f64::EPSILON;
        let issues = self.issue_quantity.abs() > f64::EPSILON;
        pays_cash || splits || issues || self.reform || self.has_successor_conversion()
    }

    /// True when a non-blank successor symbol is present and the effective
    /// successor ratio is positive.
    pub fn has_successor_conversion(&self) -> bool {
        let names_successor = matches!(
            self.successor_symbol.as_ref(),
            Some(symbol) if !symbol.trim().is_empty()
        );
        names_successor && self.successor_ratio_value() > 0.0
    }

    /// The successor ratio when it is finite and positive, otherwise `1.0`.
    pub fn successor_ratio_value(&self) -> f64 {
        match self.successor_ratio {
            Some(ratio) if ratio.is_finite() && ratio > 0.0 => ratio,
            _ => 1.0,
        }
    }

    /// The successor cash amount when it is finite, otherwise `0.0`.
    pub fn successor_cash_value(&self) -> f64 {
        match self.successor_cash {
            Some(cash) if cash.is_finite() => cash,
            _ => 0.0,
        }
    }
}
/// All data held for one date, as assembled by `DataSet::bundle_on`.
///
/// Every list may be empty; the benchmark snapshot is mandatory.
#[derive(Debug, Clone)]
pub struct DailySnapshotBundle {
pub date: NaiveDate,
pub benchmark: BenchmarkSnapshot,
pub market: Vec<DailyMarketSnapshot>,
pub factors: Vec<DailyFactorSnapshot>,
pub candidates: Vec<CandidateEligibility>,
pub corporate_actions: Vec<CorporateAction>,
}
/// Serializable price bar returned by `DataSet::get_price`.
///
/// Built from either a daily market row or an intraday quote; serialize-only.
#[derive(Debug, Clone, Serialize)]
pub struct PriceBar {
#[serde(with = "date_format")]
pub date: NaiveDate,
// Free-form timestamp text; `get_price` uses it as the secondary sort key for intraday bars.
pub timestamp: Option<String>,
pub symbol: String,
pub frequency: String,
pub open: f64,
pub high: f64,
pub low: f64,
pub close: f64,
pub last_price: f64,
pub volume: u64,
pub amount: f64,
pub bid1: f64,
pub ask1: f64,
pub bid1_volume: u64,
pub ask1_volume: u64,
}
/// One entry of the per-date eligible universe exposed by `DataSet::eligible_universe_on`.
#[derive(Debug, Clone)]
pub struct EligibleUniverseSnapshot {
pub symbol: String,
pub market_cap_bn: f64,
pub free_float_cap_bn: f64,
}
/// Column-oriented, date-sorted price history for one symbol.
///
/// The value columns are parallel to `dates`. Each `*_prefix` column is produced
/// by `prefix_sums` over the matching value column and is read as
/// `prefix[end] - prefix[start]` to get window sums.
// NOTE(review): the indexing implies prefix[i] is the sum of the first i values
// (length n + 1) — confirm against `prefix_sums`.
#[derive(Debug, Clone)]
struct SymbolPriceSeries {
dates: Vec<NaiveDate>,
opens: Vec<f64>,
closes: Vec<f64>,
prev_closes: Vec<f64>,
last_prices: Vec<f64>,
// Volumes are stored as `f64` so they can share the prefix-sum machinery.
volumes: Vec<f64>,
open_prefix: Vec<f64>,
close_prefix: Vec<f64>,
prev_close_prefix: Vec<f64>,
last_prefix: Vec<f64>,
volume_prefix: Vec<f64>,
}
impl SymbolPriceSeries {
    /// Builds the column store from `rows`, ordered by date (stable for equal dates).
    fn new(rows: &[DailyMarketSnapshot]) -> Self {
        // Sort references instead of cloned rows: only numeric columns are kept,
        // so cloning every snapshot (and its strings) would be wasted work.
        let mut sorted: Vec<&DailyMarketSnapshot> = rows.iter().collect();
        sorted.sort_by_key(|row| row.date);

        let dates: Vec<NaiveDate> = sorted.iter().map(|row| row.date).collect();
        let opens: Vec<f64> = sorted.iter().map(|row| row.open).collect();
        let closes: Vec<f64> = sorted.iter().map(|row| row.close).collect();
        let prev_closes: Vec<f64> = sorted.iter().map(|row| row.prev_close).collect();
        let last_prices: Vec<f64> = sorted.iter().map(|row| row.last_price).collect();
        let volumes: Vec<f64> = sorted.iter().map(|row| row.volume as f64).collect();

        let open_prefix = prefix_sums(&opens);
        let close_prefix = prefix_sums(&closes);
        let prev_close_prefix = prefix_sums(&prev_closes);
        let last_prefix = prefix_sums(&last_prices);
        let volume_prefix = prefix_sums(&volumes);

        Self {
            dates,
            opens,
            closes,
            prev_closes,
            last_prices,
            volumes,
            open_prefix,
            close_prefix,
            prev_close_prefix,
            last_prefix,
            volume_prefix,
        }
    }

    /// Mean of exactly `lookback` entries ending just before index `end`.
    ///
    /// Returns `None` when `lookback` is zero or fewer than `lookback` entries exist.
    fn full_window_mean(prefix: &[f64], end: usize, lookback: usize) -> Option<f64> {
        if lookback == 0 || end < lookback {
            return None;
        }
        let start = end - lookback;
        Some((prefix[end] - prefix[start]) / lookback as f64)
    }

    /// Mean of up to `lookback` entries ending just before index `end`.
    ///
    /// Unlike `full_window_mean`, a shorter history is averaged over the entries
    /// that exist. Returns `None` when `lookback` is zero or the window is empty.
    fn partial_window_mean(prefix: &[f64], end: usize, lookback: usize) -> Option<f64> {
        if lookback == 0 {
            return None;
        }
        let start = end.saturating_sub(lookback);
        let count = end - start;
        if count == 0 {
            return None;
        }
        Some((prefix[end] - prefix[start]) / count as f64)
    }

    /// Average of `field` over the `lookback` rows ending at the latest row on or
    /// before `date`; `None` when the history is too short.
    fn moving_average(&self, date: NaiveDate, lookback: usize, field: PriceField) -> Option<f64> {
        let end = self.end_index(date)?;
        Self::full_window_mean(self.prefix_for(field), end, lookback)
    }

    /// Up to `lookback` values of `field`, oldest first, ending at the latest row
    /// on or before `date`. Empty when no such row exists.
    fn trailing_values(&self, date: NaiveDate, lookback: usize, field: PriceField) -> Vec<f64> {
        let Some(end) = self.end_index(date) else {
            return Vec::new();
        };
        let start = end.saturating_sub(lookback);
        self.values_for(field)[start..end].to_vec()
    }

    /// `prev_close` of the latest row on or before `date`.
    fn decision_price_on_or_before(&self, date: NaiveDate) -> Option<f64> {
        // `decision_end_index` never yields 0, so `end - 1` cannot underflow.
        let end = self.decision_end_index(date)?;
        self.prev_closes.get(end - 1).copied()
    }

    /// Exclusive end index for decision-time lookups; same rule as `end_index`.
    fn decision_end_index(&self, date: NaiveDate) -> Option<usize> {
        self.end_index(date)
    }

    /// Strict `lookback`-row average of `prev_close` ending at the latest row on or before `date`.
    fn decision_close_moving_average(&self, date: NaiveDate, lookback: usize) -> Option<f64> {
        let end = self.decision_end_index(date)?;
        Self::full_window_mean(&self.prev_close_prefix, end, lookback)
    }

    /// Like `decision_close_moving_average`, but averages whatever rows exist
    /// when the history is shorter than `lookback`.
    fn decision_close_rolling_average(&self, date: NaiveDate, lookback: usize) -> Option<f64> {
        let end = self.decision_end_index(date)?;
        Self::partial_window_mean(&self.prev_close_prefix, end, lookback)
    }

    /// Strict `lookback`-row average of volume ending at the latest row on or before `date`.
    fn decision_volume_moving_average(&self, date: NaiveDate, lookback: usize) -> Option<f64> {
        let end = self.decision_end_index(date)?;
        Self::full_window_mean(&self.volume_prefix, end, lookback)
    }

    /// Like `decision_volume_moving_average`, but averages whatever rows exist
    /// when the history is shorter than `lookback`.
    fn decision_volume_rolling_average(&self, date: NaiveDate, lookback: usize) -> Option<f64> {
        let end = self.decision_end_index(date)?;
        Self::partial_window_mean(&self.volume_prefix, end, lookback)
    }

    /// Exclusive end index of the rows dated on or before `date`.
    ///
    /// `None` when every row is later than `date` (or the series is empty);
    /// otherwise the result is at least 1.
    fn end_index(&self, date: NaiveDate) -> Option<usize> {
        match self.dates.binary_search(&date) {
            Ok(idx) => Some(idx + 1),
            Err(0) => None,
            Err(idx) => Some(idx),
        }
    }

    /// Value column backing `field`. `DayOpen` shares the `opens` column with `Open`.
    fn values_for(&self, field: PriceField) -> &[f64] {
        match field {
            PriceField::DayOpen | PriceField::Open => &self.opens,
            PriceField::Close => &self.closes,
            PriceField::Last => &self.last_prices,
        }
    }

    /// Value of `field` on the latest row on or before `date`.
    fn price_on_or_before(&self, date: NaiveDate, field: PriceField) -> Option<f64> {
        // `end_index` never yields 0, so `end - 1` cannot underflow.
        let end = self.end_index(date)?;
        self.values_for(field).get(end - 1).copied()
    }

    /// Prefix-sum column backing `field`. `DayOpen` shares the open prefix with `Open`.
    fn prefix_for(&self, field: PriceField) -> &[f64] {
        match field {
            PriceField::DayOpen | PriceField::Open => &self.open_prefix,
            PriceField::Close => &self.close_prefix,
            PriceField::Last => &self.last_prefix,
        }
    }
}
/// Column-oriented, date-sorted open/close history for the benchmark.
///
/// `open_prefix` and `close_prefix` are produced by `prefix_sums` over the
/// matching value column and are read as `prefix[end] - prefix[start]`.
#[derive(Debug, Clone)]
struct BenchmarkPriceSeries {
dates: Vec<NaiveDate>,
opens: Vec<f64>,
closes: Vec<f64>,
open_prefix: Vec<f64>,
close_prefix: Vec<f64>,
}
impl BenchmarkPriceSeries {
    /// Builds the column store from `rows`, ordered by date (stable for equal dates).
    fn new(rows: &[BenchmarkSnapshot]) -> Self {
        // Sort references instead of cloned rows; only the numeric columns are kept.
        let mut sorted: Vec<&BenchmarkSnapshot> = rows.iter().collect();
        sorted.sort_by_key(|row| row.date);

        let dates: Vec<NaiveDate> = sorted.iter().map(|row| row.date).collect();
        let opens: Vec<f64> = sorted.iter().map(|row| row.open).collect();
        let closes: Vec<f64> = sorted.iter().map(|row| row.close).collect();
        let open_prefix = prefix_sums(&opens);
        let close_prefix = prefix_sums(&closes);

        Self {
            dates,
            opens,
            closes,
            open_prefix,
            close_prefix,
        }
    }

    /// Exclusive end index of the rows dated on or before `date`.
    ///
    /// `None` when every row is later than `date` (or the series is empty).
    fn end_index(&self, date: NaiveDate) -> Option<usize> {
        match self.dates.binary_search(&date) {
            Ok(idx) => Some(idx + 1),
            Err(0) => None,
            Err(idx) => Some(idx),
        }
    }

    /// Strict `lookback`-row average of the close column; see `moving_average_for`.
    fn moving_average(&self, date: NaiveDate, lookback: usize) -> Option<f64> {
        self.moving_average_for(date, lookback, PriceField::Close)
    }

    /// Average over exactly `lookback` rows ending at the latest row on or before `date`.
    ///
    /// `DayOpen`/`Open` read the open column; `Close`/`Last` read the close column.
    /// Returns `None` when `lookback` is zero or the history is too short.
    fn moving_average_for(
        &self,
        date: NaiveDate,
        lookback: usize,
        field: PriceField,
    ) -> Option<f64> {
        if lookback == 0 {
            return None;
        }
        let end = self.end_index(date)?;
        if end < lookback {
            return None;
        }
        let start = end - lookback;
        let prefix = match field {
            PriceField::DayOpen | PriceField::Open => &self.open_prefix,
            PriceField::Close | PriceField::Last => &self.close_prefix,
        };
        Some((prefix[end] - prefix[start]) / lookback as f64)
    }

    /// Up to `lookback` closes, oldest first, ending at the latest row on or
    /// before `date`. Empty when no such row exists.
    fn trailing_values(&self, date: NaiveDate, lookback: usize) -> Vec<f64> {
        let Some(end) = self.end_index(date) else {
            return Vec::new();
        };
        let start = end.saturating_sub(lookback);
        self.closes[start..end].to_vec()
    }
}
/// In-memory store of everything the engine loads: instruments, calendar,
/// market/factor/candidate rows, corporate actions, intraday quotes and the benchmark.
///
/// Most row families are kept twice: grouped by date (`*_by_date`) for per-day
/// listing and keyed by `(date, symbol)` (`*_index`) for point lookups.
#[derive(Debug, Clone)]
pub struct DataSet {
// Keyed by instrument symbol.
instruments: HashMap<String, Instrument>,
// Built from the benchmark dates.
calendar: TradingCalendar,
market_by_date: BTreeMap<NaiveDate, Vec<DailyMarketSnapshot>>,
market_index: HashMap<(NaiveDate, String), DailyMarketSnapshot>,
factor_by_date: BTreeMap<NaiveDate, Vec<DailyFactorSnapshot>>,
factor_index: HashMap<(NaiveDate, String), DailyFactorSnapshot>,
candidate_by_date: BTreeMap<NaiveDate, Vec<CandidateEligibility>>,
candidate_index: HashMap<(NaiveDate, String), CandidateEligibility>,
corporate_actions_by_date: BTreeMap<NaiveDate, Vec<CorporateAction>>,
execution_quotes_index: HashMap<(NaiveDate, String), Vec<IntradayExecutionQuote>>,
benchmark_by_date: BTreeMap<NaiveDate, BenchmarkSnapshot>,
// Precomputed per-symbol and benchmark series backing the moving-average queries.
market_series_by_symbol: HashMap<String, SymbolPriceSeries>,
benchmark_series_cache: BenchmarkPriceSeries,
eligible_universe_by_date: BTreeMap<NaiveDate, Vec<EligibleUniverseSnapshot>>,
benchmark_code: String,
}
impl DataSet {
/// Loads a data set from flat CSV files inside `path`.
///
/// `instruments.csv`, `market.csv`, `factors.csv`, `candidate_flags.csv` and
/// `benchmark.csv` are required; `corporate_actions.csv` and
/// `execution_quotes.csv` are read only when they exist.
///
/// # Errors
/// Propagates any reader error and any error from assembling the data set.
pub fn from_csv_dir(path: &Path) -> Result<Self, DataSetError> {
    let file = |name: &str| path.join(name);

    let instruments = read_instruments(&file("instruments.csv"))?;
    let market = read_market(&file("market.csv"))?;
    let factors = read_factors(&file("factors.csv"))?;
    let candidates = read_candidates(&file("candidate_flags.csv"))?;
    let benchmarks = read_benchmarks(&file("benchmark.csv"))?;

    let actions_file = file("corporate_actions.csv");
    let corporate_actions = if !actions_file.exists() {
        Vec::new()
    } else {
        read_corporate_actions(&actions_file)?
    };

    let quotes_file = file("execution_quotes.csv");
    let execution_quotes = if !quotes_file.exists() {
        Vec::new()
    } else {
        read_execution_quotes(&quotes_file)?
    };

    Self::from_components_with_actions_and_quotes(
        instruments,
        market,
        factors,
        candidates,
        benchmarks,
        corporate_actions,
        execution_quotes,
    )
}
/// Loads a data set from a partitioned layout under `path`.
///
/// `instruments.csv` plus the `benchmark`, `market`, `factors` and `candidates`
/// directories are required; the `corporate_actions` and `execution_quotes`
/// directories are read only when they exist.
///
/// # Errors
/// Propagates any reader error and any error from assembling the data set.
pub fn from_partitioned_dir(path: &Path) -> Result<Self, DataSetError> {
    let entry = |name: &str| path.join(name);

    let instruments = read_instruments(&entry("instruments.csv"))?;
    let benchmarks = read_partitioned_dir(&entry("benchmark"), read_benchmarks)?;
    let market = read_partitioned_dir(&entry("market"), read_market)?;
    let factors = read_partitioned_dir(&entry("factors"), read_factors)?;
    let candidates = read_partitioned_dir(&entry("candidates"), read_candidates)?;

    let actions_dir = entry("corporate_actions");
    let corporate_actions = if !actions_dir.exists() {
        Vec::new()
    } else {
        read_partitioned_dir(&actions_dir, read_corporate_actions)?
    };

    let quotes_dir = entry("execution_quotes");
    let execution_quotes = if !quotes_dir.exists() {
        Vec::new()
    } else {
        read_partitioned_dir(&quotes_dir, read_execution_quotes)?
    };

    Self::from_components_with_actions_and_quotes(
        instruments,
        market,
        factors,
        candidates,
        benchmarks,
        corporate_actions,
        execution_quotes,
    )
}
/// Builds a data set from already-loaded rows, with no corporate actions and
/// no intraday quotes.
///
/// # Errors
/// Whatever `from_components_with_actions_and_quotes` returns.
pub fn from_components(
instruments: Vec<Instrument>,
market: Vec<DailyMarketSnapshot>,
factors: Vec<DailyFactorSnapshot>,
candidates: Vec<CandidateEligibility>,
benchmarks: Vec<BenchmarkSnapshot>,
) -> Result<Self, DataSetError> {
Self::from_components_with_actions_and_quotes(
instruments,
market,
factors,
candidates,
benchmarks,
Vec::new(),
Vec::new(),
)
}
/// Builds a data set from already-loaded rows plus corporate actions, with no
/// intraday quotes.
///
/// # Errors
/// Whatever `from_components_with_actions_and_quotes` returns.
pub fn from_components_with_actions(
instruments: Vec<Instrument>,
market: Vec<DailyMarketSnapshot>,
factors: Vec<DailyFactorSnapshot>,
candidates: Vec<CandidateEligibility>,
benchmarks: Vec<BenchmarkSnapshot>,
corporate_actions: Vec<CorporateAction>,
) -> Result<Self, DataSetError> {
Self::from_components_with_actions_and_quotes(
instruments,
market,
factors,
candidates,
benchmarks,
corporate_actions,
Vec::new(),
)
}
/// Assembles the full data set and all derived indexes from already-loaded rows.
///
/// This is the constructor every other constructor delegates to.
///
/// # Errors
/// Propagates the error from `collect_benchmark_code`; every later step is infallible here.
pub fn from_components_with_actions_and_quotes(
instruments: Vec<Instrument>,
market: Vec<DailyMarketSnapshot>,
factors: Vec<DailyFactorSnapshot>,
candidates: Vec<CandidateEligibility>,
benchmarks: Vec<BenchmarkSnapshot>,
corporate_actions: Vec<CorporateAction>,
execution_quotes: Vec<IntradayExecutionQuote>,
) -> Result<Self, DataSetError> {
// Validate the benchmark rows first; this is the only fallible step.
let benchmark_code = collect_benchmark_code(&benchmarks)?;
// The trading calendar is derived from the dates present in the benchmark rows.
let calendar = TradingCalendar::new(benchmarks.iter().map(|item| item.date).collect());
let instruments = instruments
.into_iter()
.map(|instrument| (instrument.symbol.clone(), instrument))
.collect::<HashMap<_, _>>();
// Each row family is cloned once: one copy is grouped by date, the other is
// moved into the (date, symbol) index. On duplicate keys the later row wins.
let market_by_date = group_by_date(market.clone(), |item| item.date);
let market_index = market
.into_iter()
.map(|item| ((item.date, item.symbol.clone()), item))
.collect::<HashMap<_, _>>();
let factor_by_date = group_by_date(factors.clone(), |item| item.date);
let factor_index = factors
.into_iter()
.map(|item| ((item.date, item.symbol.clone()), item))
.collect::<HashMap<_, _>>();
let candidate_by_date = group_by_date(candidates.clone(), |item| item.date);
let candidate_index = candidates
.into_iter()
.map(|item| ((item.date, item.symbol.clone()), item))
.collect::<HashMap<_, _>>();
let corporate_actions_by_date = group_by_date(corporate_actions, |item| item.date);
let execution_quotes_index = build_execution_quote_index(execution_quotes);
// Keyed by date only, so a later row for the same date replaces an earlier one.
let benchmark_by_date = benchmarks
.into_iter()
.map(|item| (item.date, item))
.collect::<BTreeMap<_, _>>();
// Derived caches, built from the indexes above.
let market_series_by_symbol = build_market_series(&market_by_date);
let benchmark_series_cache =
BenchmarkPriceSeries::new(&benchmark_by_date.values().cloned().collect::<Vec<_>>());
let eligible_universe_by_date =
build_eligible_universe(&factor_by_date, &candidate_index, &market_index);
Ok(Self {
instruments,
calendar,
market_by_date,
market_index,
factor_by_date,
factor_index,
candidate_by_date,
candidate_index,
corporate_actions_by_date,
execution_quotes_index,
benchmark_by_date,
market_series_by_symbol,
benchmark_series_cache,
eligible_universe_by_date,
benchmark_code,
})
}
/// The trading calendar derived from the benchmark dates.
pub fn calendar(&self) -> &TradingCalendar {
&self.calendar
}
/// The benchmark code resolved from the benchmark rows at construction time.
pub fn benchmark_code(&self) -> &str {
&self.benchmark_code
}
/// All instruments, keyed by symbol.
pub fn instruments(&self) -> &HashMap<String, Instrument> {
&self.instruments
}
/// All instruments, sorted by symbol in ascending order.
pub fn all_instruments(&self) -> Vec<&Instrument> {
    let mut ordered: Vec<&Instrument> = Vec::with_capacity(self.instruments.len());
    ordered.extend(self.instruments.values());
    ordered.sort_by(|a, b| Ord::cmp(&a.symbol, &b.symbol));
    ordered
}
/// Looks up each of `symbols` in order, silently skipping unknown ones.
pub fn instruments_history(&self, symbols: &[&str]) -> Vec<&Instrument> {
    let mut found = Vec::new();
    for symbol in symbols {
        if let Some(instrument) = self.instruments.get(*symbol) {
            found.push(instrument);
        }
    }
    found
}
pub fn active_instruments(&self, date: NaiveDate, symbols: &[&str]) -> Vec<&Instrument> {
symbols
.iter()
.filter_map(|symbol| self.instruments.get(*symbol))
.filter(|instrument| instrument.is_active_on(date))
.collect()
}
/// The instrument registered under `symbol`, if any.
pub fn instrument(&self, symbol: &str) -> Option<&Instrument> {
self.instruments.get(symbol)
}
/// The market row for exactly `(date, symbol)`, if present.
pub fn market(&self, date: NaiveDate, symbol: &str) -> Option<&DailyMarketSnapshot> {
// The index key owns its symbol, so the lookup key is built with an owned copy.
self.market_index.get(&(date, symbol.to_string()))
}
/// The factor row for exactly `(date, symbol)`, if present.
pub fn factor(&self, date: NaiveDate, symbol: &str) -> Option<&DailyFactorSnapshot> {
// The index key owns its symbol, so the lookup key is built with an owned copy.
self.factor_index.get(&(date, symbol.to_string()))
}
/// The candidate-eligibility row for exactly `(date, symbol)`, if present.
pub fn candidate(&self, date: NaiveDate, symbol: &str) -> Option<&CandidateEligibility> {
// The index key owns its symbol, so the lookup key is built with an owned copy.
self.candidate_index.get(&(date, symbol.to_string()))
}
/// The benchmark row for exactly `date`, if present.
pub fn benchmark(&self, date: NaiveDate) -> Option<&BenchmarkSnapshot> {
self.benchmark_by_date.get(&date)
}
/// Corporate actions recorded for `date`; an empty slice when there are none.
pub fn corporate_actions_on(&self, date: NaiveDate) -> &[CorporateAction] {
    match self.corporate_actions_by_date.get(&date) {
        Some(actions) => actions.as_slice(),
        None => &[],
    }
}
/// Intraday quotes stored for `(date, symbol)`; an empty slice when there are none.
pub fn execution_quotes_on(&self, date: NaiveDate, symbol: &str) -> &[IntradayExecutionQuote] {
    let key = (date, symbol.to_string());
    match self.execution_quotes_index.get(&key) {
        Some(quotes) => quotes.as_slice(),
        None => &[],
    }
}
/// All intraday quotes for `date` across every symbol, ordered by timestamp
/// and then by symbol.
///
/// Scans the whole quote index, so cost grows with the number of
/// `(date, symbol)` entries.
pub fn execution_quotes_on_date(&self, date: NaiveDate) -> Vec<IntradayExecutionQuote> {
    let mut quotes: Vec<IntradayExecutionQuote> = Vec::new();
    for ((quote_date, _), rows) in &self.execution_quotes_index {
        if *quote_date == date {
            quotes.extend(rows.iter().cloned());
        }
    }
    quotes.sort_by(|left, right| {
        (left.timestamp, &left.symbol).cmp(&(right.timestamp, &right.symbol))
    });
    quotes
}
/// A cloned copy of every benchmark row, in ascending date order.
pub fn benchmark_series(&self) -> Vec<BenchmarkSnapshot> {
self.benchmark_by_date.values().cloned().collect()
}
/// Same as `history_bars_at` with no intraday cut-off (`active_datetime = None`).
pub fn history_bars(
&self,
date: NaiveDate,
symbol: &str,
bar_count: usize,
frequency: &str,
field: &str,
include_now: bool,
) -> Vec<f64> {
self.history_bars_at(date, None, symbol, bar_count, frequency, field, include_now)
}
/// Returns up to `bar_count` historical values of `field` for `symbol`.
///
/// `frequency` is normalized by `normalize_history_frequency`: `"1d"` reads daily
/// rows, `"1m"` and `"tick"` read intraday quotes, anything else yields an empty
/// vector. A `bar_count` of zero also yields an empty vector.
pub fn history_bars_at(
    &self,
    date: NaiveDate,
    active_datetime: Option<NaiveDateTime>,
    symbol: &str,
    bar_count: usize,
    frequency: &str,
    field: &str,
    include_now: bool,
) -> Vec<f64> {
    if bar_count == 0 {
        return Vec::new();
    }
    let normalized = normalize_history_frequency(frequency);
    let Some(kind) = normalized.as_deref() else {
        return Vec::new();
    };
    if kind == "1d" {
        self.history_daily_values(date, symbol, bar_count, field, include_now)
    } else if kind == "1m" || kind == "tick" {
        self.history_intraday_values(
            date,
            active_datetime,
            symbol,
            bar_count,
            field,
            include_now,
        )
    } else {
        Vec::new()
    }
}
/// Returns the last `bar_count` daily rows for `symbol`, oldest first.
///
/// Rows dated up to and including `date` are considered when `include_now` is
/// true; otherwise only rows strictly before `date`. A `bar_count` of zero
/// yields an empty vector.
pub fn history_daily_snapshots(
    &self,
    date: NaiveDate,
    symbol: &str,
    bar_count: usize,
    include_now: bool,
) -> Vec<DailyMarketSnapshot> {
    if bar_count == 0 {
        return Vec::new();
    }
    let upper: std::ops::Bound<NaiveDate> = if include_now {
        std::ops::Bound::Included(date)
    } else {
        std::ops::Bound::Excluded(date)
    };
    // The map is ordered by date, so a bounded range visits only the visible
    // days instead of filtering every date in the data set.
    let mut snapshots = self
        .market_by_date
        .range((std::ops::Bound::Unbounded, upper))
        .flat_map(|(_, rows)| rows.iter())
        .filter(|row| row.symbol == symbol)
        .cloned()
        .collect::<Vec<_>>();
    // Kept for exact parity with the previous behaviour; the range already
    // yields days in ascending order, so this stable sort is cheap.
    snapshots.sort_by_key(|row| row.date);
    take_last(snapshots, bar_count)
}
/// Same as `history_intraday_quotes_at` with no intraday cut-off (`active_datetime = None`).
pub fn history_intraday_quotes(
&self,
date: NaiveDate,
symbol: &str,
bar_count: usize,
include_now: bool,
) -> Vec<IntradayExecutionQuote> {
self.history_intraday_quotes_at(date, None, symbol, bar_count, include_now)
}
/// Returns the last `bar_count` intraday quotes for `symbol`, ordered by timestamp.
///
/// A quote is kept when `intraday_quote_visible` accepts it for the given
/// `date`, `active_datetime` and `include_now`. Scans the whole quote index.
/// A `bar_count` of zero yields an empty vector.
pub fn history_intraday_quotes_at(
    &self,
    date: NaiveDate,
    active_datetime: Option<NaiveDateTime>,
    symbol: &str,
    bar_count: usize,
    include_now: bool,
) -> Vec<IntradayExecutionQuote> {
    if bar_count == 0 {
        return Vec::new();
    }
    let mut quotes: Vec<IntradayExecutionQuote> = Vec::new();
    for ((_, quote_symbol), rows) in &self.execution_quotes_index {
        if quote_symbol != symbol {
            continue;
        }
        quotes.extend(
            rows.iter()
                .filter(|quote| intraday_quote_visible(quote, date, active_datetime, include_now))
                .cloned(),
        );
    }
    quotes.sort_by_key(|quote| quote.timestamp);
    take_last(quotes, bar_count)
}
/// Delegates to `TradingCalendar::trading_dates` for the `start`..`end` window.
pub fn trading_dates(&self, start: NaiveDate, end: NaiveDate) -> Vec<NaiveDate> {
self.calendar.trading_dates(start, end)
}
/// Delegates to `TradingCalendar::previous_trading_date`.
pub fn previous_trading_date(&self, date: NaiveDate, n: usize) -> Option<NaiveDate> {
self.calendar.previous_trading_date(date, n)
}
/// Delegates to `TradingCalendar::next_trading_date`.
pub fn next_trading_date(&self, date: NaiveDate, n: usize) -> Option<NaiveDate> {
self.calendar.next_trading_date(date, n)
}
/// One flag per trading day for the last `count` calendar days up to `date`:
/// true when the candidate row is paused or the market row is paused.
/// Missing rows count as not paused.
pub fn is_suspended_flags(&self, date: NaiveDate, symbol: &str, count: usize) -> Vec<bool> {
    self.historical_daily_flags(date, symbol, count, |candidate, market| {
        let flagged_paused = matches!(candidate, Some(row) if row.is_paused);
        let market_paused = matches!(market, Some(row) if row.paused);
        flagged_paused || market_paused
    })
}
/// One flag per trading day for the last `count` calendar days up to `date`:
/// true when the candidate row has `is_st` set. Missing rows count as false.
pub fn is_st_stock_flags(&self, date: NaiveDate, symbol: &str, count: usize) -> Vec<bool> {
    self.historical_daily_flags(date, symbol, count, |candidate, _market| {
        matches!(candidate, Some(row) if row.is_st)
    })
}
/// Returns price bars for `symbol` between `start` and `end`, both inclusive.
///
/// `frequency` is normalized by `normalize_history_frequency`: `"1d"` yields one
/// bar per daily row in date order; `"1m"` and `"tick"` yield one bar per
/// intraday quote, ordered by date and then timestamp. Any other frequency, or
/// `start > end`, yields an empty vector.
pub fn get_price(
    &self,
    symbol: &str,
    start: NaiveDate,
    end: NaiveDate,
    frequency: &str,
) -> Vec<PriceBar> {
    if start > end {
        return Vec::new();
    }
    let normalized = normalize_history_frequency(frequency);
    match normalized.as_deref() {
        Some("1d") => {
            let mut bars: Vec<PriceBar> = Vec::new();
            for (_, rows) in self.market_by_date.range(start..=end) {
                bars.extend(
                    rows.iter()
                        .filter(|row| row.symbol == symbol)
                        .map(daily_market_price_bar),
                );
            }
            bars
        }
        Some("1m") | Some("tick") => {
            let mut bars: Vec<PriceBar> = Vec::new();
            for ((quote_date, quote_symbol), rows) in &self.execution_quotes_index {
                let in_window = *quote_date >= start && *quote_date <= end;
                if quote_symbol == symbol && in_window {
                    bars.extend(rows.iter().map(intraday_quote_price_bar));
                }
            }
            // The quote index is a hash map, so order must be imposed explicitly.
            bars.sort_by(|left, right| {
                (left.date, &left.timestamp).cmp(&(right.date, &right.timestamp))
            });
            bars
        }
        _ => Vec::new(),
    }
}
/// The value of `field` on the market row for exactly `(date, symbol)`.
pub fn price(&self, date: NaiveDate, symbol: &str, field: PriceField) -> Option<f64> {
    self.market(date, symbol)
        .map(|snapshot| snapshot.price(field))
}
/// The value of `field` on the latest row for `symbol` dated on or before `date`.
///
/// `None` when the symbol is unknown or has no row that early.
pub fn price_on_or_before(
    &self,
    date: NaiveDate,
    symbol: &str,
    field: PriceField,
) -> Option<f64> {
    let series = self.market_series_by_symbol.get(symbol)?;
    series.price_on_or_before(date, field)
}
/// References to every factor row stored for `date`; empty when there are none.
pub fn factor_snapshots_on(&self, date: NaiveDate) -> Vec<&DailyFactorSnapshot> {
    match self.factor_by_date.get(&date) {
        Some(rows) => rows.iter().collect(),
        None => Vec::new(),
    }
}
/// References to every market row stored for `date`; empty when there are none.
pub fn market_snapshots_on(&self, date: NaiveDate) -> Vec<&DailyMarketSnapshot> {
    match self.market_by_date.get(&date) {
        Some(rows) => rows.iter().collect(),
        None => Vec::new(),
    }
}
/// References to every candidate row stored for `date`; empty when there are none.
pub fn candidate_snapshots_on(&self, date: NaiveDate) -> Vec<&CandidateEligibility> {
    match self.candidate_by_date.get(&date) {
        Some(rows) => rows.iter().collect(),
        None => Vec::new(),
    }
}
/// Collects cloned copies of everything stored for `date` into one bundle.
///
/// Missing market, factor, candidate or corporate-action rows become empty lists.
///
/// # Errors
/// `DataSetError::MissingBenchmark` when no benchmark row exists for `date`.
pub fn bundle_on(&self, date: NaiveDate) -> Result<DailySnapshotBundle, DataSetError> {
    let Some(benchmark) = self.benchmark(date).cloned() else {
        return Err(DataSetError::MissingBenchmark { date });
    };
    let market = match self.market_by_date.get(&date) {
        Some(rows) => rows.clone(),
        None => Vec::new(),
    };
    let factors = match self.factor_by_date.get(&date) {
        Some(rows) => rows.clone(),
        None => Vec::new(),
    };
    let candidates = match self.candidate_by_date.get(&date) {
        Some(rows) => rows.clone(),
        None => Vec::new(),
    };
    let corporate_actions = match self.corporate_actions_by_date.get(&date) {
        Some(rows) => rows.clone(),
        None => Vec::new(),
    };
    Ok(DailySnapshotBundle {
        date,
        benchmark,
        market,
        factors,
        candidates,
        corporate_actions,
    })
}
/// Up to `lookback` benchmark closes, oldest first, ending at the latest
/// benchmark row on or before `date`.
pub fn benchmark_closes_up_to(&self, date: NaiveDate, lookback: usize) -> Vec<f64> {
self.benchmark_series_cache.trailing_values(date, lookback)
}
/// Up to `lookback` closes for `symbol`, oldest first, ending at the latest row
/// on or before `date`. Empty when the symbol is unknown.
pub fn market_closes_up_to(&self, date: NaiveDate, symbol: &str, lookback: usize) -> Vec<f64> {
    match self.market_series_by_symbol.get(symbol) {
        Some(series) => series.trailing_values(date, lookback, PriceField::Close),
        None => Vec::new(),
    }
}
/// Numeric values of `field` taken from the rows returned by
/// `history_daily_snapshots`; rows for which `daily_market_numeric_value`
/// yields `None` are dropped.
fn history_daily_values(
    &self,
    date: NaiveDate,
    symbol: &str,
    bar_count: usize,
    field: &str,
    include_now: bool,
) -> Vec<f64> {
    let snapshots = self.history_daily_snapshots(date, symbol, bar_count, include_now);
    let mut values = Vec::with_capacity(snapshots.len());
    for row in snapshots {
        if let Some(value) = daily_market_numeric_value(&row, field) {
            values.push(value);
        }
    }
    values
}
/// Numeric values of `field` taken from the quotes returned by
/// `history_intraday_quotes_at`; quotes for which `intraday_quote_numeric_value`
/// yields `None` are dropped.
fn history_intraday_values(
    &self,
    date: NaiveDate,
    active_datetime: Option<NaiveDateTime>,
    symbol: &str,
    bar_count: usize,
    field: &str,
    include_now: bool,
) -> Vec<f64> {
    let quotes =
        self.history_intraday_quotes_at(date, active_datetime, symbol, bar_count, include_now);
    let mut values = Vec::with_capacity(quotes.len());
    for row in quotes {
        if let Some(value) = intraday_quote_numeric_value(&row, field) {
            values.push(value);
        }
    }
    values
}
/// Evaluates `evaluator` once per trading day for the last `count` calendar
/// days dated on or before `date`, oldest first.
///
/// The evaluator receives the candidate and market rows for `(day, symbol)`,
/// each `None` when absent. A `count` of zero yields an empty vector.
fn historical_daily_flags<F>(
    &self,
    date: NaiveDate,
    symbol: &str,
    count: usize,
    evaluator: F,
) -> Vec<bool>
where
    F: Fn(Option<&CandidateEligibility>, Option<&DailyMarketSnapshot>) -> bool,
{
    if count == 0 {
        return Vec::new();
    }
    let visible_days = self
        .calendar
        .iter()
        .filter(|day| *day <= date)
        .collect::<Vec<_>>();
    let skipped = visible_days.len().saturating_sub(count);
    visible_days
        .into_iter()
        .skip(skipped)
        .map(|day| {
            // Both indexes share the same key shape, so one key serves both lookups.
            let key = (day, symbol.to_string());
            evaluator(self.candidate_index.get(&key), self.market_index.get(&key))
        })
        .collect()
}
/// The `prev_close` of the latest row for `symbol` dated on or before `date`.
///
/// `None` when the symbol is unknown or has no row that early.
pub fn market_decision_close(&self, date: NaiveDate, symbol: &str) -> Option<f64> {
    let series = self.market_series_by_symbol.get(symbol)?;
    series.decision_price_on_or_before(date)
}
/// Strict `lookback`-row average of `prev_close` for `symbol`, ending at the
/// latest row on or before `date`.
///
/// `None` when the symbol is unknown, `lookback` is zero, or the history is too short.
pub fn market_decision_close_moving_average(
    &self,
    date: NaiveDate,
    symbol: &str,
    lookback: usize,
) -> Option<f64> {
    let series = self.market_series_by_symbol.get(symbol)?;
    series.decision_close_moving_average(date, lookback)
}
/// Strict `lookback`-row average of volume for `symbol`, ending at the latest
/// row on or before `date`.
///
/// `None` when the symbol is unknown, `lookback` is zero, or the history is too short.
pub fn market_decision_volume_moving_average(
    &self,
    date: NaiveDate,
    symbol: &str,
    lookback: usize,
) -> Option<f64> {
    let series = self.market_series_by_symbol.get(symbol)?;
    series.decision_volume_moving_average(date, lookback)
}
/// The numeric value of factor `field` on the factor row for exactly `(date, symbol)`.
///
/// `None` when the row is missing or the free function `factor_numeric_value`
/// yields no value for `field`.
pub fn factor_numeric_value(&self, date: NaiveDate, symbol: &str, field: &str) -> Option<f64> {
    let snapshot = self.factor(date, symbol)?;
    factor_numeric_value(snapshot, field)
}
/// Average of factor `field` for `symbol` over the days returned by
/// `TradingCalendar::trailing_days(date, lookback)`.
///
/// Returns `None` when `lookback` is zero, the calendar yields no days, or any
/// one of those days lacks a factor row or a value for `field`.
pub fn factor_moving_average(
    &self,
    date: NaiveDate,
    symbol: &str,
    field: &str,
    lookback: usize,
) -> Option<f64> {
    if lookback == 0 {
        return None;
    }
    let window = self.calendar.trailing_days(date, lookback);
    if window.is_empty() {
        return None;
    }
    let mut total = 0.0_f64;
    let mut samples = 0usize;
    for day in window {
        // A single gap invalidates the whole average.
        let row = self.factor(day, symbol)?;
        total += factor_numeric_value(row, field)?;
        samples += 1;
    }
    (samples != 0).then(|| total / samples as f64)
}
/// Moving average of a named field for `symbol`, dispatching on `field`:
///
/// - `close`, `prev_close`, `stock_close`, `price`: rolling average of `prev_close`
///   (a short history is averaged over the rows that exist);
/// - `volume`, `stock_volume`: rolling average of volume, same rule;
/// - `day_open`, `dayopen`, `open`, `last`, `last_price`: strict average via
///   `market_moving_average`;
/// - anything else: treated as a factor name via `factor_moving_average`.
pub fn market_decision_numeric_moving_average(
    &self,
    date: NaiveDate,
    symbol: &str,
    field: &str,
    lookback: usize,
) -> Option<f64> {
    let series = || self.market_series_by_symbol.get(symbol);
    match field {
        "close" | "prev_close" | "stock_close" | "price" => {
            series()?.decision_close_rolling_average(date, lookback)
        }
        "volume" | "stock_volume" => series()?.decision_volume_rolling_average(date, lookback),
        "day_open" | "dayopen" => {
            self.market_moving_average(date, symbol, lookback, PriceField::DayOpen)
        }
        "open" => self.market_moving_average(date, symbol, lookback, PriceField::Open),
        "last" | "last_price" => {
            self.market_moving_average(date, symbol, lookback, PriceField::Last)
        }
        factor_name => self.factor_moving_average(date, symbol, factor_name, lookback),
    }
}
/// Strict `lookback`-row average of `field` for `symbol`, ending at the latest
/// row on or before `date`.
///
/// `None` when the symbol is unknown, `lookback` is zero, or the history is too short.
pub fn market_moving_average(
    &self,
    date: NaiveDate,
    symbol: &str,
    lookback: usize,
    field: PriceField,
) -> Option<f64> {
    let series = self.market_series_by_symbol.get(symbol)?;
    series.moving_average(date, lookback, field)
}
/// Strict `lookback`-row average of the benchmark close, ending at the latest
/// benchmark row on or before `date`.
pub fn benchmark_moving_average(&self, date: NaiveDate, lookback: usize) -> Option<f64> {
self.benchmark_series_cache.moving_average(date, lookback)
}
/// Strict `lookback`-row average of the benchmark open, ending at the latest
/// benchmark row on or before `date`.
pub fn benchmark_open_moving_average(&self, date: NaiveDate, lookback: usize) -> Option<f64> {
self.benchmark_series_cache
.moving_average_for(date, lookback, PriceField::Open)
}
/// Shorthand for `market_moving_average` with `PriceField::Open`.
pub fn market_open_moving_average(
&self,
date: NaiveDate,
symbol: &str,
lookback: usize,
) -> Option<f64> {
self.market_moving_average(date, symbol, lookback, PriceField::Open)
}
/// The precomputed eligible universe for `date`; an empty slice when there is none.
pub fn eligible_universe_on(&self, date: NaiveDate) -> &[EligibleUniverseSnapshot] {
    match self.eligible_universe_by_date.get(&date) {
        Some(universe) => universe.as_slice(),
        None => &[],
    }
}
/// Like `market`, but a missing row is an error.
///
/// # Errors
/// `DataSetError::MissingSnapshot` with kind `"market"` when no row exists.
pub fn require_market(
    &self,
    date: NaiveDate,
    symbol: &str,
) -> Result<&DailyMarketSnapshot, DataSetError> {
    match self.market(date, symbol) {
        Some(snapshot) => Ok(snapshot),
        None => Err(DataSetError::MissingSnapshot {
            kind: "market",
            date,
            symbol: symbol.to_string(),
        }),
    }
}
/// Like `candidate`, but a missing row is an error.
///
/// # Errors
/// `DataSetError::MissingSnapshot` with kind `"candidate"` when no row exists.
pub fn require_candidate(
    &self,
    date: NaiveDate,
    symbol: &str,
) -> Result<&CandidateEligibility, DataSetError> {
    match self.candidate(date, symbol) {
        Some(snapshot) => Ok(snapshot),
        None => Err(DataSetError::MissingSnapshot {
            kind: "candidate",
            date,
            symbol: symbol.to_string(),
        }),
    }
}
/// Like `factor`, but a missing row is an error.
///
/// # Errors
/// `DataSetError::MissingSnapshot` with kind `"factor"` when no row exists.
pub fn require_factor(
    &self,
    date: NaiveDate,
    symbol: &str,
) -> Result<&DailyFactorSnapshot, DataSetError> {
    match self.factor(date, symbol) {
        Some(snapshot) => Ok(snapshot),
        None => Err(DataSetError::MissingSnapshot {
            kind: "factor",
            date,
            symbol: symbol.to_string(),
        }),
    }
}
}
/// Loads instrument definitions from the CSV file at `path`.
///
/// Columns 0-2 (symbol, name, board) are required. Column 3 (round lot)
/// defaults to 100 when absent or unparsable, columns 4-5 are optional
/// dates, and column 6 (status) defaults to `"active"` when absent or blank.
///
/// # Errors
/// Propagates I/O errors from `read_rows` and `InvalidRow` errors for a
/// missing required column or a malformed optional date.
fn read_instruments(path: &Path) -> Result<Vec<Instrument>, DataSetError> {
    read_rows(path)?
        .into_iter()
        .map(|row| -> Result<Instrument, DataSetError> {
            let status = match row.fields.get(6).map(|value| value.trim()) {
                Some(text) if !text.is_empty() => text,
                _ => "active",
            };
            Ok(Instrument {
                symbol: row.get(0)?.to_string(),
                name: row.get(1)?.to_string(),
                board: row.get(2)?.to_string(),
                round_lot: row.parse_optional_u32(3).unwrap_or(100),
                listed_at: row.parse_optional_date(4)?,
                delisted_at: row.parse_optional_date(5)?,
                status: status.to_string(),
            })
        })
        .collect()
}
fn read_market(path: &Path) -> Result<Vec<DailyMarketSnapshot>, DataSetError> {
let rows = read_rows(path)?;
let mut snapshots = Vec::new();
for row in rows {
let open = row.parse_f64(2)?;
let close = row.parse_f64(5)?;
let prev_close = row.parse_f64(6)?;
let price_tick = row.parse_optional_f64(15).unwrap_or(0.01);
let derived_upper_limit = round_price_to_tick(prev_close * 1.10, price_tick);
let derived_lower_limit = round_price_to_tick(prev_close * 0.90, price_tick);
snapshots.push(DailyMarketSnapshot {
date: row.parse_date(0)?,
symbol: row.get(1)?.to_string(),
timestamp: row
.fields
.get(16)
.map(|value| value.trim().to_string())
.filter(|value| !value.is_empty()),
day_open: row.parse_optional_f64(11).unwrap_or(open),
open,
high: row.parse_f64(3)?,
low: row.parse_f64(4)?,
close,
last_price: row.parse_optional_f64(12).unwrap_or(close),
bid1: row.parse_optional_f64(13).unwrap_or(close),
ask1: row.parse_optional_f64(14).unwrap_or(close),
prev_close,
volume: row.parse_u64(7)?,
tick_volume: row.parse_optional_u64(17).unwrap_or_default(),
bid1_volume: row.parse_optional_u64(18).unwrap_or_default(),
ask1_volume: row.parse_optional_u64(19).unwrap_or_default(),
trading_phase: row
.fields
.get(20)
.map(|value| value.trim().to_string())
.filter(|value| !value.is_empty()),
paused: row.parse_bool(8)?,
upper_limit: row.parse_optional_f64(9).unwrap_or(derived_upper_limit),
lower_limit: row.parse_optional_f64(10).unwrap_or(derived_lower_limit),
price_tick,
});
}
Ok(snapshots)
}
/// Loads daily factor snapshots from the CSV file at `path`.
///
/// Columns 0-4 (date, symbol, market cap, free-float cap, PE TTM) are
/// required; columns 5-6 are optional ratios. Column 7 optionally holds a
/// JSON object mapping extra factor names to numbers.
///
/// Rows are split on every comma, so an unquoted JSON object with more than
/// one key arrives spread over columns 7.. instead of sitting in column 7
/// alone. When column 7 is not valid JSON by itself, the trailing columns are
/// re-joined with commas and parsed again. Extra factors stay best-effort:
/// JSON that still cannot be parsed yields an empty map, not an error.
///
/// # Errors
/// Propagates I/O errors from `read_rows` and `InvalidRow` for any missing
/// or malformed required column.
fn read_factors(path: &Path) -> Result<Vec<DailyFactorSnapshot>, DataSetError> {
    // Best-effort decoding of the extra-factor JSON starting at column 7.
    fn parse_extra_factors(fields: &[String]) -> BTreeMap<String, f64> {
        let Some(first) = fields
            .get(7)
            .map(|value| value.trim())
            .filter(|value| !value.is_empty())
        else {
            return BTreeMap::new();
        };
        if let Ok(parsed) = serde_json::from_str::<BTreeMap<String, f64>>(first) {
            return parsed;
        }
        // Column 7 alone is not valid JSON: it may be the first fragment of
        // an object that was cut at its internal commas.
        if fields.len() > 8 {
            let rejoined = fields[7..].join(",");
            if let Ok(parsed) = serde_json::from_str::<BTreeMap<String, f64>>(&rejoined) {
                return parsed;
            }
        }
        BTreeMap::new()
    }

    let rows = read_rows(path)?;
    let mut snapshots = Vec::with_capacity(rows.len());
    for row in rows {
        snapshots.push(DailyFactorSnapshot {
            date: row.parse_date(0)?,
            symbol: row.get(1)?.to_string(),
            market_cap_bn: row.parse_f64(2)?,
            free_float_cap_bn: row.parse_f64(3)?,
            pe_ttm: row.parse_f64(4)?,
            turnover_ratio: row.parse_optional_f64(5),
            effective_turnover_ratio: row.parse_optional_f64(6),
            extra_factors: parse_extra_factors(&row.fields),
        });
    }
    Ok(snapshots)
}
/// Resolves a factor name to its numeric value on `snapshot`.
///
/// Built-in names (and their aliases) map to the snapshot's own fields; any
/// other name is looked up verbatim in `extra_factors`. The name is matched
/// exactly as given, with no trimming or case folding.
fn factor_numeric_value(snapshot: &DailyFactorSnapshot, field: &str) -> Option<f64> {
    let value = match field {
        "market_cap" | "market_cap_bn" => snapshot.market_cap_bn,
        "free_float_cap" | "free_float_market_cap" | "free_float_cap_bn" => {
            snapshot.free_float_cap_bn
        }
        "pe_ttm" => snapshot.pe_ttm,
        // The remaining sources are already optional.
        "turnover_ratio" => return snapshot.turnover_ratio,
        "effective_turnover_ratio" => return snapshot.effective_turnover_ratio,
        custom => return snapshot.extra_factors.get(custom).copied(),
    };
    Some(value)
}
/// Resolves a field name to a numeric value of a daily market snapshot.
///
/// The name is passed through `normalize_field` before matching. Integer
/// volume fields are converted to `f64`. Unknown names yield `None`.
fn daily_market_numeric_value(snapshot: &DailyMarketSnapshot, field: &str) -> Option<f64> {
    let key = normalize_field(field);
    let value = match key.as_str() {
        "day_open" | "dayopen" => snapshot.day_open,
        "open" => snapshot.open,
        "high" => snapshot.high,
        "low" => snapshot.low,
        "close" | "price" => snapshot.close,
        "last" | "last_price" => snapshot.last_price,
        "prev_close" | "pre_close" => snapshot.prev_close,
        "volume" => snapshot.volume as f64,
        "tick_volume" => snapshot.tick_volume as f64,
        "bid1" => snapshot.bid1,
        "ask1" => snapshot.ask1,
        "bid1_volume" => snapshot.bid1_volume as f64,
        "ask1_volume" => snapshot.ask1_volume as f64,
        "upper_limit" => snapshot.upper_limit,
        "lower_limit" => snapshot.lower_limit,
        "price_tick" => snapshot.price_tick,
        _ => return None,
    };
    Some(value)
}
/// Resolves a field name to a numeric value of an intraday quote.
///
/// The name is passed through `normalize_field` before matching. All price
/// aliases (`last`, `last_price`, `close`, `price`) map to `last_price`;
/// `volume` and `amount` aliases map to the quote's delta fields. Unknown
/// names yield `None`.
fn intraday_quote_numeric_value(snapshot: &IntradayExecutionQuote, field: &str) -> Option<f64> {
    let key = normalize_field(field);
    let value = match key.as_str() {
        "last" | "last_price" | "close" | "price" => snapshot.last_price,
        "bid1" => snapshot.bid1,
        "ask1" => snapshot.ask1,
        "bid1_volume" => snapshot.bid1_volume as f64,
        "ask1_volume" => snapshot.ask1_volume as f64,
        "volume" | "volume_delta" => snapshot.volume_delta as f64,
        "amount" | "amount_delta" | "total_turnover" => snapshot.amount_delta,
        _ => return None,
    };
    Some(value)
}
/// Decides whether `quote` may be seen from the point of view of `date` /
/// `active_datetime`.
///
/// * Quotes dated before `date` are always visible; quotes dated after it
///   never are.
/// * For quotes on `date` itself: when `active_datetime` is absent or falls
///   on a different day, the answer is simply `include_now`. Otherwise the
///   quote is visible when its timestamp is before `active_datetime`, or
///   equal to it if `include_now` is set.
fn intraday_quote_visible(
    quote: &IntradayExecutionQuote,
    date: NaiveDate,
    active_datetime: Option<NaiveDateTime>,
    include_now: bool,
) -> bool {
    match quote.date.cmp(&date) {
        std::cmp::Ordering::Less => true,
        std::cmp::Ordering::Greater => false,
        std::cmp::Ordering::Equal => match active_datetime {
            Some(now) if now.date() == date => {
                if include_now {
                    quote.timestamp <= now
                } else {
                    quote.timestamp < now
                }
            }
            // No usable clock for this day: fall back to the flag.
            _ => include_now,
        },
    }
}
fn daily_market_price_bar(snapshot: &DailyMarketSnapshot) -> PriceBar {
PriceBar {
date: snapshot.date,
timestamp: snapshot.timestamp.clone(),
symbol: snapshot.symbol.clone(),
frequency: "1d".to_string(),
open: snapshot.open,
high: snapshot.high,
low: snapshot.low,
close: snapshot.close,
last_price: snapshot.last_price,
volume: snapshot.volume,
amount: 0.0,
bid1: snapshot.bid1,
ask1: snapshot.ask1,
bid1_volume: snapshot.bid1_volume,
ask1_volume: snapshot.ask1_volume,
}
}
/// Converts an intraday quote into a `PriceBar` with frequency `"tick"`.
///
/// A quote carries a single price, so open/high/low/close/last are all set
/// to `last_price`. Volume and amount come from the quote's delta fields and
/// the timestamp is rendered as `%Y-%m-%d %H:%M:%S`.
fn intraday_quote_price_bar(snapshot: &IntradayExecutionQuote) -> PriceBar {
    let price = snapshot.last_price;
    let stamp = snapshot.timestamp.format("%Y-%m-%d %H:%M:%S").to_string();
    PriceBar {
        date: snapshot.date,
        timestamp: Some(stamp),
        symbol: snapshot.symbol.clone(),
        frequency: String::from("tick"),
        open: price,
        high: price,
        low: price,
        close: price,
        last_price: price,
        volume: snapshot.volume_delta,
        amount: snapshot.amount_delta,
        bid1: snapshot.bid1,
        ask1: snapshot.ask1,
        bid1_volume: snapshot.bid1_volume,
        ask1_volume: snapshot.ask1_volume,
    }
}
/// Canonicalises a user-supplied field name for matching.
///
/// Strips any mix of surrounding whitespace, double quotes and single quotes
/// from both ends, then lower-cases ASCII letters. Stripping all three in one
/// pass means the nesting order of the quotes and whitespace inside the
/// quotes no longer matter: `'"High"'` and `" volume "` normalise to `high`
/// and `volume`. Characters in the interior of the name are left untouched.
fn normalize_field(field: &str) -> String {
    field
        .trim_matches(|ch: char| ch.is_whitespace() || ch == '"' || ch == '\'')
        .to_ascii_lowercase()
}
/// Maps a frequency alias to its canonical spelling: `"1d"`, `"1m"` or
/// `"tick"`.
///
/// The input is passed through `normalize_field` first. Unrecognised values
/// yield `None`.
fn normalize_history_frequency(frequency: &str) -> Option<String> {
    let key = normalize_field(frequency);
    let canonical = match key.as_str() {
        "1d" | "d" | "day" | "daily" => "1d",
        "1m" | "m" | "minute" | "min" => "1m",
        "tick" | "t" => "tick",
        _ => return None,
    };
    Some(canonical.to_string())
}
/// Keeps only the final `count` elements of `rows`, preserving their order.
///
/// Returns `rows` unchanged when it already holds `count` elements or fewer.
fn take_last<T>(mut rows: Vec<T>, count: usize) -> Vec<T> {
    let surplus = rows.len().saturating_sub(count);
    if surplus > 0 {
        // Drop the leading elements that fall outside the window.
        rows.drain(..surplus);
    }
    rows
}
/// Loads candidate-eligibility rows from the CSV file at `path`.
///
/// Columns 0-6 (date, symbol and five boolean flags) are required. Columns
/// 7-8 (`is_kcb`, `is_one_yuan`) are optional and default to `false`.
///
/// # Errors
/// Propagates I/O errors from `read_rows` and `InvalidRow` for any missing
/// or malformed required column.
fn read_candidates(path: &Path) -> Result<Vec<CandidateEligibility>, DataSetError> {
    read_rows(path)?
        .into_iter()
        .map(|row| -> Result<CandidateEligibility, DataSetError> {
            Ok(CandidateEligibility {
                date: row.parse_date(0)?,
                symbol: row.get(1)?.to_string(),
                is_st: row.parse_bool(2)?,
                is_new_listing: row.parse_bool(3)?,
                is_paused: row.parse_bool(4)?,
                allow_buy: row.parse_bool(5)?,
                allow_sell: row.parse_bool(6)?,
                is_kcb: row.parse_optional_bool(7).unwrap_or(false),
                is_one_yuan: row.parse_optional_bool(8).unwrap_or(false),
            })
        })
        .collect()
}
/// Loads benchmark snapshots from the CSV file at `path`.
///
/// All six columns (date, benchmark code, open, close, prev_close, volume)
/// are required.
///
/// # Errors
/// Propagates I/O errors from `read_rows` and `InvalidRow` for any missing
/// or malformed column.
fn read_benchmarks(path: &Path) -> Result<Vec<BenchmarkSnapshot>, DataSetError> {
    read_rows(path)?
        .into_iter()
        .map(|row| -> Result<BenchmarkSnapshot, DataSetError> {
            Ok(BenchmarkSnapshot {
                date: row.parse_date(0)?,
                benchmark: row.get(1)?.to_string(),
                open: row.parse_f64(2)?,
                close: row.parse_f64(3)?,
                prev_close: row.parse_f64(4)?,
                volume: row.parse_u64(5)?,
            })
        })
        .collect()
}
/// Loads corporate actions from the CSV file at `path`.
///
/// Two column layouts are accepted. A row with at least 10 fields is read as
/// carrying an optional payable date in column 2, which shifts every later
/// column right by one. Shorter rows have no payable date and no shift. All
/// numeric columns are optional: amounts default to `0.0`, `reform` to
/// `false`, and the adjust factor and successor fields to `None`.
///
/// # Errors
/// Propagates I/O errors from `read_rows` and `InvalidRow` for a missing or
/// malformed date/symbol or a malformed payable date.
fn read_corporate_actions(path: &Path) -> Result<Vec<CorporateAction>, DataSetError> {
    read_rows(path)?
        .into_iter()
        .map(|row| -> Result<CorporateAction, DataSetError> {
            // The field count alone decides which layout the row uses.
            let (payable_date, offset) = if row.fields.len() >= 10 {
                (row.parse_optional_date(2)?, 1usize)
            } else {
                (None, 0usize)
            };
            // Optional number at a column index given in the unshifted layout.
            let number = |base: usize| row.parse_optional_f64(base + offset);

            let date = row.parse_date(0)?;
            let symbol = row.get(1)?.to_string();
            let successor_symbol = row
                .fields
                .get(9 + offset)
                .map(|value| value.trim().to_string())
                .filter(|value| !value.is_empty());

            Ok(CorporateAction {
                date,
                symbol,
                payable_date,
                share_cash: number(2).unwrap_or(0.0),
                share_bonus: number(3).unwrap_or(0.0),
                share_gift: number(4).unwrap_or(0.0),
                issue_quantity: number(5).unwrap_or(0.0),
                issue_price: number(6).unwrap_or(0.0),
                reform: row.parse_optional_bool(7 + offset).unwrap_or(false),
                adjust_factor: number(8),
                successor_symbol,
                successor_ratio: number(10),
                successor_cash: number(11),
            })
        })
        .collect()
}
/// Loads intraday execution quotes from the CSV file at `path`.
///
/// Columns 0-2 (date, symbol, timestamp) are required. The numeric columns
/// 3-9 are optional and default to zero; column 10 (trading phase) is
/// optional text, with blank treated as absent.
///
/// # Errors
/// Propagates I/O errors from `read_rows` and `InvalidRow` for a missing or
/// malformed date, symbol or timestamp.
fn read_execution_quotes(path: &Path) -> Result<Vec<IntradayExecutionQuote>, DataSetError> {
    read_rows(path)?
        .into_iter()
        .map(|row| -> Result<IntradayExecutionQuote, DataSetError> {
            let date = row.parse_date(0)?;
            let symbol = row.get(1)?.to_string();
            let timestamp = row.parse_datetime(2)?;
            let trading_phase = row
                .fields
                .get(10)
                .map(|value| value.trim().to_string())
                .filter(|value| !value.is_empty());
            Ok(IntradayExecutionQuote {
                date,
                symbol,
                timestamp,
                last_price: row.parse_optional_f64(3).unwrap_or_default(),
                bid1: row.parse_optional_f64(4).unwrap_or_default(),
                ask1: row.parse_optional_f64(5).unwrap_or_default(),
                bid1_volume: row.parse_optional_u64(6).unwrap_or_default(),
                ask1_volume: row.parse_optional_u64(7).unwrap_or_default(),
                volume_delta: row.parse_optional_u64(8).unwrap_or_default(),
                amount_delta: row.parse_optional_f64(9).unwrap_or_default(),
                trading_phase,
            })
        })
        .collect()
}
/// One data line of a CSV file, split into fields, together with where it
/// came from so parse failures can be reported precisely.
struct CsvRow {
    /// Display form of the source file path; copied into
    /// `DataSetError::InvalidRow`.
    path: String,
    /// 1-based line number within the file (the skipped header is line 1).
    line: usize,
    /// Field values in column order, as produced by `read_rows`.
    fields: Vec<String>,
}
/// Typed accessors over the fields of a [`CsvRow`].
///
/// Required accessors (`get`, `parse_date`, `parse_f64`, `parse_u64`,
/// `parse_bool`, `parse_datetime`) fail with `DataSetError::InvalidRow` when
/// the column is missing or malformed. The `parse_optional_*` accessors treat
/// a missing or blank column as absent. Apart from `parse_optional_date`,
/// which reports a malformed value as an error, they also map an unparsable
/// value to `None`.
impl CsvRow {
    /// Builds the `InvalidRow` error for this row with the given message.
    fn invalid_row(&self, message: String) -> DataSetError {
        DataSetError::InvalidRow {
            path: self.path.clone(),
            line: self.line,
            message,
        }
    }

    /// Trimmed text of column `index`, or `None` when the column is absent
    /// or blank. Shared by every optional accessor so they agree on what
    /// "absent" means.
    fn non_blank(&self, index: usize) -> Option<&str> {
        self.fields
            .get(index)
            .map(|value| value.trim())
            .filter(|value| !value.is_empty())
    }

    /// Raw text of column `index`; errors with "missing column" if absent.
    fn get(&self, index: usize) -> Result<&str, DataSetError> {
        self.fields
            .get(index)
            .map(String::as_str)
            .ok_or_else(|| self.invalid_row(format!("missing column {index}")))
    }

    /// Required `%Y-%m-%d` date.
    fn parse_date(&self, index: usize) -> Result<NaiveDate, DataSetError> {
        NaiveDate::parse_from_str(self.get(index)?, "%Y-%m-%d")
            .map_err(|err| self.invalid_row(format!("invalid date: {err}")))
    }

    /// Required floating-point number.
    fn parse_f64(&self, index: usize) -> Result<f64, DataSetError> {
        self.get(index)?
            .parse::<f64>()
            .map_err(|err| self.invalid_row(format!("invalid f64: {err}")))
    }

    /// Required unsigned integer.
    fn parse_u64(&self, index: usize) -> Result<u64, DataSetError> {
        self.get(index)?
            .parse::<u64>()
            .map_err(|err| self.invalid_row(format!("invalid u64: {err}")))
    }

    /// Optional floating-point number; unparsable text yields `None`.
    fn parse_optional_f64(&self, index: usize) -> Option<f64> {
        self.non_blank(index)
            .and_then(|value| value.parse::<f64>().ok())
    }

    /// Required boolean; only the literals `true` and `false` are accepted.
    fn parse_bool(&self, index: usize) -> Result<bool, DataSetError> {
        self.get(index)?
            .parse::<bool>()
            .map_err(|err| self.invalid_row(format!("invalid bool: {err}")))
    }

    /// Optional boolean; unparsable text yields `None`.
    ///
    /// The value is trimmed first, like every other optional accessor.
    /// `bool::from_str` rejects surrounding whitespace, so without the trim a
    /// padded `true` would silently turn into the caller's default.
    fn parse_optional_bool(&self, index: usize) -> Option<bool> {
        self.non_blank(index)
            .and_then(|value| value.parse::<bool>().ok())
    }

    /// Optional `%Y-%m-%d` date. Missing or blank is `Ok(None)`; present but
    /// malformed is an error.
    fn parse_optional_date(&self, index: usize) -> Result<Option<NaiveDate>, DataSetError> {
        match self.non_blank(index) {
            None => Ok(None),
            Some(text) => NaiveDate::parse_from_str(text, "%Y-%m-%d")
                .map(Some)
                .map_err(|err| self.invalid_row(format!("invalid optional date: {err}"))),
        }
    }

    /// Required `%Y-%m-%d %H:%M:%S` timestamp.
    fn parse_datetime(&self, index: usize) -> Result<NaiveDateTime, DataSetError> {
        NaiveDateTime::parse_from_str(self.get(index)?, "%Y-%m-%d %H:%M:%S")
            .map_err(|err| self.invalid_row(format!("invalid datetime: {err}")))
    }

    /// Optional 32-bit unsigned integer; unparsable text yields `None`.
    fn parse_optional_u32(&self, index: usize) -> Option<u32> {
        self.non_blank(index)
            .and_then(|value| value.parse::<u32>().ok())
    }

    /// Optional 64-bit unsigned integer; unparsable text yields `None`.
    fn parse_optional_u64(&self, index: usize) -> Option<u64> {
        self.non_blank(index)
            .and_then(|value| value.parse::<u64>().ok())
    }
}
fn read_partitioned_dir<T, F>(dir: &Path, mut loader: F) -> Result<Vec<T>, DataSetError>
where
F: FnMut(&Path) -> Result<Vec<T>, DataSetError>,
{
let mut rows = Vec::new();
let mut stack = vec![dir.to_path_buf()];
while let Some(current_dir) = stack.pop() {
let mut entries = fs::read_dir(&current_dir)
.map_err(|source| DataSetError::Io {
path: current_dir.display().to_string(),
source,
})?
.collect::<Result<Vec<_>, _>>()
.map_err(|source| DataSetError::Io {
path: current_dir.display().to_string(),
source,
})?;
entries.sort_by_key(|entry| entry.path());
for entry in entries.into_iter().rev() {
let path = entry.path();
if path.is_dir() {
stack.push(path);
continue;
}
if path.extension().and_then(|x| x.to_str()) != Some("csv") {
continue;
}
rows.extend(loader(&path)?);
}
}
Ok(rows)
}
/// Reads the CSV file at `path` into rows of trimmed fields.
///
/// The first line is treated as a header and skipped, as are blank lines.
/// Each remaining line becomes one [`CsvRow`] carrying its 1-based line
/// number.
///
/// Fields are separated by commas. A field whose first non-blank character
/// is `"` is read as a quoted field: commas inside it do not split the
/// field, and a doubled `""` stands for one literal quote. This keeps values
/// such as quoted names or quoted JSON in a single column. A quote that
/// appears in the middle of an unquoted field is kept literally, so lines
/// without quoted fields split exactly as a plain comma split would.
///
/// Limitation: records are line-based, so a quoted field cannot span
/// multiple lines.
///
/// # Errors
/// Returns `DataSetError::Io` when the file cannot be read.
fn read_rows(path: &Path) -> Result<Vec<CsvRow>, DataSetError> {
    // Splits one record into trimmed fields, honouring double-quoted fields.
    fn split_fields(line: &str) -> Vec<String> {
        let mut fields = Vec::new();
        let mut current = String::new();
        let mut in_quotes = false;
        let mut chars = line.chars().peekable();
        while let Some(ch) = chars.next() {
            match ch {
                '"' if in_quotes => {
                    if chars.peek() == Some(&'"') {
                        // Escaped quote inside a quoted field.
                        current.push('"');
                        chars.next();
                    } else {
                        in_quotes = false;
                    }
                }
                // An opening quote only counts at the start of a field.
                '"' if current.trim().is_empty() => {
                    current.clear();
                    in_quotes = true;
                }
                ',' if !in_quotes => {
                    fields.push(current.trim().to_string());
                    current.clear();
                }
                other => current.push(other),
            }
        }
        fields.push(current.trim().to_string());
        fields
    }

    let content = fs::read_to_string(path).map_err(|source| DataSetError::Io {
        path: path.display().to_string(),
        source,
    })?;
    let source_path = path.display().to_string();
    let rows = content
        .lines()
        .enumerate()
        // Skip the header (first line) and blank lines.
        .filter(|(index, line)| *index != 0 && !line.trim().is_empty())
        .map(|(index, line)| CsvRow {
            path: source_path.clone(),
            line: index + 1,
            fields: split_fields(line),
        })
        .collect();
    Ok(rows)
}
/// Buckets `rows` by the date that `date_of` extracts from each row.
///
/// Rows keep their input order within each bucket.
fn group_by_date<T, F>(rows: Vec<T>, mut date_of: F) -> BTreeMap<NaiveDate, Vec<T>>
where
    F: FnMut(&T) -> NaiveDate,
{
    rows.into_iter().fold(BTreeMap::new(), |mut buckets, row| {
        let date = date_of(&row);
        buckets.entry(date).or_insert_with(Vec::new).push(row);
        buckets
    })
}
/// Returns the single benchmark code shared by all `benchmarks`.
///
/// # Errors
/// Returns `DataSetError::MultipleBenchmarks` unless exactly one distinct
/// code is present. An empty slice also produces this error.
fn collect_benchmark_code(benchmarks: &[BenchmarkSnapshot]) -> Result<String, DataSetError> {
    let mut codes = benchmarks.iter().map(|row| row.benchmark.as_str());
    let Some(first) = codes.next() else {
        return Err(DataSetError::MultipleBenchmarks);
    };
    // Exactly one distinct code means every remaining row matches the first.
    if codes.all(|code| code == first) {
        Ok(first.to_string())
    } else {
        Err(DataSetError::MultipleBenchmarks)
    }
}
/// Rounds `value` to the nearest multiple of `tick`, then to four decimal
/// places to clear floating-point residue.
///
/// A `tick` that is not finite or not strictly positive is replaced by
/// `0.01`.
fn round_price_to_tick(value: f64, tick: f64) -> f64 {
    const SCALE: f64 = 10000.0;
    let step = match tick {
        usable if usable.is_finite() && usable > 0.0 => usable,
        _ => 0.01,
    };
    let snapped = (value / step).round() * step;
    (snapped * SCALE).round() / SCALE
}
/// Running totals of `values`, with a leading `0.0`.
///
/// The result has `values.len() + 1` entries; entry `i` is the sum of the
/// first `i` values, accumulated left to right.
fn prefix_sums(values: &[f64]) -> Vec<f64> {
    let running = values.iter().scan(0.0_f64, |total, value| {
        *total += *value;
        Some(*total)
    });
    std::iter::once(0.0).chain(running).collect()
}
/// Serde helpers for `Option<NaiveDate>` fields stored as `%Y-%m-%d` text.
///
/// `None` is written as a serde "none". On input, a missing value and an
/// empty or whitespace-only string both deserialize to `None`.
mod optional_date_format {
    use chrono::NaiveDate;
    use serde::{self, Deserialize, Deserializer, Serializer};

    const FORMAT: &str = "%Y-%m-%d";

    /// Writes `Some(date)` as its formatted string and `None` as none.
    pub fn serialize<S>(date: &Option<NaiveDate>, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let Some(date) = date else {
            return serializer.serialize_none();
        };
        let text = date.format(FORMAT).to_string();
        serializer.serialize_some(&text)
    }

    /// Reads an optional string and parses it as a date when non-blank.
    pub fn deserialize<'de, D>(deserializer: D) -> Result<Option<NaiveDate>, D::Error>
    where
        D: Deserializer<'de>,
    {
        let raw = Option::<String>::deserialize(deserializer)?;
        let Some(text) = raw
            .as_deref()
            .map(str::trim)
            .filter(|value| !value.is_empty())
        else {
            return Ok(None);
        };
        match NaiveDate::parse_from_str(text, FORMAT) {
            Ok(date) => Ok(Some(date)),
            Err(err) => Err(<D::Error as serde::de::Error>::custom(err)),
        }
    }
}
/// Builds one `SymbolPriceSeries` per symbol from the per-date snapshots.
///
/// Snapshots are visited in ascending date order (the map's iteration
/// order), so each symbol's rows reach `SymbolPriceSeries::new` in that
/// order.
fn build_market_series(
    market_by_date: &BTreeMap<NaiveDate, Vec<DailyMarketSnapshot>>,
) -> HashMap<String, SymbolPriceSeries> {
    let mut by_symbol: HashMap<String, Vec<DailyMarketSnapshot>> = HashMap::new();
    for snapshot in market_by_date.values().flatten() {
        by_symbol
            .entry(snapshot.symbol.clone())
            .or_default()
            .push(snapshot.clone());
    }

    let mut series = HashMap::with_capacity(by_symbol.len());
    for (symbol, rows) in by_symbol {
        series.insert(symbol, SymbolPriceSeries::new(&rows));
    }
    series
}
/// Indexes intraday quotes by `(date, symbol)`.
///
/// Each bucket is sorted by timestamp with a stable sort, so quotes sharing
/// a timestamp keep their input order.
fn build_execution_quote_index(
    execution_quotes: Vec<IntradayExecutionQuote>,
) -> HashMap<(NaiveDate, String), Vec<IntradayExecutionQuote>> {
    let mut index: HashMap<(NaiveDate, String), Vec<IntradayExecutionQuote>> = HashMap::new();
    for quote in execution_quotes {
        let key = (quote.date, quote.symbol.clone());
        index.entry(key).or_default().push(quote);
    }
    index
        .values_mut()
        .for_each(|bucket| bucket.sort_by(|left, right| left.timestamp.cmp(&right.timestamp)));
    index
}
/// Precomputes, for every date with factor data, the symbols eligible for
/// selection.
///
/// A factor row is kept only when all of the following hold:
/// * its market cap is finite and strictly positive;
/// * a candidate record and a market snapshot exist for the same
///   `(date, symbol)`;
/// * the candidate reports `eligible_for_selection()` and the market
///   snapshot is not paused.
///
/// Each date's rows are sorted by ascending market cap, with ties broken by
/// symbol. Dates with no surviving rows still get an (empty) entry.
fn build_eligible_universe(
    factor_by_date: &BTreeMap<NaiveDate, Vec<DailyFactorSnapshot>>,
    candidate_index: &HashMap<(NaiveDate, String), CandidateEligibility>,
    market_index: &HashMap<(NaiveDate, String), DailyMarketSnapshot>,
) -> BTreeMap<NaiveDate, Vec<EligibleUniverseSnapshot>> {
    factor_by_date
        .iter()
        .map(|(date, factors)| {
            let mut eligible: Vec<EligibleUniverseSnapshot> = factors
                .iter()
                .filter(|factor| factor.market_cap_bn.is_finite() && factor.market_cap_bn > 0.0)
                .filter_map(|factor| {
                    let key = (*date, factor.symbol.clone());
                    let candidate = candidate_index.get(&key)?;
                    let market = market_index.get(&key)?;
                    if !candidate.eligible_for_selection() || market.paused {
                        return None;
                    }
                    Some(EligibleUniverseSnapshot {
                        symbol: factor.symbol.clone(),
                        market_cap_bn: factor.market_cap_bn,
                        free_float_cap_bn: factor.free_float_cap_bn,
                    })
                })
                .collect();
            eligible.sort_by(|left, right| {
                match left.market_cap_bn.partial_cmp(&right.market_cap_bn) {
                    // Equal (or incomparable) caps fall back to symbol order.
                    Some(std::cmp::Ordering::Equal) | None => left.symbol.cmp(&right.symbol),
                    Some(order) => order,
                }
            });
            (*date, eligible)
        })
        .collect()
}