教程:创建自定义 SuperComponents
最后更新:2025 年 8 月 25 日
- 级别:中级
- 完成时间:20 分钟
- 使用的概念和组件:
@super_component,Pipeline,DocumentJoiner,SentenceTransformersTextEmbedder,InMemoryBM25Retriever,InMemoryEmbeddingRetriever,TransformersSimilarityRanker - 目标:完成本教程后,您将学会如何使用
@super_component装饰器创建自定义 SuperComponents,以简化复杂的管道并使其可重用为组件。
概述
在本教程中,您将学习如何使用 @super_component 装饰器创建自定义 SuperComponents。SuperComponents 是将复杂管道封装到具有简化接口的可重用组件的强大方法。
我们将探讨几个示例
- 创建简单的 HybridRetriever SuperComponent
- 使用 ranker 组件扩展我们的 HybridRetriever
- 创建具有自定义输入和输出映射的 SuperComponent
- 创建暴露非叶子组件输出的 SuperComponent
@super_component 装饰器可轻松将定义了管道的类转换为功能齐全的 Haystack 组件,该组件可在其他管道或应用程序中使用,而不会丢失诸如内容跟踪和调试等管道功能。它所需要的只是类有一个名为 pipeline 的属性。
准备环境
首先,让我们安装 Haystack 和我们需要的依赖项
%%bash
pip install haystack-ai
pip install "sentence-transformers>=4.1.0" datasets transformers[torch,sentencepiece]
理解 @super_component 装饰器
@super_component 装饰器是一个强大的工具,它允许您通过包装 Pipeline 来创建自定义组件。它处理了组件接口和底层管道之间输入和输出映射的所有复杂性。
当您使用 @super_component 装饰器时,您需要定义一个类,该类具有
- 一个创建 Pipeline 并将其分配给
self.pipeline的__init__方法 - 可选的
input_mapping和output_mapping属性,用于自定义输入和输出的映射方式
然后,该装饰器
- 创建一个继承自
SuperComponent的新类 - 复制原始类中的所有方法和属性
- 添加初始化逻辑以正确设置 SuperComponent
让我们通过一些实际示例来看看它是如何工作的。
1. 创建 HybridRetriever SuperComponent
让我们从一个简单的示例开始:创建一个结合了 BM25 和基于嵌入的检索的 HybridRetriever。这个 SuperComponent 将接收一个查询并返回相关文档。
from haystack import Document, Pipeline, super_component
from haystack.components.joiners import DocumentJoiner
from haystack.components.embedders import SentenceTransformersTextEmbedder
from haystack.components.retrievers import InMemoryBM25Retriever, InMemoryEmbeddingRetriever
from haystack.document_stores.in_memory import InMemoryDocumentStore
from datasets import load_dataset
@super_component
class HybridRetriever:
def __init__(self, document_store: InMemoryDocumentStore, embedder_model: str = "BAAI/bge-small-en-v1.5"):
# Create the components
embedding_retriever = InMemoryEmbeddingRetriever(document_store)
bm25_retriever = InMemoryBM25Retriever(document_store)
text_embedder = SentenceTransformersTextEmbedder(embedder_model)
document_joiner = DocumentJoiner(join_mode="reciprocal_rank_fusion")
# Create the pipeline
self.pipeline = Pipeline()
self.pipeline.add_component("text_embedder", text_embedder)
self.pipeline.add_component("embedding_retriever", embedding_retriever)
self.pipeline.add_component("bm25_retriever", bm25_retriever)
self.pipeline.add_component("document_joiner", document_joiner)
# Connect the components
self.pipeline.connect("text_embedder", "embedding_retriever")
self.pipeline.connect("bm25_retriever", "document_joiner")
self.pipeline.connect("embedding_retriever", "document_joiner")
现在,让我们加载一个数据集并测试我们的 HybridRetriever
# Load a dataset
dataset = load_dataset("HaystackBot/medrag-pubmed-chunk-with-embeddings", split="train")
docs = [Document(content=doc["contents"], embedding=doc["embedding"]) for doc in dataset]
document_store = InMemoryDocumentStore()
document_store.write_documents(docs)
# Create and run the HybridRetriever
query = "What treatments are available for chronic bronchitis?"
retriever = HybridRetriever(document_store)
result = retriever.run(
text=query, query=query
) # `query` variable will match with `text` and `query` inputs of components in the pipeline.
# Print the results
print(f"Found {len(result['documents'])} documents")
for i, doc in enumerate(result["documents"][:3]): # Show first 3 documents
print(f"\nDocument {i+1} (Score: {doc.score:.4f}):")
print(doc.content[:200] + "...")
HybridRetriever 的工作原理
让我们分解一下我们的 HybridRetriever SuperComponent 中发生的情况
- 我们定义了一个用
@super_component装饰的类 - 在
__init__方法中,我们- 创建所需的所有组件(嵌入检索器、BM25 检索器等)
- 创建一个 Pipeline 并将所有组件添加到其中
- 连接组件以定义数据流
@super_component装饰器处理使我们的类正常工作的复杂性
如果我们定义了类似 {"query": ["text_embedder.text", "bm25_retriever.query"]} 的输入映射,我们可以调用 retriever.run(query=query),查询将自动路由到文本嵌入器的 text 输入和 BM25 检索器的 query 输入。
您还可以通过 output_mapping 指定如何公开输出。例如,输出映射 {"document_joiner.documents": "documents"} 意味着当您调用 retriever.run(...) 时,由 document_joiner 生成的文档将以 documents 的名称返回。
2. 带有重新排序和自定义 'input_mapping' 的 HybridRetriever
现在,让我们通过添加一个 ranker 组件来增强我们的 HybridRetriever。这将根据文档与查询的语义相似性重新排序文档,从而可能提高结果的质量。我们还定义了一个自定义的 input_mapping。
from haystack import Document, Pipeline, super_component
from haystack.components.joiners import DocumentJoiner
from haystack.components.embedders import SentenceTransformersTextEmbedder
from haystack.components.rankers import TransformersSimilarityRanker
from haystack.components.retrievers import InMemoryBM25Retriever, InMemoryEmbeddingRetriever
from haystack.document_stores.in_memory import InMemoryDocumentStore
from datasets import load_dataset
@super_component
class HybridRetrieverWithRanker:
def __init__(
self,
document_store: InMemoryDocumentStore,
embedder_model: str = "BAAI/bge-small-en-v1.5",
ranker_model: str = "BAAI/bge-reranker-base",
):
# Create the components
embedding_retriever = InMemoryEmbeddingRetriever(document_store)
bm25_retriever = InMemoryBM25Retriever(document_store)
text_embedder = SentenceTransformersTextEmbedder(embedder_model)
document_joiner = DocumentJoiner()
ranker = TransformersSimilarityRanker(ranker_model)
# Create the pipeline
self.pipeline = Pipeline()
self.pipeline.add_component("text_embedder", text_embedder)
self.pipeline.add_component("embedding_retriever", embedding_retriever)
self.pipeline.add_component("bm25_retriever", bm25_retriever)
self.pipeline.add_component("document_joiner", document_joiner)
self.pipeline.add_component("ranker", ranker)
# Connect the components
self.pipeline.connect("text_embedder", "embedding_retriever")
self.pipeline.connect("bm25_retriever", "document_joiner")
self.pipeline.connect("embedding_retriever", "document_joiner")
self.pipeline.connect("document_joiner", "ranker")
# Define input mapping
self.input_mapping = {"query": ["text_embedder.text", "bm25_retriever.query", "ranker.query"]}
# Create and run the HybridRetrieverWithRanker
retriever = HybridRetrieverWithRanker(document_store)
result = retriever.run(query=query) # instead of retriever.run(text=query, query=query) thanks to input_mapping
# Print the results
print(f"Found {len(result['documents'])} documents")
for i, doc in enumerate(result["documents"][:3]): # Show first 3 documents
print(f"\nDocument {i+1} (Score: {doc.score:.4f}):")
print(doc.content[:200] + "...")
比较两个检索器
两个检索器的主要区别在于
- 添加了 Ranker 组件:第二个版本包含一个
TransformersSimilarityRanker,它根据文档与查询的语义相似性对文档进行重新排序。 - 更新了输入映射:我们在输入映射中添加了
"text_embedder.text"、"bm25_retriever.query"和"ranker.query",以确保输入查询发送到所有三个组件,同时简化了retriever.run方法。
ranker 可以通过根据文档与查询的语义相似性对文档进行重新排序来显著提高结果的质量,即使它们在初始检索器中排名不高。
3. SuperComponents 的序列化和反序列化
使用 @super_component 装饰器的主要优点之一是它会自动为您的组件添加序列化和反序列化功能。这意味着您可以使用标准的 Haystack 序列化函数轻松保存和加载您的 SuperComponents。
让我们看看这对于我们的 DocumentPreprocessor 组件是如何工作的
from haystack.core.serialization import component_to_dict, component_from_dict
from haystack.components.preprocessors import DocumentPreprocessor
# Create an instance of our SuperComponent
preprocessor = DocumentPreprocessor()
# Serialize the component to a dictionary
serialized = component_to_dict(preprocessor, "document_preprocessor")
print("Serialized component:")
print(serialized)
# Deserialize the component from the dictionary
deserialized = component_from_dict(DocumentPreprocessor, serialized, "document_preprocessor")
print("\nDeserialized component:")
print(deserialized)
# Verify that the deserialized component works
doc = Document(content="I love pizza!")
result = deserialized.run(documents=[doc])
print(f"\nDeserialized component produced {len(result['documents'])} documents")
由于 @super_component 装饰器会自动添加必要的功能,因此序列化和反序列化过程与 SuperComponents 无缝集成。这特别有用,当您想要
- 保存和加载管道:您可以将整个管道(包括 SuperComponents)保存到文件并在以后加载。
- 部署组件:您可以将 SuperComponents 部署到服务器或云环境。
- 共享组件:您可以与他人共享 SuperComponents,然后他们可以在自己的管道中使用它们。
序列化过程捕获了您的 SuperComponent 的所有初始化参数,确保在反序列化时,它会以相同的配置重新创建。
4. 创建一个输出非叶子组件的 SuperComponent
SuperComponents 的一个强大功能是能够暴露管道中任何组件的输出,而不仅仅是叶子组件。在这里,叶子组件指的是不向管道中的其他组件发送任何输出的组件。让我们创建一个演示此功能的 SuperComponent。
from haystack import Document, Pipeline, super_component
from haystack.components.joiners import DocumentJoiner
from haystack.components.embedders import SentenceTransformersTextEmbedder
from haystack.components.retrievers import InMemoryBM25Retriever, InMemoryEmbeddingRetriever
from haystack.components.rankers import TransformersSimilarityRanker
from haystack.document_stores.in_memory import InMemoryDocumentStore
@super_component
class AdvancedHybridRetriever:
def __init__(
self,
document_store: InMemoryDocumentStore,
embedder_model: str = "BAAI/bge-small-en-v1.5",
ranker_model: str = "BAAI/bge-reranker-base",
):
# Create the components
embedding_retriever = InMemoryEmbeddingRetriever(document_store)
bm25_retriever = InMemoryBM25Retriever(document_store)
text_embedder = SentenceTransformersTextEmbedder(embedder_model)
document_joiner = DocumentJoiner()
ranker = TransformersSimilarityRanker(ranker_model)
# Create the pipeline
self.pipeline = Pipeline()
self.pipeline.add_component("text_embedder", text_embedder)
self.pipeline.add_component("embedding_retriever", embedding_retriever)
self.pipeline.add_component("bm25_retriever", bm25_retriever)
self.pipeline.add_component("document_joiner", document_joiner)
self.pipeline.add_component("ranker", ranker)
# Connect the components
self.pipeline.connect("text_embedder", "embedding_retriever")
self.pipeline.connect("bm25_retriever", "document_joiner")
self.pipeline.connect("embedding_retriever", "document_joiner")
self.pipeline.connect("document_joiner", "ranker")
# Define input and output mappings
self.input_mapping = {"query": ["text_embedder.text", "bm25_retriever.query", "ranker.query"]}
# Expose outputs from multiple components, including non-leaf components
self.output_mapping = {
"bm25_retriever.documents": "bm25_documents",
"embedding_retriever.documents": "embedding_documents",
"document_joiner.documents": "joined_documents",
"ranker.documents": "ranked_documents",
"text_embedder.embedding": "query_embedding",
}
# Create and run the AdvancedHybridRetriever
retriever = AdvancedHybridRetriever(document_store)
result = retriever.run(query=query)
# Print the results
print(f"BM25 documents: {len(result['bm25_documents'])}")
print(f"Embedding documents: {len(result['embedding_documents'])}")
print(f"Joined documents: {len(result['joined_documents'])}")
print(f"Ranked documents: {len(result['ranked_documents'])}")
print(f"Query embedding shape: {len(result['query_embedding'])}")
# Compare the top document from each stage
print("\nTop BM25 document:")
print(result["bm25_documents"][0].content[:200] + "...")
print(f"Score: {result['bm25_documents'][0].score:.4f}")
print("\nTop embedding document:")
print(result["embedding_documents"][0].content[:200] + "...")
print(f"Score: {result['embedding_documents'][0].score:.4f}")
print("\nTop ranked document:")
print(result["ranked_documents"][0].content[:200] + "...")
print(f"Score: {result['ranked_documents'][0].score:.4f}")
理解非叶子组件的输出
在此示例中,我们创建了一个 SuperComponent,它暴露了管道中多个组件的输出,包括非叶子组件
- BM25 文档:BM25 检索器检索到的文档
- 嵌入文档:嵌入检索器检索到的文档
- 已连接文档:连接两个检索器的结果后的文档
- 排序文档:重新排序后的文档
- 查询嵌入:查询的嵌入
这演示了 @super_component 装饰器如何允许您公开管道中任何组件的输出,而不仅仅是叶子组件。这对于调试、分析或当您想向用户提供更详细信息时特别有用。
Haystack 中现成的 SuperComponents
Haystack 提供了几个现成的 SuperComponents,您可以在应用程序中使用它们,例如
- MultiFileConverter:一个可以轻松将各种文件类型转换为文档的 SuperComponent。
- DocumentPreprocessor:一个结合了文档清理和拆分的 SuperComponent。
这些 SuperComponents 提供了一种便捷的方式来使用常见的管道,而无需从头开始构建它们。
结论
在本教程中,您已学会如何使用 @super_component 装饰器创建自定义 SuperComponents。您已了解如何
- 创建简单的 HybridRetriever SuperComponent
- 使用 ranker 和自定义输入映射对其进行增强
- 使用开箱即用的功能序列化和反序列化组件
- 创建暴露非叶子组件输出的 SuperComponent
SuperComponents 是将复杂管道封装到具有简化接口的可重用组件中的强大方法。它们可以轻松创建抽象底层管道细节的更高级组件。
如果您喜欢本教程,还有更多关于 Haystack 的内容可以学习。
要随时了解最新的 Haystack 发展,您可以 订阅我们的时事通讯 或 加入 Haystack discord 社区。
