教程:使用多路复用器简化 Pipeline 输入
最后更新:2025 年 8 月 25 日
- 级别:中级
- 完成时间:10 分钟
- 使用的组件:Multiplexer、InMemoryDocumentStore、HuggingFaceAPIDocumentEmbedder、HuggingFaceAPITextEmbedder、InMemoryEmbeddingRetriever、PromptBuilder、HuggingFaceAPIGenerator 和 AnswerBuilder
- 先决条件:您必须拥有一个 Hugging Face API 密钥,并且熟悉 创建 Pipeline
- 目标:完成本教程后,您将学会如何使用 Multiplexer 来简化
Pipeline.run()方法的输入
从 2.2.0 版本开始,Haystack 中的
Multiplexer已被弃用,并将在 v2.4.0 版本中完全移除。我们建议使用 BranchJoiner。有关此弃用的更多详细信息,请参阅 Github 上的 Haystack 2.2.0 发行说明。
概述
如果您曾经构建过包含 3-4 个以上组件的 Haystack Pipeline,您可能已经注意到传递给 Pipeline 的 run() 方法的输入数量会无休止地增长。新组件会从 Pipeline 的其他组件获取部分输入,但许多组件还需要用户提供额外的输入。因此,Pipeline.run() 的 data 输入会不断增长,变得非常重复。
有一个组件可以帮助更有效地管理这种重复性,它叫做 Multiplexer。
在本教程中,您将学习如何使用 Multiplexer 显著简化 RAG Pipeline 的 Pipeline.run() 方法。
设置
准备 Colab 环境
安装 Haystack
使用 pip 安装 Haystack
%%bash
pip install haystack-ai "huggingface_hub>=0.23.0, <0.28.0"
输入 Hugging Face API 密钥
设置 Hugging Face API 密钥
import os
from getpass import getpass
if "HF_API_TOKEN" not in os.environ:
os.environ["HF_API_TOKEN"] = getpass("Enter Hugging Face token:")
使用 Pipeline 索引文档
创建一个 Pipeline,将小型示例数据集及其嵌入存储在 InMemoryDocumentStore 中。您将使用 HuggingFaceAPIDocumentEmbedder 为您的文档生成嵌入,并使用 DocumentWriter 将它们写入文档存储。
将这些组件添加到管道后,连接它们并运行管道。
如果您想了解在将文件索引到文档存储之前进行预处理,请遵循 预处理不同文件类型 教程。
from haystack import Pipeline, Document
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.writers import DocumentWriter
from haystack.components.embedders import HuggingFaceAPIDocumentEmbedder
documents = [
Document(content="My name is Jean and I live in Paris."),
Document(content="My name is Mark and I live in Berlin."),
Document(content="My name is Giorgio and I live in Rome."),
Document(content="My name is Giorgio and I live in Milan."),
Document(content="My name is Giorgio and I lived in many cities, but I settled in Naples eventually."),
]
document_store = InMemoryDocumentStore()
indexing_pipeline = Pipeline()
indexing_pipeline.add_component(
instance=HuggingFaceAPIDocumentEmbedder(
api_type="serverless_inference_api", api_params={"model": "sentence-transformers/all-MiniLM-L6-v2"}
),
name="doc_embedder",
)
indexing_pipeline.add_component(instance=DocumentWriter(document_store=document_store), name="doc_writer")
indexing_pipeline.connect("doc_embedder.documents", "doc_writer.documents")
indexing_pipeline.run({"doc_embedder": {"documents": documents}})
构建 RAG Pipeline
使用 HuggingFaceAPITextEmbedder、InMemoryEmbeddingRetriever、PromptBuilder 和 HuggingFaceAPIGenerator 构建一个基本的检索增强生成 (RAG) Pipeline。此外,添加 AnswerBuilder 来帮助您用 meta 信息和 query 输入来丰富生成的答案。
有关使用 Haystack 创建 RAG Pipeline 的分步指南,请遵循 创建您的第一个检索增强 QA Pipeline 教程。
from haystack.components.embedders import HuggingFaceAPITextEmbedder
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever
from haystack.components.builders import PromptBuilder, AnswerBuilder
from haystack.components.generators import HuggingFaceAPIGenerator
template = """
<|user|>
Answer the question based on the given context.
Context:
{% for document in documents %}
{{ document.content }}
{% endfor %}
Question: {{ question }}</s>
<|assistant|>
Answer:
"""
pipe = Pipeline()
pipe.add_component(
"embedder",
HuggingFaceAPITextEmbedder(
api_type="serverless_inference_api", api_params={"model": "sentence-transformers/all-MiniLM-L6-v2"}
),
)
pipe.add_component("retriever", InMemoryEmbeddingRetriever(document_store=document_store))
pipe.add_component("prompt_builder", PromptBuilder(template=template))
pipe.add_component(
"llm",
HuggingFaceAPIGenerator(api_type="serverless_inference_api", api_params={"model": "HuggingFaceH4/zephyr-7b-beta"}),
)
pipe.add_component("answer_builder", AnswerBuilder())
pipe.connect("embedder.embedding", "retriever.query_embedding")
pipe.connect("retriever", "prompt_builder.documents")
pipe.connect("prompt_builder", "llm")
pipe.connect("llm.replies", "answer_builder.replies")
pipe.connect("llm.meta", "answer_builder.meta")
运行 Pipeline
将 query 传递给 embedder、prompt_builder 和 answer_builder 并运行它
query = "Where does Mark live?"
pipe.run({"embedder": {"text": query}, "prompt_builder": {"question": query}, "answer_builder": {"query": query}})
在这个基础 RAG Pipeline 中,需要 query 来操作的组件是 embedder、prompt_builder 和 answer_builder。然而,随着您向 Pipeline 中添加更多组件(如 Retrievers 和 Rankers),需要 query 的组件数量可能会无限增加。这会导致重复且日益复杂的 Pipeline.run() 调用。在这种情况下,使用 Multiplexer 可以帮助简化和整理 Pipeline.run()。
引入 Multiplexer
Multiplexer 是一个可以接受多个输入连接的组件,然后将它接收到的第一个值分发给连接到其输出的所有组件。在这种情况下,您可以将此组件连接到运行时需要 query 作为输入的其他 Pipeline 组件。
现在,初始化 Multiplexer,指定预期的输入类型(在本例中为 str,因为 query 是一个字符串)
from haystack.components.others import Multiplexer
multiplexer = Multiplexer(str)
将 Multiplexer 添加到 Pipeline
创建相同的 RAG Pipeline,但这次使用 Multiplexer。将 Multiplexer 添加到 Pipeline,并将其连接到所有需要 query 作为输入的组件。
from haystack.components.embedders import HuggingFaceAPITextEmbedder
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever
from haystack.components.builders import PromptBuilder, AnswerBuilder
from haystack.components.generators import HuggingFaceAPIGenerator
template = """
<|user|>
Answer the question based on the given context.
Context:
{% for document in documents %}
{{ document.content }}
{% endfor %}
Question: {{ question }}</s>
<|assistant|>
Answer:
"""
pipe = Pipeline()
pipe.add_component("multiplexer", multiplexer)
pipe.add_component(
"embedder",
HuggingFaceAPITextEmbedder(
api_type="serverless_inference_api", api_params={"model": "sentence-transformers/all-MiniLM-L6-v2"}
),
)
pipe.add_component("retriever", InMemoryEmbeddingRetriever(document_store=document_store))
pipe.add_component("prompt_builder", PromptBuilder(template=template))
pipe.add_component(
"llm",
HuggingFaceAPIGenerator(api_type="serverless_inference_api", api_params={"model": "HuggingFaceH4/zephyr-7b-beta"}),
)
pipe.add_component("answer_builder", AnswerBuilder())
# Connect the Multiplexer to all the components that need the query
pipe.connect("multiplexer.value", "embedder.text")
pipe.connect("multiplexer.value", "prompt_builder.question")
pipe.connect("multiplexer.value", "answer_builder.query")
pipe.connect("embedder.embedding", "retriever.query_embedding")
pipe.connect("retriever", "prompt_builder.documents")
pipe.connect("prompt_builder", "llm")
pipe.connect("llm.replies", "answer_builder.replies")
pipe.connect("llm.meta", "answer_builder.meta")
使用 Multiplexer 运行 Pipeline
运行您已更新了 Multiplexer 的 Pipeline。这次,您无需单独将 query 传递给 prompt_builder、retriever 和 answer_builder,只需将其传递给 multiplexer。因此,您将获得相同的答案。
pipe.run({"multiplexer": {"value": "Where does Mark live?"}})
下一步
🎉 恭喜!您已经使用 Multiplexer 简化了您的 Pipeline 运行!
如果您喜欢本教程,还有更多关于 Haystack 的内容可以学习。
要及时了解最新的 Haystack 开发动态,您可以 注册我们的新闻通讯 或 加入 Haystack Discord 社区。
感谢阅读!
