Integration: Milvus
Use the Milvus vector database in Haystack
Recent Updates
- [2025.4.17] Full-Text Search with Milvus and Haystack - learn how to implement full-text and hybrid search in your applications with Haystack and Milvus
Installation
pip install --upgrade pymilvus milvus-haystack
Usage
Quickly get started with MilvusDocumentStore in a Haystack pipeline.
from haystack import Document
from milvus_haystack import MilvusDocumentStore
document_store = MilvusDocumentStore(
connection_args={"uri": "./milvus.db"},
drop_old=True,
)
documents = [Document(
content="A Foo Document",
meta={"page": "100", "chapter": "intro"},
embedding=[-10.0] * 128,
)]
document_store.write_documents(documents)
print(document_store.count_documents()) # 1
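If you later need to fetch documents back by their metadata, the document store also exposes metadata filtering through the standard DocumentStore protocol. A minimal follow-up sketch, assuming the stock Haystack 2.x filter syntax:
# Hedged sketch: fetch the document written above by its metadata.
hits = document_store.filter_documents(
    filters={"field": "meta.page", "operator": "==", "value": "100"}
)
print(hits[0].content)  # "A Foo Document"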
Different ways to connect to Milvus
- For Milvus Lite, the most convenient method is to set the URI to a local file.
document_store = MilvusDocumentStore(
connection_args={"uri": "./milvus.db"},
drop_old=True,
)
- For a Milvus server running on Docker or Kubernetes, recommended when working with large-scale data. After starting the Milvus service, you can connect to it with the server URI.
document_store = MilvusDocumentStore(
connection_args={"uri": "https://:19530"},
drop_old=True,
)
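Before connecting this way, a Milvus server must already be running. One common route from the Milvus documentation is the standalone Docker script; check the Milvus docs for the command matching your version:
curl -sfL https://raw.githubusercontent.com/milvus-io/milvus/master/scripts/standalone_embed.sh -o standalone_embed.sh
bash standalone_embed.sh start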
- For Zilliz Cloud, the fully managed cloud service for Milvus, adjust the uri and token, which correspond to the Public Endpoint and API key in Zilliz Cloud.
from haystack.utils import Secret
document_store = MilvusDocumentStore(
connection_args={
"uri": "https://in03-ba4234asae.api.gcp-us-west1.zillizcloud.com", # Your Public Endpoint
"token": Secret.from_env_var("ZILLIZ_CLOUD_API_KEY"), # API key, we recommend using the Secret class to load the token from env variable for security.
"secure": True
},
drop_old=True,
)
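Because the token is read with Secret.from_env_var, export the environment variable before running the snippet above:
export ZILLIZ_CLOUD_API_KEY=<your_api_key>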
In-Depth Usage
Prepare an OpenAI API key and set it as an environment variable:
export OPENAI_API_KEY=<your_api_key>
Create the indexing pipeline and index some documents
import os
from haystack import Pipeline
from haystack.components.converters import MarkdownToDocument
from haystack.components.embedders import OpenAIDocumentEmbedder, OpenAITextEmbedder
from haystack.components.preprocessors import DocumentSplitter
from haystack.components.writers import DocumentWriter
from milvus_haystack import MilvusDocumentStore
from milvus_haystack.milvus_embedding_retriever import MilvusEmbeddingRetriever
current_file_path = os.path.abspath(__file__)
file_paths = [current_file_path] # You can replace it with your own file paths.
document_store = MilvusDocumentStore(
connection_args={"uri": "./milvus.db"},
drop_old=True,
)
indexing_pipeline = Pipeline()
indexing_pipeline.add_component("converter", MarkdownToDocument())
indexing_pipeline.add_component("splitter", DocumentSplitter(split_by="sentence", split_length=2))
indexing_pipeline.add_component("embedder", OpenAIDocumentEmbedder())
indexing_pipeline.add_component("writer", DocumentWriter(document_store))
indexing_pipeline.connect("converter", "splitter")
indexing_pipeline.connect("splitter", "embedder")
indexing_pipeline.connect("embedder", "writer")
indexing_pipeline.run({"converter": {"sources": file_paths}})
print("Number of documents:", document_store.count_documents())
Create the retrieval pipeline and try a query
question = "How to set the service uri with milvus lite?" # You can replace it with your own question.
retrieval_pipeline = Pipeline()
retrieval_pipeline.add_component("embedder", OpenAITextEmbedder())
retrieval_pipeline.add_component("retriever", MilvusEmbeddingRetriever(document_store=document_store, top_k=3))
retrieval_pipeline.connect("embedder", "retriever")
retrieval_results = retrieval_pipeline.run({"embedder": {"text": question}})
for doc in retrieval_results["retriever"]["documents"]:
print(doc.content)
print("-" * 10)
Create the RAG pipeline and try a query
from haystack.utils import Secret
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.builders import ChatPromptBuilder
from haystack.dataclasses import ChatMessage
prompt_template = """Answer the following query based on the provided context. If the context does
not include an answer, reply with 'I don't know'.\n
Query: {{query}}
Documents:
{% for doc in documents %}
{{ doc.content }}
{% endfor %}
Answer:
"""
llm = OpenAIChatGenerator(api_key=Secret.from_env_var("OPENAI_API_KEY"), model="gpt-4o-mini")
rag_pipeline = Pipeline()
rag_pipeline.add_component("text_embedder", OpenAITextEmbedder())
rag_pipeline.add_component("retriever", MilvusEmbeddingRetriever(document_store=document_store, top_k=3))
rag_pipeline.add_component("prompt_builder", ChatPromptBuilder(template=[ChatMessage.from_user(prompt_template)]))
rag_pipeline.add_component("llm", llm)
rag_pipeline.connect("prompt_builder.prompt", "llm.messages")
rag_pipeline.connect("text_embedder.embedding", "retriever.query_embedding")
rag_pipeline.connect("retriever", "prompt_builder")
rag_pipeline.connect("prompt_builder.prompt", "llm.messages")
messages = [ChatMessage.from_user(prompt_template)]
results = rag_pipeline.run({"text_embedder": {"text": question}, "prompt_builder": {"query": question}})
print('RAG answer:', results["llm"]["replies"][0].text)
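If you prefer to stream the answer token by token instead of waiting for the full reply, OpenAIChatGenerator accepts a streaming callback. A minimal sketch using Haystack's stock print_streaming_chunk utility; swap in your own callback for custom handling:
from haystack.components.generators.utils import print_streaming_chunk

streaming_llm = OpenAIChatGenerator(
    api_key=Secret.from_env_var("OPENAI_API_KEY"),
    model="gpt-4o-mini",
    streaming_callback=print_streaming_chunk,  # prints each chunk to stdout as it arrives
)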
Sparse Retrieval
Sparse retrieval with the Haystack sparse embedder
This example demonstrates the basic approach to sparse indexing and retrieval using Haystack's sparse embedders.
from haystack import Document, Pipeline
from haystack.components.writers import DocumentWriter
from haystack.document_stores.types import DuplicatePolicy
from haystack_integrations.components.embedders.fastembed import (
FastembedSparseDocumentEmbedder,
FastembedSparseTextEmbedder,
)
from milvus_haystack import MilvusDocumentStore, MilvusSparseEmbeddingRetriever
document_store = MilvusDocumentStore(
connection_args={"uri": "./milvus.db"},
sparse_vector_field="sparse_vector", # Specify a name of the sparse vector field to enable sparse retrieval.
drop_old=True,
)
documents = [
Document(content="My name is Wolfgang and I live in Berlin"),
Document(content="I saw a black horse running"),
Document(content="Germany has many big cities"),
Document(content="full text search is supported by Milvus."),
]
sparse_document_embedder = FastembedSparseDocumentEmbedder()
writer = DocumentWriter(document_store=document_store, policy=DuplicatePolicy.NONE)
indexing_pipeline = Pipeline()
indexing_pipeline.add_component("sparse_document_embedder", sparse_document_embedder)
indexing_pipeline.add_component("writer", writer)
indexing_pipeline.connect("sparse_document_embedder", "writer")
indexing_pipeline.run({"sparse_document_embedder": {"documents": documents}})
retrieval_pipeline = Pipeline()
retrieval_pipeline.add_component("sparse_text_embedder", FastembedSparseTextEmbedder())
retrieval_pipeline.add_component("sparse_retriever", MilvusSparseEmbeddingRetriever(document_store=document_store))
retrieval_pipeline.connect("sparse_text_embedder.sparse_embedding", "sparse_retriever.query_sparse_embedding")
query = "who supports full text search?"
result = retrieval_pipeline.run({"sparse_text_embedder": {"text": query}})
print(result["sparse_retriever"]["documents"][0])
# Document(id=..., content: 'full text search is supported by Milvus.', sparse_embedding: vector with 48 non-zero elements)
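As the printed representation shows, retrieved documents carry their sparse embeddings. A small sketch for inspecting one, assuming Haystack's SparseEmbedding dataclass with its indices and values fields:
doc = result["sparse_retriever"]["documents"][0]
print(doc.sparse_embedding.indices[:5], doc.sparse_embedding.values[:5])  # first few non-zero dimensions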
Sparse retrieval with the Milvus built-in BM25 function
Milvus provides a built-in BM25 function that can generate sparse vectors directly from a text field. Compared with using Haystack's sparse embedders, this approach simplifies pipeline construction. The main differences are:
- We need to specify a BM25BuiltInFunction in the document store, along with a few field-specification parameters.
- We do not need an explicit embedder, because Milvus computes the sparse embeddings on the Milvus server side.
- The pipeline has fewer components and connections, so it is simpler.
Here is an example:
from milvus_haystack.function import BM25BuiltInFunction
document_store = MilvusDocumentStore(
connection_args={"uri": "https://:19530"},
sparse_vector_field="sparse_vector",
text_field="text",
builtin_function=[
BM25BuiltInFunction( # The BM25 function converts the text into a sparse vector.
input_field_names="text", output_field_names="sparse_vector",
)
],
drop_old=True,
)
writer = DocumentWriter(document_store=document_store, policy=DuplicatePolicy.NONE)
indexing_pipeline = Pipeline()
indexing_pipeline.add_component("writer", writer)
indexing_pipeline.run({"writer": {"documents": documents}})
retrieval_pipeline = Pipeline()
retrieval_pipeline.add_component("sparse_retriever", MilvusSparseEmbeddingRetrieve(document_store=document_store))
query = "who supports full text search?"
result = retrieval_pipeline.run({"sparse_retriever": {"query_text": query}})
print(result["sparse_retriever"]["documents"][0])
Hybrid Retrieval
Hybrid retrieval with the Haystack sparse embedder
This example demonstrates the basic approach to hybrid retrieval using Haystack's sparse embedders.
from haystack import Document, Pipeline
from haystack.components.embedders import OpenAIDocumentEmbedder, OpenAITextEmbedder
from haystack.components.writers import DocumentWriter
from haystack.document_stores.types import DuplicatePolicy
from haystack_integrations.components.embedders.fastembed import (
FastembedSparseDocumentEmbedder,
FastembedSparseTextEmbedder,
)
from milvus_haystack import MilvusDocumentStore, MilvusHybridRetriever
document_store = MilvusDocumentStore(
connection_args={"uri": "./milvus.db"},
drop_old=True,
sparse_vector_field="sparse_vector", # Specify a name of the sparse vector field to enable hybrid retrieval.
)
documents = [
Document(content="My name is Wolfgang and I live in Berlin"),
Document(content="I saw a black horse running"),
Document(content="Germany has many big cities"),
Document(content="full text search is supported by Milvus."),
]
writer = DocumentWriter(document_store=document_store, policy=DuplicatePolicy.NONE)
indexing_pipeline = Pipeline()
indexing_pipeline.add_component("sparse_doc_embedder", FastembedSparseDocumentEmbedder())
indexing_pipeline.add_component("dense_doc_embedder", OpenAIDocumentEmbedder())
indexing_pipeline.add_component("writer", writer)
indexing_pipeline.connect("sparse_doc_embedder", "dense_doc_embedder")
indexing_pipeline.connect("dense_doc_embedder", "writer")
indexing_pipeline.run({"sparse_doc_embedder": {"documents": documents}})
retrieval_pipeline = Pipeline()
retrieval_pipeline.add_component("sparse_text_embedder",
FastembedSparseTextEmbedder(model="prithvida/Splade_PP_en_v1"))
retrieval_pipeline.add_component("dense_text_embedder", OpenAITextEmbedder())
retrieval_pipeline.add_component(
"retriever",
MilvusHybridRetriever(
document_store=document_store,
# reranker=WeightedRanker(0.5, 0.5), # Default is RRFRanker()
)
)
retrieval_pipeline.connect("sparse_text_embedder.sparse_embedding", "retriever.query_sparse_embedding")
retrieval_pipeline.connect("dense_text_embedder.embedding", "retriever.query_embedding")
question = "who supports full text search?"
results = retrieval_pipeline.run(
{"dense_text_embedder": {"text": question},
"sparse_text_embedder": {"text": question}}
)
print(results["retriever"]["documents"][0])
# Document(id=..., content: 'full text search is supported by Milvus.', embedding: vector of size 1536, sparse_embedding: vector with 48 non-zero elements)
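The commented-out reranker argument above controls how the dense and sparse result lists are fused; the default is RRFRanker(). To weight one signal more heavily, pass pymilvus's WeightedRanker instead, for example:
from pymilvus import WeightedRanker

retriever = MilvusHybridRetriever(
    document_store=document_store,
    reranker=WeightedRanker(0.7, 0.3),  # relative weights for the two search legs; verify which weight maps to dense vs. sparse in your setup
)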
Hybrid retrieval with the Milvus built-in BM25 function
Milvus provides a built-in BM25 function that can generate sparse vectors directly from a text field. Compared with using Haystack's sparse embedders, this approach simplifies pipeline construction and makes BM25 a useful complement to semantic search. The main differences are:
- We need to specify a BM25BuiltInFunction in the document store, along with a few field-specification parameters.
- We do not need an explicit sparse embedder, because Milvus computes the sparse embeddings on the Milvus server side.
- The pipeline is simpler, with fewer components and connections, which is especially beneficial in a hybrid-retrieval setup.
Here is an example:
from milvus_haystack.function import BM25BuiltInFunction
document_store = MilvusDocumentStore(
connection_args={"uri": "https://:19530"},
sparse_vector_field="sparse_vector",
text_field="text",
builtin_function=[
BM25BuiltInFunction( # The BM25 function converts the text into a sparse vector.
input_field_names="text", output_field_names="sparse_vector",
)
],
drop_old=True,
)
writer = DocumentWriter(document_store=document_store, policy=DuplicatePolicy.NONE)
indexing_pipeline = Pipeline()
indexing_pipeline.add_component("dense_doc_embedder", OpenAIDocumentEmbedder())
indexing_pipeline.add_component("writer", writer)
indexing_pipeline.connect("dense_doc_embedder", "writer")
indexing_pipeline.run({"dense_doc_embedder": {"documents": documents}})
retrieval_pipeline = Pipeline()
retrieval_pipeline.add_component("dense_text_embedder", OpenAITextEmbedder())
retrieval_pipeline.add_component("retriever", MilvusHybridRetriever(document_store=document_store))
retrieval_pipeline.connect("dense_text_embedder.embedding", "retriever.query_embedding")
question = "who supports full text search?"
results = retrieval_pipeline.run(
    {"dense_text_embedder": {"text": question},
     "retriever": {"query_text": question}}  # query_text is turned into a sparse vector by the server-side BM25 function
)
print(results["retriever"]["documents"][0])
License
milvus-haystack is distributed under the terms of the Apache-2.0 license.
