功能说明
本代码旨在演示如何通过多维市场情绪数据构建高效的LSTM(长短期记忆网络)预测模型。核心功能包括:1) 从公开API获取结构化市场情绪数据;2) 对非结构化文本数据进行情感分析;3) 构建包含时间序列特征的混合数据集;4) 实现基于PyTorch的LSTM模型训练流程。该方案适用于量化交易中的短期价格波动预测场景,但需注意过拟合风险及市场黑天鹅事件的影响。
市场情绪数据维度体系设计
基础数据采集层
import os
import pandas as pd
import requests
from datetime import datetime


class DataCollector:
    """Fetch raw market-sentiment text from public HTTP APIs.

    Only the news endpoint is queried by the methods of this class; the
    other entries of ``sources`` are kept as configuration.
    """

    def __init__(self, api_key=None, timeout=10):
        """Configure endpoints and credentials.

        Args:
            api_key: News API key. When omitted, the ``NEWSAPI_KEY``
                environment variable is used, falling back to the original
                ``'YOUR_API_KEY'`` placeholder.
            timeout: Seconds to wait for the HTTP response. Without a
                timeout ``requests`` may block forever on a stalled socket.
        """
        self.sources = {
            'news': 'https://newsapi.org/v2/everything',
            'twitter': 'https://api.twitter.com/2/tweets/search/recent',
            'financial_reports': 'https://www.sec.gov/Archives/edgar',
        }
        self.api_key = api_key or os.environ.get('NEWSAPI_KEY', 'YOUR_API_KEY')
        self.timeout = timeout

    def fetch_news_sentiment(self, query, from_date, to_date):
        """Fetch news headlines for *query* between two dates.

        Despite the name, no sentiment score is computed here; the result is
        the list of ``{'timestamp', 'content'}`` records produced by
        ``_process_text_data`` and is meant to be scored afterwards.

        Args:
            query: Search expression sent as the ``q`` parameter.
            from_date: Start of the window (``date``/``datetime`` or string).
            to_date: End of the window (``date``/``datetime`` or string).

        Raises:
            requests.HTTPError: If the API answers with an error status.
                Previously an error payload was silently turned into ``[]``.
        """
        params = {
            'q': query,
            'from': self._format_date(from_date),
            'to': self._format_date(to_date),
            'sortBy': 'publishedAt',
            'apiKey': self.api_key,
        }
        response = requests.get(
            self.sources['news'], params=params, timeout=self.timeout
        )
        response.raise_for_status()
        return self._process_text_data(response.json())

    @staticmethod
    def _format_date(value):
        """Render date-like objects as ISO-8601 text; pass strings through."""
        if isinstance(value, datetime):
            # str(datetime) uses a space separator and microseconds, which is
            # not the ISO form expected in a query string.
            return value.isoformat(timespec='seconds')
        if hasattr(value, 'isoformat'):
            return value.isoformat()
        return value

    def _process_text_data(self, data):
        """Reduce the API payload to timestamp/headline records.

        A real project would also tokenise and remove stop words here.
        Articles without a title are skipped because the downstream
        sentiment models need a non-empty string.
        """
        return [
            {'timestamp': item.get('publishedAt'), 'content': item['title']}
            for item in data.get('articles', [])
            if item.get('title')
        ]


# ---- Next section: Sentiment analysis engine (情感分析引擎) ----
from transformers import pipeline
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer


class SentimentAnalyzer:
    """Score text with a lexicon model (VADER) and a transformer classifier."""

    def __init__(self):
        """Load both sentiment back-ends."""
        self.lexicon_based = SentimentIntensityAnalyzer()
        self.transformer_model = pipeline(
            "sentiment-analysis",
            model="distilbert-base-uncased-finetuned-sst-2-english",
        )

    def analyze_batch(self, texts):
        """Run both analysers over a batch of texts.

        Args:
            texts: An iterable of strings (a single string is treated as a
                one-element batch).

        Returns:
            A list of ``(vader_compound_score, transformer_label)`` tuples,
            one per input text, in input order. Empty input gives ``[]``.
        """
        if isinstance(texts, str):
            # Iterating a bare string would score it character by character.
            texts = [texts]
        # Materialise once: the input is traversed twice below, so a
        # generator would be exhausted before the transformer sees it.
        texts = list(texts)
        if not texts:
            return []

        lexicon_scores = [
            self.lexicon_based.polarity_scores(text)['compound']
            for text in texts
        ]
        # truncation=True keeps over-length inputs within the model's
        # maximum sequence length instead of raising.
        transformer_results = self.transformer_model(texts, truncation=True)
        labels = [result['label'] for result in transformer_results]
        return list(zip(lexicon_scores, labels))


# ---- Next section: Time-series feature engineering (时间序列特征工程) ----
import numpy as np

try:
    from sklearn.preprocessing import StandardScaler
except ImportError:  # scikit-learn is only needed for the optional scaler
    StandardScaler = None


class TimeSeriesFeaturizer:
    """Turn a price/sentiment DataFrame into fixed-length LSTM input windows."""

    def __init__(self, window_size=5):
        """Store the window length and create the (currently unused) scaler.

        Args:
            window_size: Number of consecutive rows in each input sequence.
        """
        self.window_size = window_size
        # No method of this class uses the scaler, so a missing scikit-learn
        # installation leaves it as None rather than failing at import time.
        self.scaler = StandardScaler() if StandardScaler is not None else None

    def create_sequences(self, data, target_col='close_price'):
        """Build sliding-window inputs and next-step targets.

        Window ``i`` holds rows ``i .. i + window_size - 1`` of every column
        except ``target_col``; its target is ``target_col`` at row
        ``i + window_size``.

        Args:
            data: DataFrame containing ``target_col`` plus feature columns.
            target_col: Name of the column to predict.

        Returns:
            ``(X, y)`` where ``X`` has shape
            ``(n_windows, window_size, n_features)`` and ``y`` has shape
            ``(n_windows,)``. When ``data`` has no more than ``window_size``
            rows, ``n_windows`` is 0 but the trailing dimensions are kept.
        """
        # Work on positional NumPy arrays: ``data[i]`` on a DataFrame is a
        # column lookup by label, not a row lookup, and raised KeyError.
        features = data.drop(columns=[target_col]).to_numpy()
        targets = data[target_col].to_numpy()

        window = self.window_size
        n_windows = len(data) - window
        if n_windows <= 0:
            return np.empty((0, window, features.shape[1])), np.empty((0,))

        X = np.stack([features[start:start + window] for start in range(n_windows)])
        y = targets[window:]
        return X, y

    def add_technical_indicators(self, df):
        """Return a copy of *df* with ``ma_5`` and ``volatility`` columns.

        ``ma_5`` is the 5-row rolling mean of ``close_price`` and
        ``volatility`` is ``high_price - low_price``. Rows without a full
        rolling window (or with any other NaN) are dropped. The input frame
        is not modified.
        """
        enriched = df.copy()
        enriched['ma_5'] = enriched['close_price'].rolling(window=5).mean()
        enriched['volatility'] = enriched['high_price'] - enriched['low_price']
        return enriched.dropna()


# ---- Next section: LSTM model architecture (LSTM模型架构实现) ----
数据加载模块
import torch
from torch.utils.data import Dataset, DataLoader


class FinancialTimeSeriesDataset(Dataset):
    """Sliding-window view over per-step feature rows and targets.

    Item ``i`` is ``(X[i : i + sequence_length], y[i + sequence_length - 1])``,
    i.e. a window of rows paired with the target of the window's last row.
    """

    def __init__(self, X, y, sequence_length=60):
        """Convert the inputs to float32 tensors and store the window length.

        Args:
            X: Array-like of per-step feature rows (first axis is time).
            y: Array-like of per-step targets, same length as ``X``.
            sequence_length: Number of consecutive steps per item.

        Raises:
            ValueError: If ``sequence_length`` is smaller than 1 or ``X`` and
                ``y`` differ in length.
        """
        if sequence_length < 1:
            raise ValueError("sequence_length must be at least 1")
        # as_tensor avoids the legacy FloatTensor constructor, which treats a
        # bare integer as a size and returns uninitialised memory.
        self.X = torch.as_tensor(X, dtype=torch.float32)
        self.y = torch.as_tensor(y, dtype=torch.float32)
        if len(self.X) != len(self.y):
            raise ValueError(
                f"X and y must have the same length, got {len(self.X)} and {len(self.y)}"
            )
        self.sequence_length = sequence_length

    def __len__(self):
        """Number of complete windows; 0 when the data is too short."""
        # A negative value would make len() raise ValueError.
        return max(0, len(self.X) - self.sequence_length + 1)

    def __getitem__(self, idx):
        """Return the ``idx``-th window and the target of its last step.

        Negative indices count from the end.

        Raises:
            IndexError: If ``idx`` is outside ``[-len(self), len(self))``.
                Without this check an out-of-range index would silently
                yield a truncated window.
        """
        size = len(self)
        if idx < 0:
            idx += size
        if not 0 <= idx < size:
            raise IndexError("dataset index out of range")
        stop = idx + self.sequence_length
        return self.X[idx:stop], self.y[stop - 1]


# ---- Next section: Neural network definition (神经网络定义) ----
import torch.nn as nn


class MultiFeatureLSTM(nn.Module):
    """LSTM encoder with softmax attention pooling over time and a linear head."""

    def __init__(self, input_size, hidden_size, num_layers, output_size):
        """Build the LSTM, the attention scorer and the output layer.

        Args:
            input_size: Number of features per time step.
            hidden_size: LSTM hidden width.
            num_layers: Number of stacked LSTM layers; inter-layer dropout of
                0.2 is enabled only when there is more than one layer.
            output_size: Width of the final prediction.
        """
        super().__init__()
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=0.2 if num_layers > 1 else 0,
        )
        self.fc = nn.Linear(hidden_size, output_size)
        # Softmax over dim=1 normalises the scores across time steps, since
        # the LSTM output is (batch, time, hidden) with batch_first=True.
        self.attention = nn.Sequential(
            nn.Linear(hidden_size, 1),
            nn.Softmax(dim=1),
        )

    def forward(self, x, return_attention=False):
        """Predict from a ``(batch, time, input_size)`` tensor.

        Args:
            x: Input batch.
            return_attention: When true, also return the attention weights.

        Returns:
            The ``(batch, output_size)`` prediction, or a
            ``(prediction, attention_weights)`` tuple when
            ``return_attention`` is set; the weights have shape
            ``(batch, time, 1)``.
        """
        h0, c0 = self._init_hidden(x.size(0), device=x.device, dtype=x.dtype)
        out, _ = self.lstm(x, (h0, c0))
        attention_weights = self.attention(out)
        weighted_out = torch.sum(out * attention_weights, dim=1)
        prediction = self.fc(weighted_out)
        if return_attention:
            return prediction, attention_weights
        return prediction

    def _init_hidden(self, batch_size, device=None, dtype=None):
        """Return zero ``(h0, c0)`` states for a batch.

        The states are created on the given device/dtype, defaulting to
        those of the module's parameters, so the model keeps working after
        ``.to(device)`` or ``.double()``.
        """
        if device is None or dtype is None:
            reference = next(self.parameters())
            device = reference.device if device is None else device
            dtype = reference.dtype if dtype is None else dtype
        shape = (self.lstm.num_layers, batch_size, self.lstm.hidden_size)
        return (
            torch.zeros(shape, device=device, dtype=dtype),
            torch.zeros(shape, device=device, dtype=dtype),
        )


# ---- Next section: Training loop optimisation (训练循环优化) ----
def train_model(model, train_loader, val_loader, criterion, optimizer,
                epochs=100, patience=10, checkpoint_path='best_model.pth'):
    """Train *model* with gradient clipping, checkpointing and early stopping.

    After each epoch the model is scored with ``evaluate_model`` (defined
    elsewhere in the project). The state dict is saved whenever the
    validation loss improves, and training stops after ``patience`` epochs
    without improvement.

    Args:
        model: The network to train.
        train_loader: Iterable of ``(data, target)`` training batches.
        val_loader: Validation loader passed to ``evaluate_model``.
        criterion: Loss function called as ``criterion(output, target)``.
        optimizer: Optimizer updating the model parameters.
        epochs: Maximum number of epochs.
        patience: Epochs without validation improvement before stopping.
        checkpoint_path: Where the best state dict is written.

    Returns:
        A dict with per-epoch ``'train_loss'`` (mean batch loss) and
        ``'val_loss'`` lists.
    """
    history = {'train_loss': [], 'val_loss': []}
    best_val_loss = float('inf')
    epochs_without_improvement = 0

    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        batch_count = 0
        for data, target in train_loader:
            optimizer.zero_grad()
            output = model(data)
            # A (batch, 1) prediction against a (batch,) target would be
            # broadcast to (batch, batch) by element-wise losses, giving a
            # wrong loss. Align shapes when only the layout differs.
            if output.shape != target.shape and output.numel() == target.numel():
                output = output.reshape(target.shape)
            loss = criterion(output, target)
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1)
            optimizer.step()
            running_loss += loss.item()
            batch_count += 1
        history['train_loss'].append(running_loss / max(batch_count, 1))

        # Validation phase.
        val_loss = evaluate_model(model, val_loader, criterion)
        history['val_loss'].append(val_loss)

        if val_loss < best_val_loss:
            best_val_loss = val_loss
            epochs_without_improvement = 0
            torch.save(model.state_dict(), checkpoint_path)
        else:
            epochs_without_improvement += 1
            if epochs_without_improvement >= patience:
                print(f"Early stopping at epoch {epoch + 1}")
                break

    return history


# ---- Next section: Validating the key dimensions (关键维度有效性验证) ----
消融实验设计
def ablation_study(baseline_model, test_loader, criteria, X_test=None,
                   y_test=None, short_sequence_length=30, batch_size=64):
    """Evaluate variants of *baseline_model* to gauge each component's value.

    Every variant is built from a fresh deep copy of the baseline so the
    ablations do not compound and the baseline itself is never modified.
    Scoring is delegated to ``evaluate_model`` (defined elsewhere).

    Args:
        baseline_model: Trained model exposing ``lstm``, ``fc`` and
            ``attention`` attributes.
        test_loader: Loader used for the head-reset and attention variants.
        criteria: Loss/metric passed through to ``evaluate_model``.
        X_test: Per-step test features for the short-sequence run. Falls
            back to a module-level ``X_test`` if one exists.
        y_test: Per-step test targets, same fallback as ``X_test``.
        short_sequence_length: Window length for the short-sequence run.
        batch_size: Batch size of the short-sequence loader.

    Returns:
        Dict with keys ``'no_sentiment'`` and ``'no_attention'``, plus
        ``'short_sequence'`` when test data is available.
    """
    import copy  # not imported at file level

    results = {}
    device = next(baseline_model.parameters()).device

    # "No sentiment" variant.
    # NOTE(review): this only replaces the output head with a freshly
    # initialised layer; it does not remove any sentiment input columns.
    # Confirm this is the intended ablation.
    head_reset_model = copy.deepcopy(baseline_model)
    out_features = getattr(baseline_model.fc, 'out_features', 1)
    head_reset_model.fc = nn.Linear(
        head_reset_model.lstm.hidden_size, out_features
    ).to(device)
    results['no_sentiment'] = evaluate_model(head_reset_model, test_loader, criteria)

    # Attention replaced by identity, starting again from the baseline so
    # the head reset above does not leak into this measurement.
    no_attention_model = copy.deepcopy(baseline_model)
    no_attention_model.attention = nn.Identity()
    results['no_attention'] = evaluate_model(no_attention_model, test_loader, criteria)

    # Shorter input sequences on the unmodified baseline.
    if X_test is None:
        X_test = globals().get('X_test')
    if y_test is None:
        y_test = globals().get('y_test')
    if X_test is not None and y_test is not None:
        short_seq_dataset = FinancialTimeSeriesDataset(
            X_test, y_test, sequence_length=short_sequence_length
        )
        short_seq_loader = DataLoader(short_seq_dataset, batch_size=batch_size)
        results['short_sequence'] = evaluate_model(
            baseline_model, short_seq_loader, criteria
        )

    return results


# ---- Next section: Visual analysis tools (可视化分析工具) ----
import matplotlib.pyplot as plt
import seaborn as sns


def plot_feature_importance(model, feature_names, seq_len=10):
    """Plot a time-by-feature heatmap of input-gradient saliency.

    A random ``(1, seq_len, n_features)`` probe is passed through *model*
    and the absolute gradient of the summed output with respect to the
    probe is used as the importance score. This gives one value per
    (time step, feature) pair, which is what a heatmap labelled with
    ``feature_names`` needs; attention weights alone only vary over time.

    Args:
        model: Module mapping ``(batch, time, features)`` to a tensor, or to
            a tuple whose first element is the prediction.
        feature_names: One label per input feature.
        seq_len: Number of time steps in the probe.

    Returns:
        The ``(seq_len, n_features)`` NumPy array that was plotted.
    """
    feature_names = list(feature_names)

    # Build the probe on the model's own device/dtype.
    reference = next(model.parameters(), None)
    factory = {}
    if reference is not None:
        factory = {'device': reference.device, 'dtype': reference.dtype}
    dummy_input = torch.randn(
        1, seq_len, len(feature_names), requires_grad=True, **factory
    )

    was_training = model.training
    model.eval()  # disable dropout so the scores are not noisy
    try:
        # cuDNN RNN kernels cannot run backward in eval mode, so use the
        # native implementation for this single pass.
        with torch.backends.cudnn.flags(enabled=False):
            output = model(dummy_input)
            if isinstance(output, (tuple, list)):
                output = output[0]
            # autograd.grad leaves the parameters' .grad fields untouched.
            (input_grad,) = torch.autograd.grad(output.sum(), dummy_input)
    finally:
        model.train(was_training)

    importance_scores = input_grad.abs().squeeze(0).cpu().numpy()

    plt.figure(figsize=(12, 8))
    sns.heatmap(
        importance_scores,
        xticklabels=feature_names,
        yticklabels=[f"t-{seq_len - 1 - step}" for step in range(seq_len)],
        annot=True,
    )
    plt.title("Temporal Feature Importance Scores")
    plt.show()
    return importance_scores


# ---- Next section: Practical application example (实际应用案例) ----
完整工作流示例
def full_pipeline(symbol='AAPL', lookback_days=30):
    """Run collection, sentiment scoring, feature building, training and prediction.

    ``get_historical_prices``, ``merge_with_market_data``,
    ``get_latest_market_data`` and ``predict_next_price`` are assumed to be
    provided elsewhere in the project.

    Args:
        symbol: Ticker used both as the news query and the price lookup key.
        lookback_days: How many days of news to request.

    Returns:
        Whatever ``predict_next_price`` returns for the latest market data.

    Raises:
        ValueError: If fewer than two training sequences can be built.
    """
    # timedelta is not imported at file level, so bring it in here.
    from datetime import datetime, timedelta

    # 1. Data collection (one "now" so both bounds are consistent).
    now = datetime.now()
    collector = DataCollector()
    news_data = collector.fetch_news_sentiment(
        query=symbol,
        from_date=now - timedelta(days=lookback_days),
        to_date=now,
    )

    # 2. Sentiment analysis (skip the models entirely when there is no news).
    headlines = [item['content'] for item in news_data]
    analyzer = SentimentAnalyzer()
    sentiment_scores = analyzer.analyze_batch(headlines) if headlines else []

    # 3. Data integration.
    price_data = get_historical_prices(symbol)  # assumed to exist
    combined_df = merge_with_market_data(price_data, sentiment_scores)

    # 4. Feature engineering.
    featurizer = TimeSeriesFeaturizer(window_size=7)
    processed_df = featurizer.add_technical_indicators(combined_df)
    X, y = featurizer.create_sequences(processed_df)
    if len(X) < 2:
        raise ValueError("not enough rows to build training and validation sequences")

    # 5. Model training.
    # X is already windowed as (samples, window, features), so it is wrapped
    # directly; feeding it to FinancialTimeSeriesDataset would window it a
    # second time and produce 4-D batches the LSTM cannot consume.
    features = torch.as_tensor(X.astype('float32'))
    # Targets are shaped (samples, 1) to match the model's output exactly.
    targets = torch.as_tensor(y.astype('float32')).reshape(-1, 1)
    dataset = torch.utils.data.TensorDataset(features, targets)

    # Chronological split: a random split would place windows that overlap
    # future prices into the training set and inflate validation scores.
    train_size = int(0.8 * len(dataset))
    train_set = torch.utils.data.Subset(dataset, range(train_size))
    val_set = torch.utils.data.Subset(dataset, range(train_size, len(dataset)))
    train_loader = DataLoader(train_set, batch_size=64, shuffle=True)
    val_loader = DataLoader(val_set, batch_size=64)

    model = MultiFeatureLSTM(
        input_size=features.shape[2],
        hidden_size=128,
        num_layers=2,
        output_size=1,
    )
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    train_model(model, train_loader, val_loader, criterion, optimizer)

    # 6. Prediction / deployment.
    test_sample = get_latest_market_data(symbol)  # assumed to exist
    prediction = predict_next_price(model, test_sample)
    return prediction


# ---- Next section: Risk management module (风险管理模块) ----
class RiskManager:
    """Simple position-sizing and stop-loss rules."""

    def __init__(self, stop_loss=0.05, position_sizing=0.1):
        """Store the risk parameters.

        Args:
            stop_loss: Maximum tolerated fractional loss (0.05 means 5%).
            position_sizing: Fraction of the balance used as the sizing base.
        """
        self.stop_loss = stop_loss
        self.position_sizing = position_sizing

    def calculate_position_size(self, account_balance, volatility):
        """Size a position inversely to volatility (described as a Kelly variant).

        Returns ``account_balance * position_sizing / volatility``.

        Raises:
            ValueError: If ``volatility`` is not strictly positive; a zero
                value would otherwise divide by zero.
        """
        if volatility <= 0:
            raise ValueError("volatility must be positive")
        return account_balance * self.position_sizing / volatility

    def check_stop_loss(self, current_price, entry_price, is_long=True):
        """Report whether the position's loss exceeds the stop-loss threshold.

        Only adverse moves count: a falling price for a long position, a
        rising price for a short one. The previous ``abs()`` comparison also
        fired on profitable moves.

        Args:
            current_price: Latest price.
            entry_price: Price at which the position was opened.
            is_long: ``True`` for a long position, ``False`` for a short.

        Raises:
            ValueError: If ``entry_price`` is zero.
        """
        if entry_price == 0:
            raise ValueError("entry_price must be non-zero")
        change = (current_price - entry_price) / entry_price
        loss = -change if is_long else change
        return loss > self.stop_loss