Integration: Milvus

Use the Milvus vector database with Haystack

Authors
Zilliz

Installation

pip install --upgrade pymilvus milvus-haystack

Usage

A quick way to use MilvusDocumentStore in a Haystack pipeline:

from haystack import Document
from milvus_haystack import MilvusDocumentStore

document_store = MilvusDocumentStore(
    connection_args={"uri": "./milvus.db"},
    drop_old=True,
)
documents = [Document(
    content="A Foo Document",
    meta={"page": "100", "chapter": "intro"},
    embedding=[-10.0] * 128,
)]
document_store.write_documents(documents)
print(document_store.count_documents())  # 1

Different ways to connect to Milvus

  • For Milvus Lite, the most convenient option is to set the URI to a local file.
document_store = MilvusDocumentStore(
    connection_args={"uri": "./milvus.db"},
    drop_old=True,
)
  • For a Milvus server on Docker or Kubernetes, which is recommended for large-scale data: after starting the Milvus service, connect to it with the specified URI.
document_store = MilvusDocumentStore(
    connection_args={"uri": "http://localhost:19530"},  # Assumes a local server on the default port; replace the host with your own.
    drop_old=True,
)
  • For Zilliz Cloud, set the URI and token to the Public Endpoint and API key of your cluster.
from haystack.utils import Secret
document_store = MilvusDocumentStore(
    connection_args={
        "uri": "https://in03-ba4234asae.api.gcp-us-west1.zillizcloud.com",  # Your Public Endpoint
        "token": Secret.from_env_var("ZILLIZ_CLOUD_API_KEY"),  # API key, we recommend using the Secret class to load the token from env variable for security.
        "secure": True
    },
    drop_old=True,
)

In-depth usage

Prepare an OpenAI API key and set it as an environment variable:

export OPENAI_API_KEY=<your_api_key>

Create the indexing pipeline and index some documents

import os

from haystack import Pipeline
from haystack.components.converters import MarkdownToDocument
from haystack.components.embedders import OpenAIDocumentEmbedder, OpenAITextEmbedder
from haystack.components.preprocessors import DocumentSplitter
from haystack.components.writers import DocumentWriter

from milvus_haystack import MilvusDocumentStore
from milvus_haystack.milvus_embedding_retriever import MilvusEmbeddingRetriever

current_file_path = os.path.abspath(__file__)
file_paths = [current_file_path]  # You can replace it with your own file paths.

document_store = MilvusDocumentStore(
    connection_args={"uri": "./milvus.db"},
    drop_old=True,
)
indexing_pipeline = Pipeline()
indexing_pipeline.add_component("converter", MarkdownToDocument())
indexing_pipeline.add_component("splitter", DocumentSplitter(split_by="sentence", split_length=2))
indexing_pipeline.add_component("embedder", OpenAIDocumentEmbedder())
indexing_pipeline.add_component("writer", DocumentWriter(document_store))
indexing_pipeline.connect("converter", "splitter")
indexing_pipeline.connect("splitter", "embedder")
indexing_pipeline.connect("embedder", "writer")
indexing_pipeline.run({"converter": {"sources": file_paths}})

print("Number of documents:", document_store.count_documents())
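
To make the effect of split_by="sentence" with split_length=2 more concrete, here is a rough sketch in plain Python. It is not Haystack's implementation: the regex-based sentence boundary and the helper name split_into_chunks are assumptions made for illustration, and the sample text is invented.

```python
import re


def split_into_chunks(text, split_length=2):
    """Group consecutive sentences into chunks of `split_length` sentences.

    Hypothetical helper: sentence boundaries are approximated with a regex,
    which is much cruder than a real sentence tokenizer.
    """
    # Split on whitespace that follows '.', '!' or '?'.
    sentences = [s for s in re.split(r"(?<=[.!?])\s+", text.strip()) if s]
    return [
        " ".join(sentences[i:i + split_length])
        for i in range(0, len(sentences), split_length)
    ]


text = "Milvus stores vectors. Haystack builds pipelines. Together they power search."
chunks = split_into_chunks(text, split_length=2)
for chunk in chunks:
    print(chunk)
```

Each chunk would then be embedded and written as its own document, which is why the document count printed by the pipeline is usually larger than the number of source files.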

Create the retrieval pipeline and try a query

question = "How to set the service uri with milvus lite?"  # You can replace it with your own question. 

retrieval_pipeline = Pipeline()
retrieval_pipeline.add_component("embedder", OpenAITextEmbedder())
retrieval_pipeline.add_component("retriever", MilvusEmbeddingRetriever(document_store=document_store, top_k=3))
retrieval_pipeline.connect("embedder", "retriever")

retrieval_results = retrieval_pipeline.run({"embedder": {"text": question}})

for doc in retrieval_results["retriever"]["documents"]:
    print(doc.content)
    print("-" * 10)
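
Conceptually, the retriever returns the top_k stored documents whose embeddings are closest to the query embedding. The sketch below shows that idea as an exhaustive cosine-similarity search in plain Python. It is only an illustration: Milvus uses an index and whichever metric the collection is configured with, and the three-dimensional vectors and the function names here are made up.

```python
import math


def cosine_similarity(a, b):
    """Cosine similarity of two equal-length dense vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def top_k(query_embedding, docs, k=3):
    """Return the k (score, content) pairs with the highest similarity."""
    scored = [
        (cosine_similarity(query_embedding, embedding), content)
        for content, embedding in docs
    ]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return scored[:k]


# Toy corpus: (content, embedding) pairs with invented 3-d embeddings.
docs = [
    ("doc about Milvus Lite", [0.9, 0.1, 0.0]),
    ("doc about horses", [0.0, 0.2, 0.9]),
    ("doc about URIs", [0.7, 0.6, 0.1]),
]
ranked = top_k([1.0, 0.2, 0.0], docs, k=2)
for score, content in ranked:
    print(f"{score:.3f}  {content}")
```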

Create the RAG pipeline and try a query

from haystack.utils import Secret

from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.builders import ChatPromptBuilder
from haystack.dataclasses import ChatMessage

prompt_template = """Answer the following query based on the provided context. If the context does
                     not include an answer, reply with 'I don't know'.\n
                     Query: {{query}}
                     Documents:
                     {% for doc in documents %}
                        {{ doc.content }}
                     {% endfor %}
                     Answer: 
                  """

llm = OpenAIChatGenerator(api_key=Secret.from_env_var("OPENAI_API_KEY"), model="gpt-4o-mini")

rag_pipeline = Pipeline()
rag_pipeline.add_component("text_embedder", OpenAITextEmbedder())
rag_pipeline.add_component("retriever", MilvusEmbeddingRetriever(document_store=document_store, top_k=3))
rag_pipeline.add_component("prompt_builder", ChatPromptBuilder(template=[ChatMessage.from_user(prompt_template)]))

rag_pipeline.add_component("llm", llm)
# Wire the pipeline: query embedding -> retriever -> prompt builder -> LLM.
rag_pipeline.connect("text_embedder.embedding", "retriever.query_embedding")
rag_pipeline.connect("retriever.documents", "prompt_builder.documents")
rag_pipeline.connect("prompt_builder.prompt", "llm.messages")

results = rag_pipeline.run({"text_embedder": {"text": question}, "prompt_builder": {"query": question}})

print('RAG answer:', results["llm"]["replies"][0].text)
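
It can help to see roughly what text the LLM receives once the template is filled in. The sketch below rebuilds the prompt with plain string assembly instead of Jinja. The render_prompt helper is hypothetical, the two document strings are invented sample data, and the whitespace will not match the real template output exactly.

```python
def render_prompt(query, documents):
    """Hypothetical stand-in for the Jinja template: plain string assembly."""
    lines = [
        "Answer the following query based on the provided context. "
        "If the context does not include an answer, reply with 'I don't know'.",
        f"Query: {query}",
        "Documents:",
    ]
    # One line per retrieved document, mirroring the {% for %} loop.
    lines.extend(f"  {content}" for content in documents)
    lines.append("Answer:")
    return "\n".join(lines)


prompt = render_prompt(
    "How to set the service uri with milvus lite?",
    ["Set the URI to a local file.", "Milvus Lite needs no server."],  # sample data
)
print(prompt)
```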

Sparse retrieval

Sparse retrieval with the Haystack sparse embedder

This example shows the basic approach to sparse indexing and retrieval with Haystack's sparse embedder.

from haystack import Document, Pipeline
from haystack.components.writers import DocumentWriter
from haystack.document_stores.types import DuplicatePolicy
from haystack_integrations.components.embedders.fastembed import (
    FastembedSparseDocumentEmbedder,
    FastembedSparseTextEmbedder,
)

from milvus_haystack import MilvusDocumentStore, MilvusSparseEmbeddingRetriever

document_store = MilvusDocumentStore(
    connection_args={"uri": "./milvus.db"},
    sparse_vector_field="sparse_vector",  # Specify a name of the sparse vector field to enable sparse retrieval.
    drop_old=True,
)

documents = [
    Document(content="My name is Wolfgang and I live in Berlin"),
    Document(content="I saw a black horse running"),
    Document(content="Germany has many big cities"),
    Document(content="full text search is supported by Milvus."),
]

sparse_document_embedder = FastembedSparseDocumentEmbedder()
writer = DocumentWriter(document_store=document_store, policy=DuplicatePolicy.NONE)

indexing_pipeline = Pipeline()
indexing_pipeline.add_component("sparse_document_embedder", sparse_document_embedder)
indexing_pipeline.add_component("writer", writer)
indexing_pipeline.connect("sparse_document_embedder", "writer")

indexing_pipeline.run({"sparse_document_embedder": {"documents": documents}})

retrieval_pipeline = Pipeline()
retrieval_pipeline.add_component("sparse_text_embedder", FastembedSparseTextEmbedder())
retrieval_pipeline.add_component("sparse_retriever", MilvusSparseEmbeddingRetriever(document_store=document_store))
retrieval_pipeline.connect("sparse_text_embedder.sparse_embedding", "sparse_retriever.query_sparse_embedding")

query = "who supports full text search?"

result = retrieval_pipeline.run({"sparse_text_embedder": {"text": query}})

print(result["sparse_retriever"]["documents"][0])

# Document(id=..., content: 'full text search is supported by Milvus.', sparse_embedding: vector with 48 non-zero elements)
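
A sparse embedding stores only its non-zero dimensions, so comparing two of them comes down to summing products over the indices they share. The sketch below assumes inner-product scoring and represents each vector as an {index: value} dict. The numbers are invented, and this is not how Milvus or fastembed implement it internally.

```python
def sparse_dot(a, b):
    """Inner product of two sparse vectors stored as {index: value} dicts."""
    # Iterate over the smaller vector and look up matches in the larger one.
    if len(a) > len(b):
        a, b = b, a
    return sum(value * b[index] for index, value in a.items() if index in b)


query = {3: 0.5, 17: 1.2, 42: 0.8}
doc_a = {17: 2.0, 42: 1.0, 99: 0.3}  # shares indices 17 and 42 with the query
doc_b = {1: 0.9, 3: 0.1}             # shares only index 3 with the query

print(sparse_dot(query, doc_a))
print(sparse_dot(query, doc_b))
```

Only overlapping indices contribute, which is why a document that shares more weighted terms with the query scores higher.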

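Sparse retrieval with the Milvus built-in BM25 function

Milvus provides a built-in BM25 function that generates sparse vectors directly from a text field. Compared with using Haystack's sparse embedder, this approach simplifies pipeline construction. The main differences are:

  1. The document store must be given a BM25BuiltInFunction, along with a few field-specification parameters.
  2. No explicit embedder is needed, because Milvus computes the sparse embeddings on the server side.
  3. The pipeline has fewer components and connections, so it is simpler.

Here is an example: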

from milvus_haystack.function import BM25BuiltInFunction

document_store = MilvusDocumentStore(
    connection_args={"uri": "http://localhost:19530"},  # Assumes a local server on the default port; replace the host with your own.
    sparse_vector_field="sparse_vector",
    text_field="text",
    builtin_function=[
        BM25BuiltInFunction(  # The BM25 function converts the text into a sparse vector.
            input_field_names="text", output_field_names="sparse_vector",
        )
    ],
    drop_old=True,
)
writer = DocumentWriter(document_store=document_store, policy=DuplicatePolicy.NONE)
indexing_pipeline = Pipeline()
indexing_pipeline.add_component("writer", writer)
indexing_pipeline.run({"writer": {"documents": documents}})
retrieval_pipeline = Pipeline()
retrieval_pipeline.add_component("sparse_retriever", MilvusSparseEmbeddingRetriever(document_store=document_store))
query = "who supports full text search?"
result = retrieval_pipeline.run({"sparse_retriever": {"query_text": query}})
print(result["sparse_retriever"]["documents"][0])
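
For intuition about what BM25 scoring does, here is a small pure-Python sketch of the textbook Okapi BM25 formula applied to the four sample documents. It is an assumption-laden illustration, not Milvus's implementation: the regex tokenizer, the k1=1.2 and b=0.75 values, and the smoothed IDF variant are common choices picked here, and Milvus's analyzer and parameters may differ.

```python
import math
import re
from collections import Counter


def tokenize(text):
    """Naive tokenizer (assumption): lowercase word characters only."""
    return re.findall(r"\w+", text.lower())


def bm25_scores(query, docs, k1=1.2, b=0.75):
    """Score every document in `docs` against `query` with Okapi BM25."""
    if not docs:
        return []
    tokenized = [tokenize(doc) for doc in docs]
    n_docs = len(tokenized)
    avg_len = sum(len(tokens) for tokens in tokenized) / n_docs
    # Document frequency: in how many documents each term appears.
    df = Counter(term for tokens in tokenized for term in set(tokens))
    query_terms = set(tokenize(query))
    scores = []
    for tokens in tokenized:
        tf = Counter(tokens)
        score = 0.0
        for term in query_terms:
            if term not in tf:
                continue
            idf = math.log(1 + (n_docs - df[term] + 0.5) / (df[term] + 0.5))
            norm = tf[term] + k1 * (1 - b + b * len(tokens) / avg_len)
            score += idf * tf[term] * (k1 + 1) / norm
        scores.append(score)
    return scores


docs = [
    "My name is Wolfgang and I live in Berlin",
    "I saw a black horse running",
    "Germany has many big cities",
    "full text search is supported by Milvus.",
]
scores = bm25_scores("who supports full text search?", docs)
best = docs[scores.index(max(scores))]
print(best)
```

Only the last document shares terms with the query ("full", "text", "search"), so it is the only one with a non-zero score in this sketch.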

Hybrid retrieval

Hybrid retrieval with the Haystack sparse embedder

This example shows the basic approach to hybrid retrieval with Haystack's sparse embedder.

from haystack import Document, Pipeline
from haystack.components.embedders import OpenAIDocumentEmbedder, OpenAITextEmbedder
from haystack.components.writers import DocumentWriter
from haystack.document_stores.types import DuplicatePolicy
from haystack_integrations.components.embedders.fastembed import (
    FastembedSparseDocumentEmbedder,
    FastembedSparseTextEmbedder,
)

from milvus_haystack import MilvusDocumentStore, MilvusHybridRetriever

document_store = MilvusDocumentStore(
    connection_args={"uri": "./milvus.db"},
    drop_old=True,
    sparse_vector_field="sparse_vector",  # Specify a name of the sparse vector field to enable hybrid retrieval.
)

documents = [
    Document(content="My name is Wolfgang and I live in Berlin"),
    Document(content="I saw a black horse running"),
    Document(content="Germany has many big cities"),
    Document(content="full text search is supported by Milvus."),
]

writer = DocumentWriter(document_store=document_store, policy=DuplicatePolicy.NONE)

indexing_pipeline = Pipeline()
indexing_pipeline.add_component("sparse_doc_embedder", FastembedSparseDocumentEmbedder())
indexing_pipeline.add_component("dense_doc_embedder", OpenAIDocumentEmbedder())
indexing_pipeline.add_component("writer", writer)
indexing_pipeline.connect("sparse_doc_embedder", "dense_doc_embedder")
indexing_pipeline.connect("dense_doc_embedder", "writer")

indexing_pipeline.run({"sparse_doc_embedder": {"documents": documents}})

retrieval_pipeline = Pipeline()
retrieval_pipeline.add_component("sparse_text_embedder",
                                FastembedSparseTextEmbedder(model="prithivida/Splade_PP_en_v1"))

retrieval_pipeline.add_component("dense_text_embedder", OpenAITextEmbedder())
retrieval_pipeline.add_component(
    "retriever",
    MilvusHybridRetriever(
        document_store=document_store,
        # reranker=WeightedRanker(0.5, 0.5),  # Default is RRFRanker()
    )
)

retrieval_pipeline.connect("sparse_text_embedder.sparse_embedding", "retriever.query_sparse_embedding")
retrieval_pipeline.connect("dense_text_embedder.embedding", "retriever.query_embedding")

question = "who supports full text search?"

results = retrieval_pipeline.run(
    {"dense_text_embedder": {"text": question},
     "sparse_text_embedder": {"text": question}}
)

print(results["retriever"]["documents"][0])

# Document(id=..., content: 'full text search is supported by Milvus.', embedding: vector of size 1536, sparse_embedding: vector with 48 non-zero elements)
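
The code comment above mentions that the hybrid retriever defaults to an RRF ranker. As a rough illustration of reciprocal rank fusion, the sketch below merges a dense ranking and a sparse ranking by summing 1 / (k + rank) per document. The constant k=60 is a commonly used value and is assumed here, the document IDs are invented, and the function is a generic sketch rather than Milvus's ranker.

```python
def reciprocal_rank_fusion(rankings, k=60):
    """Fuse several ranked lists of document IDs into one scored list.

    Each document gets the sum of 1 / (k + rank) over every list it
    appears in, with ranks starting at 1.
    """
    scores = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    return sorted(scores.items(), key=lambda item: item[1], reverse=True)


# Hypothetical result lists from the dense and sparse searches.
dense_ranking = ["doc_milvus", "doc_cities", "doc_horse"]
sparse_ranking = ["doc_milvus", "doc_berlin", "doc_cities"]

fused = reciprocal_rank_fusion([dense_ranking, sparse_ranking])
for doc_id, score in fused:
    print(f"{score:.4f}  {doc_id}")
```

Because RRF uses only rank positions, it needs no score normalization between the dense and sparse searches. A weighted ranker, by contrast, combines the raw scores with explicit weights.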

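Hybrid retrieval with the Milvus built-in BM25 function

Milvus provides a built-in BM25 function that generates sparse vectors directly from a text field. Compared with using Haystack's sparse embedder, this approach simplifies pipeline construction and makes BM25 a useful complement to semantic search. The main differences are:

  1. The document store must be given a BM25BuiltInFunction, along with a few field-specification parameters.
  2. No explicit sparse embedder is needed, because Milvus computes the sparse embeddings on the server side.
  3. The pipeline is simpler, with fewer components and connections, which is especially helpful in a hybrid retrieval setup.

Here is an example: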

from milvus_haystack.function import BM25BuiltInFunction

document_store = MilvusDocumentStore(
    connection_args={"uri": "http://localhost:19530"},  # Assumes a local server on the default port; replace the host with your own.
    sparse_vector_field="sparse_vector",
    text_field="text",
    builtin_function=[
        BM25BuiltInFunction(  # The BM25 function converts the text into a sparse vector.
            input_field_names="text", output_field_names="sparse_vector",
        )
    ],
    drop_old=True,
)
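writer = DocumentWriter(document_store=document_store, policy=DuplicatePolicy.NONE)
indexing_pipeline = Pipeline()
# Only the dense embedder is needed; Milvus builds the sparse vectors from the text field.
indexing_pipeline.add_component("dense_doc_embedder", OpenAIDocumentEmbedder())
indexing_pipeline.add_component("writer", writer)
indexing_pipeline.connect("dense_doc_embedder", "writer")
indexing_pipeline.run({"dense_doc_embedder": {"documents": documents}})
retrieval_pipeline = Pipeline()
retrieval_pipeline.add_component("dense_text_embedder", OpenAITextEmbedder())
retrieval_pipeline.add_component("retriever", MilvusHybridRetriever(document_store=document_store))
retrieval_pipeline.connect("dense_text_embedder.embedding", "retriever.query_embedding")
question = "who supports full text search?"
# The raw query text goes to the retriever for the BM25 side of the search.
results = retrieval_pipeline.run(
    {"dense_text_embedder": {"text": question}, "retriever": {"query_text": question}}
)
print(results["retriever"]["documents"][0])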

License

milvus-haystack is distributed under the terms of the Apache-2.0 license.