(优化)使用Milvus和DeepSeek构建RAG
构建一个高质量的 RAG 系统: 包括数据加载、智能分块、混合召回、嵌入生成、Milvus 建库、重排序、和DeepSeek LLM 生成回答
# 安装依赖
# !pip install pymilvus[model]==2.5.10 rank_bm25 sentence-transformers openai tqdm
!pip install pymilvus[model]==2.5.10 rank_bm25
Collecting pymilvus==2.5.10 (from pymilvus[model]==2.5.10)
Downloading pymilvus-2.5.10-py3-none-any.whl.metadata (5.7 kB)
Collecting rank_bm25
Downloading rank_bm25-0.2.2-py3-none-any.whl.metadata (3.2 kB)
Requirement already satisfied: setuptools>69 in /usr/local/lib/python3.11/dist-packages (from pymilvus==2.5.10->pymilvus[model]==2.5.10) (75.2.0)
......
!wget https://github.com/milvus-io/milvus-docs/releases/download/v2.4.6-preview/milvus_docs_2.4.x_en.zip
!unzip -q milvus_docs_2.4.x_en.zip -d milvus_docs
--2025-10-11 07:48:28-- https://github.com/milvus-io/milvus-docs/releases/download/v2.4.6-preview/milvus_docs_2.4.x_en.zip
......
milvus_docs_2.4.x_e 100%[===================>] 598.72K --.-KB/s in 0.04s
2025-10-11 07:48:28 (13.5 MB/s) - ‘milvus_docs_2.4.x_en.zip’ saved [613094/613094]
Step 1: 数据准备与分块
- 从Milvus FAQ文档中加载所有Markdown文件
- 使用正则表达式将长文档分为更小的文本块,以便更好地生成嵌入
from glob import glob
import os, re
from tqdm import tqdm
docs = []
for file_path in glob("milvus_docs/en/faq/*.md", recursive=True):
with open(file_path, "r", encoding="utf-8") as f:
text = f.read()
sentences = re.split(r'(?<=[。!?.!?])\s+', text)
# 每三个句子合并成一个 chunk,防止过短的文本影响语义表达
chunks = [" ".join(sentences[i:i+3]) for i in range(0, len(sentences), 3)]
docs.extend(chunks)
print(f"Loaded {len(docs)} chunks")
Loaded 90 chunks
Step 2: 生成文本嵌入
- 使用OpenAI的text-embedding-3-large模型生成嵌入
- 每段文本被转换为一个512维向量
from pymilvus import model as milvus_model
embedding_model = milvus_model.dense.OpenAIEmbeddingFunction(
model_name="text-embedding-3-large",
api_key="sk-@@@", # 替换为真实API key
base_url="https://api.apiyi.com/v1",
dimensions=512
)
# 对所有文本块进行批量嵌入计算
embeddings = embedding_model.encode_documents(docs)
90
Step 3: 存储到Milvus Lite数据库
- 在本地使用Milvus Lite(SQLite存储),无需安装Milvus服务器
- (可在Docker或Kubernetes环境中替换为远程Milvus服务)
from pymilvus import MilvusClient
client = MilvusClient(uri="./rag_demo.db")
col = "rag_collection_opt"
if client.has_collection(col):
client.drop_collection(col)
'''
创建集合并指定参数:
- dimension: 向量维度
- metric_type: 向量距离度量方式(此处用内积)
- consistency_level: 一致性等级(Strong 保证读取到最新写入的数据)
参见 https://milvus.io/docs/consistency.md#Consistency-Level
'''
client.create_collection(
collection_name=col,
dimension=512,
metric_type="IP",
consistency_level="Strong"
)
# 组织数据并插入, 每条数据包含id、vector、text字段
# Milvus自动管理非schema字段(例如text)到JSON扩展区域中
data = [{"id": i, "vector": embeddings[i], "text": docs[i]} for i in range(len(docs))]
client.insert(collection_name=col, data=data)
print("✅ 数据插入完成")
✅ 数据插入完成
Step 4: 构建混合检索
同时利用BM25(基于关键词的传统检索)与向量检索(基于语义相似度) 进行混合召回,以获得更稳定的召回效果
from rank_bm25 import BM25Okapi
import numpy as np
# 初始化BM25模型(基于词袋的检索)
bm25 = BM25Okapi([d.split() for d in docs])
def hybrid_search(query, top_k=5, alpha=0.7):
"""
参数:
query: 查询字符串
top_k: 返回结果数量
alpha: 控制向量检索与BM25权重(越大表示更偏向语义召回)
"""
vec = embedding_model.encode_queries([query])[0]
# 在Milvus中进行向量相似度搜索
vec_res = client.search(
col,
data=[vec],
limit=top_k*2, # 先取多一点结果,后续再融合过滤
output_fields=["text"]
)[0]
bm25_scores = bm25.get_scores(query.split())
results = []
for r in vec_res:
idx = docs.index(r["entity"]["text"])
bm25_score = bm25_scores[idx]
# 计算混合得分:alpha * 向量相似度 + (1-alpha) * BM25归一化得分
hybrid_score = alpha*r["distance"] + (1-alpha)*(bm25_score/np.max(bm25_scores))
results.append((r["entity"]["text"], hybrid_score))
# 排序并返回前 top_k 条结果
return sorted(results, key=lambda x: x[1], reverse=True)[:top_k]
Step 5: 调用DeepSeek LLM生成回答
from openai import OpenAI
ds = OpenAI(
api_key="sk-@@@", # 替换为真实API key
base_url="https://api.deepseek.com/v1"
)
def rag_answer(question):
"""
RAG主流程:
1. 对输入问题进行混合检索
2. 将检索到的文档合并为上下文
3. 构造Prompt并调用LLM生成回答
"""
retrieved = hybrid_search(question)
context = "\n".join([r[0] for r in retrieved])
prompt = f'''
请根据以下上下文回答问题:
<context>{context}</context>
<question>{question}</question>
<translated></translated>
'''
resp = ds.chat.completions.create(
model="deepseek-chat",
messages=[{"role":"user","content":prompt}]
)
print("Answer:\n", resp.choices[0].message.content)
Answer:
根据上下文,Milvus 中的数据存储方式如下:
1. **数据分类**:Milvus 处理两种类型的数据:
* **元数据**:由 Milvus 内部生成,每个模块都有自己的元数据。这些元数据存储在 **etcd** 中。
* **插入数据**:这是用户插入的实际数据,包括向量数据、标量数据和集合特定的模式。
2. **插入数据的存储**:
* 插入数据作为**增量日志**存储在**持久化存储**中。
* Milvus 支持多种对象存储后端,包括 MinIO、AWS S3、Google Cloud Storage、Azure Blob Storage、Alibaba Cloud OSS 和 Tencent Cloud COS。
3. **数据段管理**:
* **增量数据**:位于 "growing segments" 中。这些数据在达到持久化阈值之前会先在内存中缓冲。
* **历史数据**:来自 "sealed segments",这些数据段存储在对象存储中。
* 增量数据和历史数据共同构成了用于搜索的完整数据集。
4. **数据刷新**:当插入的数据被加载到消息队列后,Milvus 就会返回成功响应。
**总结**:元数据存储在 etcd 中,而实际的向量和标量等插入数据则作为日志文件存储在对象存储(如 S3)中,并分为内存中的增量段和对象存储中的历史段进行管理。
/tmp/ipykernel_37/950822849.py:30: RuntimeWarning: invalid value encountered in scalar divide
hybrid_score = alpha*r["distance"] + (1-alpha)*(bm25_score/np.max(bm25_scores))
Answer:
根据提供的上下文,Milvus的核心要求和对硬件架构的依赖性可以总结如下:
**核心硬件要求与架构限制:**
1. **x86架构依赖**:Milvus**不能**在非x86架构的平台上安装或运行。
2. **SIMD指令集**:Milvus的正常运行强制要求CPU必须支持至少一种特定的SIMD指令集,包括 **SSE4.2、AVX、AVX2 或 AVX512**。如果在启动时遇到 `illegal instruction` 错误,这通常表明当前的CPU不支持上述任何指令集。
**核心功能与行为:**
1. **不支持更新操作**:Milvus目前不提供数据更新功能。
2. **主键重复性检查**:系统不会自动检查实体主键是否重复。用户需要自行保证主键的唯一性。
3. **重复主键的未知行为**:如果插入了重复的主键,在查询时返回哪个数据副本是未知的(未定义行为)。
因此,从上下文来看,Milvus的“核心”可以理解为它**深度依赖现代x86 CPU的向量化计算能力(通过SIMD指令集)来实现高性能,并且在数据管理上有特定的限制和行为**。