一、升级概览
2025 年 10 月 30 日,TradingAgents-CN 完成了一次全面的 系统稳定性 和 数据一致性 优化工作。
本次更新通过 19 个提交,解决了 数据源优先级不统一、MongoDB 批量写入超时、实时行情数据缺失、AKShare 股票代码格式不一致 等关键问题,显著提升了系统的 可靠性、可维护性和用户体验。
核心改进
- 数据源优先级统一:修复优先级逻辑,实现端到端一致性。
- 重试机制完善:为批量操作和数据同步添加智能重试。
- MongoDB 超时优化:解决大批量历史数据处理超时问题。
- 实时行情增强:程序启动时自动从历史数据回填收盘数据。
- AKShare 代码标准化:修复新浪接口返回股票代码带交易所前缀的问题。
- 工具与诊断优化:改进 Tushare 配置、数据源测试接口和日志系统。
二、数据源优先级统一
2.1 问题背景
相关提交:
719b9da:优化 数据源优先级管理 和股票筛选功能。f632395:修复数据查询不按优先级的问题。586e3dc:修复数据源状态列表排序顺序。f094a62:添加数据源优先级修复说明文档。
此前系统中存在多处 数据源优先级逻辑不一致 的问题。
主要问题
优先级判断错误
代码中使用升序排序,等价于“数字越小优先级越高”;但系统配置期望的是降序,也就是 数字越大优先级越高。
错误逻辑示例:
# 错误:升序逻辑
source_priority = ["baostock", "akshare", "tushare"]
正确逻辑应为:
# 正确:降序逻辑
source_priority = ["tushare", "akshare", "baostock"]
查询不遵循优先级
多个模块存在绕过数据源优先级的问题:
app/routers/reports.py中get_stock_name()直接查询,不按优先级。app/services/database_screening_service.py聚合查询可能混用不同数据源。app/routers/stocks.py中get_fundamentals()按时间戳查询,而不是按数据源优先级查询。
前端显示顺序混乱
数据源状态列表排序与配置不一致,用户无法直观看到当前实际使用的数据源。
2.2 解决方案
统一优先级规则
明确规则:数字越大,优先级越高。
DEFAULT_PRIORITIES = {
"tushare": 3,
"akshare": 2,
"baostock": 1
}
从数据库动态加载优先级
class BaseDataSourceAdapter(ABC):
def __init__(self):
self._priority = None
async def load_priority_from_db(self):
"""从数据库加载优先级配置"""
db = await get_mongo_db()
config = await db.datasource_groupings.find_one(
{"source": self.source_name}
)
if config:
self._priority = config.get("priority", 1)
else:
self._priority = DEFAULT_PRIORITIES.get(self.source_name, 1)
修复股票名称查询逻辑
async def get_stock_name(code: str) -> str:
"""按数据源优先级查询股票名称"""
db = await get_mongo_db()
for source in ["tushare", "akshare", "baostock"]:
doc = await db.stock_basic_info.find_one(
{"code": code, "source": source},
{"name": 1}
)
if doc:
return doc.get("name", code)
doc = await db.stock_basic_info.find_one(
{"code": code},
{"name": 1}
)
return doc.get("name", code) if doc else code
修复股票筛选数据源逻辑
async def screen(self, criteria: ScreeningCriteria) -> List[Dict]:
"""股票筛选,只使用优先级最高的数据源"""
primary_source = await self._get_primary_source()
pipeline = [
{"$match": {"source": primary_source}},
# 其他筛选条件
]
results = await db.stock_basic_info.aggregate(pipeline).to_list(None)
return results
修复前端排序
const sortedSources = computed(() => {
return [...dataSources.value].sort((a, b) =>
b.priority - a.priority
)
})
添加当前数据源显示
<el-alert type="info" :closable="false">
<template #title>
当前数据源:{{ currentDataSource }}
<el-tag :type="getSourceTagType(currentDataSource)">
优先级 {{ currentPriority }}
</el-tag>
</template>
</el-alert>
2.3 优化效果
- 所有查询统一按数据源优先级执行。
- 前端显示顺序与配置一致。
- 用户可以清楚看到当前使用的数据源。
- 避免不同数据源的数据被混用。
- 股票筛选、报告查询和基本面查询结果更加一致。
三、批量操作重试机制
3.1 问题背景
相关提交:
1b97aed:为批量操作添加重试机制,改进超时处理。281587e:为多源基础数据同步添加重试机制。4da35a0:为 Tushare 基础数据同步添加重试机制。
在批量写入和数据同步过程中,系统经常遇到临时失败:
批量写入失败: mongodb:27017: timed out
同步失败: Connection reset by peer
API调用失败: Rate limit exceeded
主要原因
- 网络波动导致写入失败。
- MongoDB 高负载时偶发超时。
- 数据源接口触发临时限流。
- 原先没有重试机制,一次失败就直接放弃,容易造成数据不完整。
3.2 解决方案
新增通用批量写入重试方法。
async def _execute_bulk_write_with_retry(
self,
symbol: str,
operations: List,
max_retries: int = 3
) -> int:
"""执行批量写入,支持重试"""
saved_count = 0
retry_count = 0
while retry_count < max_retries:
try:
result = await self.collection.bulk_write(
operations,
ordered=False
)
saved_count = result.upserted_count + result.modified_count
if retry_count > 0:
logger.info(f"{symbol} 重试成功,保存 {saved_count} 条")
return saved_count
except asyncio.TimeoutError as e:
retry_count += 1
if retry_count < max_retries:
wait_time = 2 ** retry_count
logger.warning(
f"{symbol} 批量写入超时,第 {retry_count}/{max_retries} 次重试,"
f"等待 {wait_time} 秒后重试"
)
await asyncio.sleep(wait_time)
else:
logger.error(f"{symbol} 批量写入失败,已重试 {max_retries} 次: {e}")
return 0
except Exception as e:
logger.error(f"{symbol} 批量写入失败: {e}")
return 0
return saved_count
应用到历史数据同步:
async def save_historical_data(
self,
symbol: str,
data: List[Dict],
period: str = "daily"
) -> int:
"""保存历史数据,使用重试机制"""
operations = []
for record in data:
operations.append(
UpdateOne(
{"code": symbol, "date": record["date"], "period": period},
{"$set": record},
upsert=True
)
)
saved_count = await self._execute_bulk_write_with_retry(
symbol,
operations
)
logger.info(f"{symbol} 保存完成: 新增 {saved_count} 条,共 {len(data)} 条")
return saved_count
3.3 优化效果
- 网络波动时自动重试,避免数据丢失。
- 使用 指数退避策略,避免频繁重试加重负载。
- 区分超时错误和其他错误,避免无限重试。
- 提供详细重试日志,便于问题诊断。
四、MongoDB 超时优化
4.1 问题背景
相关提交:
45a306b:增加 MongoDB 超时参数配置,解决大量历史数据处理超时问题。c3b0a33:改进 MongoDB 数据源日志,明确显示具体数据源类型。
处理大量历史数据时,系统频繁出现 MongoDB 超时错误:
mongodb:27017: timed out
configured timeouts: socketTimeoutMS: 20000ms, connectTimeoutMS: 10000ms
根本原因
socketTimeoutMS只有 20 秒,大批量写入时不够。connectTimeoutMS只有 10 秒,高负载环境下连接可能较慢。- 日志只显示 MongoDB,无法判断具体使用的是 Tushare、AKShare 还是 BaoStock 数据。
4.2 解决方案
增加 MongoDB 超时配置
class Settings(BaseSettings):
MONGO_CONNECT_TIMEOUT_MS: int = 30000
MONGO_SOCKET_TIMEOUT_MS: int = 60000
MONGO_SERVER_SELECTION_TIMEOUT_MS: int = 5000
.env.example 和 .env.docker 中新增:
MONGO_MAX_CONNECTIONS=100
MONGO_MIN_CONNECTIONS=10
MONGO_CONNECT_TIMEOUT_MS=30000
MONGO_SOCKET_TIMEOUT_MS=60000
MONGO_SERVER_SELECTION_TIMEOUT_MS=5000
应用到 MongoDB 连接
_mongo_client = AsyncIOMotorClient(
settings.MONGO_URI,
maxPoolSize=settings.MONGO_MAX_CONNECTIONS,
minPoolSize=settings.MONGO_MIN_CONNECTIONS,
connectTimeoutMS=settings.MONGO_CONNECT_TIMEOUT_MS,
socketTimeoutMS=settings.MONGO_SOCKET_TIMEOUT_MS,
serverSelectionTimeoutMS=settings.MONGO_SERVER_SELECTION_TIMEOUT_MS
)
改进数据源日志
async def get_daily_data(self, symbol: str) -> Optional[pd.DataFrame]:
"""获取日线数据,显示具体数据源"""
tried_sources = []
for source in ["tushare", "akshare", "baostock"]:
logger.debug(f"[MongoDB查询] 尝试数据源: {source}, symbol={symbol}")
cursor = self.db[collection].find(
{"code": symbol, "source": source}
)
docs = await cursor.to_list(length=None)
if docs:
logger.info(f"[MongoDB-{source}] 找到 {len(docs)} 条 daily 数据: {symbol}")
return pd.DataFrame(docs)
else:
tried_sources.append(source)
logger.warning(
f"[数据来源: MongoDB] 所有数据源({', '.join(tried_sources)})都没有 daily 数据: {symbol}"
)
return None
4.3 优化效果
- 大批量数据处理不再频繁超时。
- 用户可以根据环境灵活调整 MongoDB 超时时间。
- 日志清晰显示具体数据源,便于定位问题。
- 保持向后兼容,默认值更加合理。
五、实时行情启动回填
5.1 问题背景
相关提交:
cf892e3:程序启动时自动从历史数据导入收盘数据到market_quotes。
在非交易时段启动系统时,market_quotes 集合可能为空,导致:
- 股票列表没有价格信息。
- K 线图无法显示。
- 无法按涨跌幅筛选。
- 无法按价格筛选。
- 用户需要手动触发实时行情同步。
5.2 解决方案
新增从历史数据回填最新收盘行情的方法。
async def backfill_from_historical_data(self) -> Dict[str, Any]:
"""
从历史数据导入最新交易日的收盘数据到 market_quotes
仅当 market_quotes 集合为空时执行
"""
db = await get_mongo_db()
count = await db[self.collection_name].count_documents({})
if count > 0:
logger.info(f"market_quotes 已有 {count} 条数据,跳过回填")
return {"skipped": True, "reason": "collection_not_empty"}
historical_count = await db.stock_daily_quotes.count_documents({})
if historical_count == 0:
logger.warning("stock_daily_quotes 集合为空,无法回填")
return {"skipped": True, "reason": "no_historical_data"}
pipeline = [
{"$group": {"_id": None, "max_date": {"$max": "$date"}}},
]
result = await db.stock_daily_quotes.aggregate(pipeline).to_list(1)
latest_date = result[0]["max_date"]
cursor = db.stock_daily_quotes.find(
{"date": latest_date},
{"_id": 0}
)
historical_records = await cursor.to_list(length=None)
operations = []
for record in historical_records:
quote = {
"code": record["code"],
"name": record.get("name", ""),
"price": record.get("close", 0),
"open": record.get("open", 0),
"high": record.get("high", 0),
"low": record.get("low", 0),
"volume": record.get("volume", 0),
"amount": record.get("amount", 0),
"change_pct": record.get("change_pct", 0),
"timestamp": datetime.now(self.tz),
"source": "historical_backfill",
"date": latest_date
}
operations.append(
UpdateOne(
{"code": quote["code"]},
{"$set": quote},
upsert=True
)
)
result = await db[self.collection_name].bulk_write(operations, ordered=False)
return {
"success": True,
"date": latest_date,
"total": len(historical_records),
"upserted": result.upserted_count,
"modified": result.modified_count
}
启动时自动调用:
async def backfill_last_close_snapshot_if_needed(self):
"""
启动时检查并回填行情数据
"""
db = await get_mongo_db()
count = await db[self.collection_name].count_documents({})
if count == 0:
logger.info("market_quotes 为空,尝试从历史数据回填")
await self.backfill_from_historical_data()
else:
latest_doc = await db[self.collection_name].find_one(
{},
sort=[("timestamp", -1)]
)
if latest_doc:
latest_time = latest_doc.get("timestamp")
if latest_time:
age = datetime.now(self.tz) - latest_time
if age.total_seconds() > 3600:
logger.info(f"行情数据已陈旧 {age},尝试更新")
await self._fetch_and_save_quotes()
5.3 优化效果
- 非交易时段启动也能看到行情数据。
- 自动化处理,无需手动干预。
- 使用历史收盘价作为基准,数据更加稳定。
- 不影响交易时段的实时行情更新。
六、AKShare 股票代码标准化
6.1 问题背景
相关提交:
cc32639:修复 AKShare 新浪接口股票代码带交易所前缀的问题。
AKShare 的新浪财经接口返回股票代码时会带交易所前缀:
sz000001
sh600036
bj430047
但系统数据库中使用的是 6 位标准股票代码,例如:
000001
600036
430047
这会导致:
- 数据库查询失败。
- 前端显示异常。
- 跨模块代码格式不一致。
- 不同数据源返回格式不统一。
6.2 解决方案
新增股票代码标准化方法。
@staticmethod
def _normalize_stock_code(code: str) -> str:
"""
标准化股票代码为 6 位数字
支持:
- sz000001 -> 000001
- sh600036 -> 600036
- bj430047 -> 430047
- 000001 -> 000001
"""
if not code:
return code
code = code.lower()
if code.startswith(("sz", "sh", "bj")):
code = code[2:]
return code.zfill(6)
应用到实时行情获取:
async def get_realtime_quotes(
self,
symbols: Optional[List[str]] = None
) -> List[Dict[str, Any]]:
"""获取实时行情,标准化股票代码"""
df = ak.stock_zh_a_spot()
quotes = []
for _, row in df.iterrows():
code = self._normalize_stock_code(row.get("代码", ""))
if not code:
continue
quote = {
"code": code,
"name": row.get("名称", ""),
"price": float(row.get("最新价", 0)),
"open": float(row.get("今开", 0)),
"high": float(row.get("最高", 0)),
"low": float(row.get("最低", 0)),
}
quotes.append(quote)
return quotes
应用到行情入库服务:
async def _bulk_upsert(self, quotes: List[Dict]) -> int:
"""批量更新行情,标准化股票代码"""
operations = []
for quote in quotes:
code = self._normalize_stock_code(quote.get("code", ""))
if not code or len(code) != 6:
logger.warning(f"跳过无效代码: {quote.get('code')}")
continue
quote["code"] = code
operations.append(
UpdateOne(
{"code": code},
{"$set": quote},
upsert=True
)
)
result = await self.collection.bulk_write(operations, ordered=False)
return result.upserted_count + result.modified_count
6.3 优化效果
- 所有股票代码统一为 6 位标准格式。
- 数据库查询正常。
- 前端显示正确。
- 跨模块格式一致。
- 降低不同数据源之间的适配成本。
七、工具与诊断优化
7.1 Tushare 配置优化
相关提交:
fd372c7:改进 Tushare Token 配置优先级和测试超时。
优化后,Tushare Token 获取优先级为:
- 优先使用数据库配置。
- 数据库 Token 测试失败时,降级到
.env配置。 - 如果都不存在,则抛出明确错误。
db_token = await self._get_token_from_db()
if db_token:
try:
await self._test_connection(db_token, timeout=10)
return db_token
except Exception:
logger.warning("数据库 Token 测试失败,尝试 .env 配置")
env_token = os.getenv("TUSHARE_TOKEN")
if env_token:
return env_token
raise ValueError("未找到有效的 Tushare Token")
优化效果:
- 用户在 Web 后台修改 Token 后可以立即生效。
- 网络波动或 Token 失效时自动降级。
- 测试连接更快,不会长时间等待。
- 日志显示当前 Token 来源和降级流程。
7.2 数据源测试简化
相关提交:
8e4eecc:简化数据源连通性测试接口。b17deee:修复数据源测试接口参数传递问题。
之前的数据源测试会拉取完整数据,速度慢、占用 API 配额。
优化前:
stocks = await adapter.get_stock_list()
financials = await adapter.get_financials()
优化后:
await adapter.test_connection()
响应格式也从复杂嵌套结构简化为扁平结构:
{
"source": "tushare",
"available": true,
"message": "连接成功"
}
优化效果:
- 测试速度提升 10 倍以上。
- 减少网络带宽消耗。
- 不占用大量 API 配额。
- 用户体验更好。
7.3 DeepSeek 日志优化
相关提交:
88149c7:修复 DeepSeek 市场分析问题和日志显示问题。66ed4c6:改进 DeepSeek 新闻分析日志和错误处理。
此前 DeepSeek 分析时只传股票代码,模型可能无法理解任务。
优化前:
initial_message = ("human", "601179")
优化后:
initial_message = HumanMessage(
content=f"请对股票 {company_name}({symbol}) 进行全面分析"
)
同时改进日志:
- 日志预览长度从 200 字符增加到 500 字符。
- 添加元组消息的特殊处理。
- 记录 LLM 原始响应内容。
- 记录调用参数、返回结果长度和完整异常堆栈。
优化效果:
- DeepSeek 能正确理解分析任务。
- 日志更清晰,便于问题诊断。
- 显示真实数据来源,而不是模糊的
current_source。
7.4 其他改进
相关提交:
e2e88c8:增加中文字体支持。dfbead7:添加 2025-10-29 工作博客。1a4b1ca:补充系统日志功能说明到 2025-10-29 工作博客。
改进内容:
- 添加中文字体支持,优化 PDF 和 Word 导出的中文显示。
- 完善文档,补充工作日志和系统日志说明。
八、影响范围
8.1 后端文件
| 文件 | 改动说明 |
|---|---|
app/core/config.py | 添加 MongoDB 超时配置 |
app/core/database.py | 应用 MongoDB 超时配置 |
app/routers/reports.py | 修复优先级查询 |
app/routers/stocks.py | 修复优先级查询 |
app/routers/multi_source_sync.py | 优化测试接口 |
app/services/historical_data_service.py | 添加重试机制 |
app/services/basics_sync_service.py | 添加重试机制 |
app/services/multi_source_basics_sync_service.py | 添加重试机制 |
app/services/database_screening_service.py | 修复优先级筛选 |
app/services/quotes_ingestion_service.py | 添加启动回填 |
app/services/data_sources/base.py | 动态加载优先级 |
app/services/data_sources/akshare_adapter.py | 股票代码标准化 |
tradingagents/dataflows/providers/china/tushare.py | Token 优先级 |
tradingagents/dataflows/cache/mongodb_cache_adapter.py | 改进日志 |
tradingagents/agents/analysts/news_analyst.py | 改进日志 |
8.2 前端文件
| 文件 | 改动说明 |
|---|---|
frontend/src/views/Screening/index.vue | 添加当前数据源显示 |
frontend/src/components/Sync/DataSourceStatus.vue | 修复排序 |
frontend/src/components/Dashboard/MultiSourceSyncCard.vue | 修复排序 |
frontend/src/api/sync.ts | 更新 API 接口 |
8.3 配置文件
| 文件 | 改动说明 |
|---|---|
.env.example | 添加 MongoDB 超时配置 |
.env.docker | 添加 MongoDB 超时配置 |
8.4 文档
| 文件 | 改动说明 |
|---|---|
docs/blog/2025-10-29-data-source-unification-and-report-export-features.md | 补充相关文档说明 |
九、验证方法
9.1 数据源优先级验证
curl http://localhost:8000/api/multi-source-sync/status
测试股票筛选:
curl http://localhost:8000/api/screening/screen \
-H "Content-Type: application/json" \
-d '{"pe_min": 0, "pe_max": 20}'
前端验证:
访问股票筛选页面,查看“当前数据源”显示。
9.2 重试机制验证
查看历史数据同步日志:
tail -f logs/app.log | grep "重试"
可以在同步过程中模拟网络波动,观察是否自动重试,并检查最终数据完整性。
9.3 MongoDB 超时验证
检查 MongoDB 连接日志:
tail -f logs/app.log | grep "MongoDB连接"
触发大量历史数据同步:
curl -X POST http://localhost:8000/api/scheduler/trigger/sync_historical_data
观察是否还有超时错误:
tail -f logs/app.log | grep "timed out"
9.4 启动回填验证
清空 market_quotes 集合:
mongo tradingagents --eval "db.market_quotes.deleteMany({})"
重启后端服务,检查回填结果:
mongo tradingagents --eval "db.market_quotes.countDocuments({})"
访问前端,确认股票列表可以看到行情数据。
9.5 股票代码标准化验证
触发 AKShare 实时行情同步:
curl -X POST http://localhost:8000/api/scheduler/trigger/akshare_quotes_sync
检查 market_quotes 中的代码格式:
mongo tradingagents --eval "db.market_quotes.find({}, {code: 1}).limit(10)"
确认所有代码都是 6 位数字,没有 sz、sh、bj 前缀。
十、升级指引
10.1 更新环境变量
在 .env 文件中添加 MongoDB 超时配置:
MONGO_CONNECT_TIMEOUT_MS=30000
MONGO_SOCKET_TIMEOUT_MS=60000
MONGO_SERVER_SELECTION_TIMEOUT_MS=5000
10.2 重启服务
Docker 部署:
docker-compose down
docker-compose up -d
本地部署:
停止后端服务后重新启动。
10.3 验证升级
检查服务状态:
curl http://localhost:8000/health
检查数据源状态:
curl http://localhost:8000/api/multi-source-sync/status
测试数据同步:
curl -X POST http://localhost:8000/api/scheduler/trigger/sync_stock_basic_info
10.4 可选:清理旧数据
如果需要重新同步数据以应用新的优先级逻辑,可以先备份,再清空基础数据集合。
备份数据:
mongodump --db tradingagents --out /backup/$(date +%Y%m%d)
清空基础数据:
mongo tradingagents --eval "db.stock_basic_info.deleteMany({})"
重新同步:
curl -X POST http://localhost:8000/api/scheduler/trigger/sync_stock_basic_info
注意:清理旧数据会删除所有基础数据,请谨慎操作。
十一、总结
本次更新通过 19 个提交,全面提升了 TradingAgents-CN 的系统稳定性和数据一致性。
主要成果包括:
- 数据源优先级统一:修复多处优先级逻辑不一致问题,实现端到端一致性。
- 重试机制完善:为批量写入和数据同步添加智能重试,大幅提升成功率。
- MongoDB 超时优化:解决大批量历史数据处理超时问题,并支持灵活配置。
- 实时行情增强:启动时自动从历史数据回填收盘行情,改善非交易时段体验。
- 股票代码标准化:统一 AKShare 返回代码格式,消除跨模块不一致。
- 工具与日志优化:改进 Tushare Token 配置、数据源测试接口和 DeepSeek 日志系统。
- 前端体验优化:统一数据源排序,并在筛选页显示当前数据源。
这些改进显著提升了系统的 可靠性、可维护性、数据一致性和用户体验,也为后续更多数据源和分析功能的扩展打下了更稳固的基础。
✅ 官方唯一渠道:📦 GitHub 仓库:https://github.com/hsliuping/TradingAgents-CN
Aekor AI-API 中转站,让全球顶尖 AI 大模型“触手可及”!你是否曾为这些烦恼头疼?
🔹 人在国内,却总被海外官网 API 的高延迟、掉线、甚至无法访问困扰?
🔹 想用最强的 GPT、Claude 等模型,却卡在海外信用卡、支付审核等重重阻碍?
🔹 官方 API 太贵?Aekor 为你打通“网络-支付-成本”的任督二脉!
💡 Aekor 核心价值:好用、便宜、快 💡
🚀 高速稳定,告别掉线国内专线加速,API 响应低延迟,告别「转圈圈」的焦虑,开发效率瞬间拉满!
🧠 顶尖模型,随需而调涵盖 GPT 系列、Claude 系列等全球主流大厂模型,一次接入,轻松调用!
🎁 免费白嫖,诚意拉满!
注册即送 20 美元体验额度,够你狠狠测试一轮模型质量与线路稳定性了!
⚠️ 温馨提示:API 中转市场虽多但良莠不齐(甚至有些会偷工减料换小模型糊弄事儿)。Aekor 坚持提供正版稳定的服务,但还是建议:先用免费的 20 刀测试是否契合自身需求,满意了再小额充值上车,理性消费不盲目。
文章评论