Improve jq microcap execution semantics

This commit is contained in:
boris
2026-04-18 18:02:50 +08:00
parent 9f4165e689
commit 0e2c25e4c4
26 changed files with 5058 additions and 362 deletions

654
聚宽微盘股策略.py Normal file
View File

@@ -0,0 +1,654 @@
'''
设定的市值17—26亿 可以根据指数的变化来更改
比如3300点集中在15—25亿市值最小的四十只 3400点 集中在17—27亿
对应指数乘以一个系数 对应市值选出40只
'''
from jqdata import *
from datetime import datetime, timedelta
## 初始化函数,设定要操作的股票、基准等等
def initialize(context):
set_benchmark('000852.XSHG') #对标中证1000
# True为开启动态复权模式使用真实价格交易
set_option('use_real_price', True)
# 设定成交量比例
set_option('order_volume_ratio', 1)
# 股票类交易手续费是:买入时佣金万分之三,卖出时佣金万分之三加千分之一印花税, 每笔交易佣金最低扣5块钱
set_order_cost(OrderCost(open_tax=0, close_tax=0.001, \
open_commission=0.0003, close_commission=0.0003,\
close_today_commission=0, min_commission=5), type='stock')
# 交易日计时器
g.days = 0
# 分仓常量参数,无须人为修改
g.TR = 1;
# 区间最高价
g.summit = {}
g.muster = []
# 运行状态 1/运行; 0/停运
g.OpenYN = 1
# 开始范围
#g.mystart = 13
# 截至范围
#g.myend = 23
# 调仓频率
g.refresh_rate = 15
# 运行函数1
run_daily(trade, time='10:18')
# 运行函数2
run_daily(CPtrade, time='10:17')
# 持仓数量
g.stocknum = 40
# 上证指数对应系数
g.XS = 4/500
#止盈比率
g.CloseRate = 1.07
#止损比率
g.LossRate = 0.93
# 均线上涨比率
g.RSIRate = 1.0001
# 保证金仓位比率: 1/2为半仓
g.TradeRate = 0.5
# 调试日志展示数量
g.debug_log_limit = 60
g.debug_boundary_window = 5
g.market_cap_map = {}
def _fmt_num(value, digits=2):
try:
return ('%.' + str(digits) + 'f') % float(value)
except Exception:
return str(value)
def _safe_bar_values(bar_data, field):
try:
values = bar_data[field]
except Exception:
values = getattr(bar_data, field)
try:
return [float(x) for x in list(values)]
except Exception:
try:
return list(values)
except Exception:
return []
def _safe_bar_last(bar_data, field):
values = _safe_bar_values(bar_data, field)
if values:
return values[-1]
return None
def _market_cap_text(stock):
cap = getattr(g, 'market_cap_map', {}).get(stock)
if cap is None:
return 'None'
return _fmt_num(cap, 2) + '亿'
def _describe_stock(stock, extras=None):
parts = [stock, '市值=' + _market_cap_text(stock)]
if extras:
parts.extend(extras)
return ','.join(parts)
def _log_detail_items(label, details, limit=None, chunk=10):
total = len(details)
if total == 0:
log.info(label + ': 0')
return
if limit is None:
limit = total
shown = details[:limit]
log.info('%s: 总数=%d, 展示=%d' % (label, total, len(shown)))
for idx in range(0, len(shown), chunk):
part = shown[idx:idx + chunk]
log.info('%s[%d-%d]: %s' % (
label,
idx + 1,
idx + len(part),
' | '.join(part)
))
if total > limit:
log.info('%s: 其余%d项省略' % (label, total - limit))
def _log_market_cap_snapshot(df, caller):
if df is None or len(df) == 0:
log.info('市值筛选快照[%s]: 无数据' % caller)
return
top_n = min(len(df), g.debug_log_limit)
top_details = []
for i in range(top_n):
row = df.iloc[i]
top_details.append('%d.%s:%s亿' % (
i + 1,
row['code'],
_fmt_num(row['market_cap'], 2)
))
_log_detail_items('市值排序前段[%s]' % caller, top_details, limit=top_n, chunk=8)
start_idx = max(0, g.stocknum - g.debug_boundary_window - 1)
end_idx = min(len(df), g.stocknum + g.debug_boundary_window)
boundary_details = []
for i in range(start_idx, end_idx):
row = df.iloc[i]
boundary_details.append('%d.%s:%s亿' % (
i + 1,
row['code'],
_fmt_num(row['market_cap'], 2)
))
_log_detail_items('市值排序边界[%s]' % caller, boundary_details, limit=len(boundary_details), chunk=5)
## 选出小市值股票
def check_stocks(context, caller='unknown'):
g.today = context.current_dt
validate_date()
#不停运参数
g.OpenYN = 1
log.info('选股开始[%s]: 日期=%s, g.days=%d, refresh_rate=%d, 持仓数=%d' % (
caller,
context.current_dt.strftime('%Y-%m-%d %H:%M:%S'),
g.days,
g.refresh_rate,
len(context.portfolio.positions)
))
# 检查日期是否在范围内
if g.OpenYN == 0:
log.warn("该时段属于停运==================范围")
return []
else:
# g.OpenYN = 1
g.security = '000001.XSHG'
#000852.XSHG
close_data = get_bars(g.security, count=1, unit='1d', fields=['close'])
# 取得过去五天的平均价格
MA5 = close_data['close'].mean()
###################################################
close5_data = get_bars(g.security, count=5, unit='1d', fields=['close'])
# 获取股票的收盘价
close10_data = get_bars(g.security, count=10, unit='1d', fields=['close'])
# 取得过去五天的平均价格
MA5 = close5_data['close'].mean()
# 取得过去十天的平均价格
MA10 = close10_data['close'].mean()
#5日线下穿,则半仓交易
if MA5 < MA10*g.RSIRate:
g.TR = g.TradeRate
elif MA5 >= MA10*g.RSIRate:
g.TR = 1
###################################################
close5_list = _safe_bar_values(close5_data, 'close')
log.info('指数均线调试[%s][%s] close[-5:]=%s, ma5=%s, ma10=%s, ma5<ma10*RSIRate=%s, TR=%s' % (
caller,
context.current_dt.strftime('%Y-%m-%d'),
str([round(float(v), 2) for v in close5_list]),
_fmt_num(MA5, 4),
_fmt_num(MA10, 4),
str(MA5 < MA10 * g.RSIRate),
_fmt_num(g.TR, 4)
))
# 取得上一时间点价格
current_price = close_data['close'][-1]
log.info('中证指数(current_price)'+str(current_price))
if current_price == 2000:
g.mystart = 7
g.myend = 17
elif current_price > 0:
#Y = (current_price - 3000) *g.XS + 14
Y = (current_price - 2000) *g.XS + 7
g.mystart = Y
g.myend = Y + 10
# mystart = g.start
# myend = g.end
mystart = round(g.mystart)
myend = round(g.myend)
log.info('价格区间为:'+ str(mystart) + '~'+str(myend))
# 设定查询条件
q = query(
valuation.code,
valuation.market_cap
).filter(
valuation.market_cap.between(mystart,myend)
).order_by(
valuation.market_cap.asc()
)
# 选出低市值的股票构成buylist
df = get_fundamentals(q)
g.market_cap_map = {}
if df is not None and len(df) > 0:
for _, row in df.iterrows():
try:
g.market_cap_map[row['code']] = float(row['market_cap'])
except Exception:
g.market_cap_map[row['code']] = None
log.info('市值筛选结果[%s]: %d只股票' % (caller, 0 if df is None else len(df)))
_log_market_cap_snapshot(df, caller)
buylist =list(df['code'])
# 过滤停牌ST科创新股1元股
buylist = filter_paused_stock(buylist, caller=caller)
final_list = buylist[:g.stocknum]
g.muster = final_list
_log_detail_items(
'最终选股[%s]' % caller,
[_describe_stock(stock) for stock in final_list],
limit=len(final_list),
chunk=8
)
boundary_slice = buylist[max(0, g.stocknum - g.debug_boundary_window): min(len(buylist), g.stocknum + g.debug_boundary_window)]
_log_detail_items(
'最终选股边界[%s]' % caller,
[_describe_stock(stock) for stock in boundary_slice],
limit=len(boundary_slice),
chunk=5
)
return final_list
def before_trading_start(context):
# 取得当前日期
g.todayDT = context.current_dt
g.today = context.current_dt.strftime('%Y-%m-%d')
g.start = context.current_dt + timedelta(-2)
g.market_cap_map = {}
## 交易函数
def CPtrade(context):
## 选股
stock_list = check_stocks(context, caller='CPtrade')
if g.OpenYN == 0:
#log.warn("日期属于范围")
## 获取持仓列表
sell_list = list(context.portfolio.positions.keys())
# 如果有持仓,则卖出
if len(sell_list) > 0 :
for stock in sell_list:
order_target_value(stock, 0)
return
## 交易函数
def trade(context):
## 选股
stock_list = check_stocks(context, caller='trade')
if g.OpenYN == 0:
#log.warn("日期属于范围")
## 获取持仓列表
sell_list = list(context.portfolio.positions.keys())
# 如果有持仓,则卖出
if len(sell_list) > 0 :
for stock in sell_list:
order_target_value(stock, 0)
return
g.changeYN = 0
curr_data = get_current_data()
log.info('交易调试[trade]: 日期=%s, g.days=%d, refresh_hit=%s, TR=%s, cash=%s, total_value=%s, 持仓数=%d, 目标池数=%d' % (
context.current_dt.strftime('%Y-%m-%d %H:%M:%S'),
g.days,
str(g.days % g.refresh_rate == 0),
_fmt_num(g.TR, 4),
_fmt_num(context.portfolio.cash, 2),
_fmt_num(context.portfolio.total_value, 2),
len(context.portfolio.positions),
len(stock_list)
))
_log_detail_items(
'交易目标池[trade]',
[_describe_stock(stock) for stock in stock_list],
limit=len(stock_list),
chunk=8
)
# --------------------------------------------------------------------------
for stockPos in context.portfolio.positions:
# SS、记录股票峰值信息
# if g.summit.get(stock, 0)<data[stock].high: g.summit[stock]=data[stock].high
# S0、取得最近几天股票价格信息
grid = get_price(stockPos, start_date=g.start, end_date=g.today, fields=['open', 'high', 'low', 'close', 'high_limit', 'paused'])
# SP、跳过退市、停牌、无效数据
# if grid.paused[-1]: continue
# S1、目前持仓不在预选股票池中(g.muster)则清仓
# if stock not in g.muster:
# order_target(stock,0)
# log.info("市值清仓:%s" % (stock))
# S2、回撤10%则清仓
hold = context.portfolio.positions[stockPos]
current_price = curr_data[stockPos].last_price
avg_cost = hold.avg_cost
return_ratio = current_price / avg_cost - 1 if avg_cost else 0
grid_high_limit = _safe_bar_last(grid, 'high_limit')
grid_close = _safe_bar_last(grid, 'close')
grid_paused = _safe_bar_last(grid, 'paused')
stop_hit = (current_price / avg_cost < g.LossRate) if avg_cost else False
profit_hit = (current_price < grid_high_limit and current_price / avg_cost > g.CloseRate) if (avg_cost and grid_high_limit is not None) else False
log.info('止盈止损评估[%s] %s: qty=%s, avg_cost=%s, current=%s, return=%s%%, stop_threshold=%s, profit_threshold=%s, high_limit=%s, bar.close=%s, paused=%s, in_target=%s, stop_hit=%s, profit_hit=%s' % (
context.current_dt.strftime('%Y-%m-%d'),
stockPos,
str(getattr(hold, 'total_amount', getattr(hold, 'amount', 'None'))),
_fmt_num(avg_cost, 4),
_fmt_num(current_price, 4),
_fmt_num(return_ratio * 100, 2),
_fmt_num(g.LossRate, 4),
_fmt_num(g.CloseRate, 4),
_fmt_num(grid_high_limit, 4),
_fmt_num(grid_close, 4),
str(grid_paused),
str(stockPos in stock_list),
str(stop_hit),
str(profit_hit)
))
if stop_hit:
order_target_value(stockPos, 0)
g.changeYN = 1
log.error('止损清仓:%s,当前价=%.2f,成本价=%.2f,收益率=%.2f%%,high_limit=%s,bar.close=%s' % (
stockPos,
current_price,
hold.avg_cost,
return_ratio * 100,
_fmt_num(grid_high_limit, 4),
_fmt_num(grid_close, 4)
))
# S3、今日高开、今日未涨停则清仓
elif profit_hit:
order_target_value(stockPos, 0)
g.changeYN = 1
log.warn('止盈清仓:%s,当前价=%s,成本价=%s,收益率=%s%%,high_limit=%s,bar.close=%s' % (
stockPos,
_fmt_num(current_price, 4),
_fmt_num(hold.avg_cost, 4),
_fmt_num(return_ratio * 100, 2),
_fmt_num(grid_high_limit, 4),
_fmt_num(grid_close, 4)
))
else:
g.changeYN = 0
if g.changeYN == 1:
sell_list = list(context.portfolio.positions.keys())
replacement_candidates = [stock for stock in stock_list if stockPos != stock and stock not in sell_list]
_log_detail_items(
'补仓候选[%s][%s]' % (context.current_dt.strftime('%Y-%m-%d'), stockPos),
[_describe_stock(stock) for stock in replacement_candidates],
limit=min(len(replacement_candidates), g.debug_log_limit),
chunk=6
)
# log.warn('portfoliocash' + str(context.portfolio.cash))
for stock in stock_list:
if len(context.portfolio.positions.keys()) < g.stocknum:
## 获取持仓列表
sell_list = list(context.portfolio.positions.keys())
if stockPos != stock and stock not in sell_list :
## 分配资金
if len(context.portfolio.positions) < g.stocknum :
Num = g.stocknum - len(context.portfolio.positions)
Cash = context.portfolio.cash * g.TR / Num
#gridbuy = get_price(stock, start_date=g.start, end_date=g.today, fields=['open', 'high', 'low_limit', 'close', 'high_limit'])
#if gridbuy.open[-1] > gridbuy.low_limit[-1]:
log.info('补仓买入:卖出=%s,买入=%s,Cash=%s,Num=%d,TR=%s,持仓数=%d' % (
stockPos,
stock,
_fmt_num(Cash, 2),
Num,
_fmt_num(g.TR, 4),
len(context.portfolio.positions)
))
order_value(stock, Cash)
#else :
# log.warn("忽略跌停股票:" + stock)
break
else:
continue
if g.days%g.refresh_rate == 0:
log.info('定期调仓触发:日期=%s,g.days=%d,refresh_rate=%d,TR=%s' % (
context.current_dt.strftime('%Y-%m-%d'),
g.days,
g.refresh_rate,
_fmt_num(g.TR, 4)
))
## 获取持仓列表
sell_list = list(context.portfolio.positions.keys())
rebalance_sell_list = [stock for stock in sell_list if stock not in stock_list]
_log_detail_items(
'定期调仓卖出名单',
[_describe_stock(stock) for stock in rebalance_sell_list],
limit=len(rebalance_sell_list),
chunk=8
)
# 如果有持仓,则卖出
if len(sell_list) > 0 :
for stock in sell_list:
if stock not in stock_list:
log.info('定期调仓卖出:%s' % stock)
order_target_value(stock, 0)
## 分配资金
if len(context.portfolio.positions) < g.stocknum :
Num = g.stocknum - len(context.portfolio.positions)
# Cash = context.portfolio.cash/Num * g.TR
Cash = context.portfolio.cash * g.TR / g.stocknum
else:
Cash = 0
log.info('定期调仓资金分配Cash=%s,Num=%s,cash=%s,total_value=%s' % (
_fmt_num(Cash, 2),
str(Num if len(context.portfolio.positions) < g.stocknum else 0),
_fmt_num(context.portfolio.cash, 2),
_fmt_num(context.portfolio.total_value, 2)
))
## 买入股票
for stock in stock_list:
if len(context.portfolio.positions.keys()) < g.stocknum:
if stock not in sell_list:
log.info('定期调仓买入:%s,Cash=%s' % (stock, _fmt_num(Cash, 2)))
order_value(stock, Cash)
# 天计数加一
g.days = 1
else:
g.days += 1
# 过滤日期
# 一月十号到三十一号
# 四月十号到四月二十九日 期间
# 八月十日到八月三十一日
# 十月二十日 到十月三十日
def validate_date():
# date = g.todayDT.strftime('%Y-%m-%d')
# date = datetime.strptime(g.todayDT, "%Y-%m-%d")
date = g.today
# 检查是否是4月10日到4月29日之间
if date.month == 1 and 15 <= date.day <= 30:
g.OpenYN = 0
elif date.month == 4 and 15 <= date.day <= 29:
g.OpenYN = 0
elif date.month == 8 and 15 <= date.day <= 31:
g.OpenYN = 0
elif date.month == 10 and 20 <= date.day <= 30:
g.OpenYN = 0
elif date.month == 12 and 20 <= date.day <= 30:
g.OpenYN = 0
else :
g.OpenYN = 1
# 过滤停牌股票
def filter_paused_stock(stock_list, caller='unknown'):
curr_data = get_current_data()
raw_count = len(stock_list)
risk_filtered = []
risk_removed = []
for stock in stock_list:
reasons = []
if curr_data[stock].day_open == curr_data[stock].high_limit:
reasons.append('涨停开盘')
if curr_data[stock].day_open == curr_data[stock].low_limit:
reasons.append('跌停开盘')
if curr_data[stock].last_price == curr_data[stock].high_limit:
reasons.append('当前涨停')
if curr_data[stock].last_price == curr_data[stock].low_limit:
reasons.append('当前跌停')
if curr_data[stock].paused:
reasons.append('停牌')
if curr_data[stock].is_st:
reasons.append('ST')
if 'ST' in curr_data[stock].name:
reasons.append('名称含ST')
if '*' in curr_data[stock].name:
reasons.append('名称含*')
if '退' in curr_data[stock].name:
reasons.append('名称含退')
if stock.startswith('688'):
reasons.append('科创板')
if reasons:
risk_removed.append(_describe_stock(stock, [
'原因=' + '/'.join(reasons),
'open=' + _fmt_num(curr_data[stock].day_open, 2),
'last=' + _fmt_num(curr_data[stock].last_price, 2),
'high_limit=' + _fmt_num(curr_data[stock].high_limit, 2),
'low_limit=' + _fmt_num(curr_data[stock].low_limit, 2),
'name=' + str(curr_data[stock].name)
]))
else:
risk_filtered.append(stock)
log.info('风险过滤[%s]: %d -> %d, 移除=%d' % (
caller,
raw_count,
len(risk_filtered),
len(risk_removed)
))
_log_detail_items(
'风险过滤移除[%s]' % caller,
risk_removed,
limit=min(len(risk_removed), g.debug_log_limit),
chunk=4
)
one_yuan_filtered = []
one_yuan_removed = []
for stock in risk_filtered:
if curr_data[stock].day_open > 1:
one_yuan_filtered.append(stock)
else:
one_yuan_removed.append(_describe_stock(stock, [
'原因=1元股过滤',
'open=' + _fmt_num(curr_data[stock].day_open, 2),
'last=' + _fmt_num(curr_data[stock].last_price, 2)
]))
log.info('1元股过滤[%s]: %d -> %d, 移除=%d' % (
caller,
len(risk_filtered),
len(one_yuan_filtered),
len(one_yuan_removed)
))
_log_detail_items(
'1元股过滤移除[%s]' % caller,
one_yuan_removed,
limit=min(len(one_yuan_removed), g.debug_log_limit),
chunk=4
)
new_list = []
ma_pass_details = {}
ma_removed_details = []
for stock in one_yuan_filtered:
# 获取股票的收盘价
close5_data = get_bars(stock, count=5, unit='1d', fields=['close'])
# 获取股票的收盘价
close10_data = get_bars(stock, count=10, unit='1d', fields=['close'])
# 获取股票的收盘价
close20_data = get_bars(stock, count=20, unit='1d', fields=['close'])
# 取得过去五天的平均价格
MA5 = close5_data['close'].mean()
# 取得过去十天的平均价格
MA10 = close10_data['close'].mean()
# 取得过去二十天的平均价格
MA20 = close20_data['close'].mean()
# 取得上一时间点价格
# current_price = close_data['close'][-1]
close5_list = [round(float(v), 2) for v in _safe_bar_values(close5_data, 'close')]
ma_condition_1 = MA5 > MA10 * g.RSIRate
ma_condition_2 = MA10 > MA20
detail = _describe_stock(stock, [
'close[-5:]=' + str(close5_list),
'ma5=' + _fmt_num(MA5, 4),
'ma10=' + _fmt_num(MA10, 4),
'ma20=' + _fmt_num(MA20, 4),
'ma5>ma10*RSIRate=' + str(ma_condition_1),
'ma10>ma20=' + str(ma_condition_2)
])
if MA5 > MA10*g.RSIRate> MA20*g.RSIRate:
new_list.append(stock)
ma_pass_details[stock] = detail
else:
ma_removed_details.append(detail)
log.info('均线过滤[%s]: %d -> %d, 移除=%d' % (
caller,
len(one_yuan_filtered),
len(new_list),
len(ma_removed_details)
))
_log_detail_items(
'均线过滤移除[%s]' % caller,
ma_removed_details,
limit=min(len(ma_removed_details), g.debug_log_limit),
chunk=3
)
ma_boundary_details = [ma_pass_details[stock] for stock in new_list[:min(len(new_list), g.stocknum + g.debug_boundary_window)]]
_log_detail_items(
'均线通过前段[%s]' % caller,
ma_boundary_details,
limit=len(ma_boundary_details),
chunk=3
)
return new_list