
Hybrid RAG Pipeline with Breakpoints


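This notebook demonstrates how to set breakpoints in a Haystack pipeline, using a hybrid Retrieval-Augmented Generation (RAG) pipeline as the example. The pipeline combines BM25 and embedding-based retrieval, then uses a Transformer-based reranker and an LLM to generate the answer.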

Install packages

%%bash

pip install "haystack-ai>=2.16.0"
pip install "transformers[torch,sentencepiece]"
pip install "sentence-transformers>=3.0.0"

Set up the OpenAI API key

import os
from getpass import getpass

if "OPENAI_API_KEY" not in os.environ:
    os.environ["OPENAI_API_KEY"] = getpass("Enter OpenAI API key:")

Import the required libraries

First, let's import all the components we need from Haystack.

from haystack import Document, Pipeline
from haystack.components.builders import AnswerBuilder, ChatPromptBuilder
from haystack.components.embedders import SentenceTransformersDocumentEmbedder, SentenceTransformersTextEmbedder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.joiners import DocumentJoiner
from haystack.components.rankers import TransformersSimilarityRanker
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever, InMemoryEmbeddingRetriever
from haystack.components.writers import DocumentWriter
from haystack.dataclasses import ChatMessage
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.document_stores.types import DuplicatePolicy

Document store initialization

Let's create a simple document store containing sample documents and their embeddings.

def indexing():
    """
    Indexing documents in a DocumentStore.
    """

    print("Indexing documents...")

    # Create sample documents
    documents = [
        Document(content="My name is Jean and I live in Paris. The weather today is 25°C."),
        Document(content="My name is Mark and I live in Berlin. The weather today is 15°C."),
        Document(content="My name is Giorgio and I live in Rome. The weather today is 30°C."),
    ]

    # Initialize document store and components
    document_store = InMemoryDocumentStore()
    doc_writer = DocumentWriter(document_store=document_store, policy=DuplicatePolicy.SKIP)
    doc_embedder = SentenceTransformersDocumentEmbedder(model="intfloat/e5-base-v2", progress_bar=False)

    # Build and run the ingestion pipeline
    ingestion_pipe = Pipeline()
    ingestion_pipe.add_component(instance=doc_embedder, name="doc_embedder")
    ingestion_pipe.add_component(instance=doc_writer, name="doc_writer")

    ingestion_pipe.connect("doc_embedder.documents", "doc_writer.documents")
    ingestion_pipe.run({"doc_embedder": {"documents": documents}})

    return document_store

A hybrid retrieval pipeline

Now let's build a hybrid RAG pipeline.

def hybrid_retrieval(doc_store):
    """
    A simple pipeline for hybrid retrieval using BM25 and embeddings.
    """

    # Initialize query embedder
    query_embedder = SentenceTransformersTextEmbedder(model="intfloat/e5-base-v2", progress_bar=False)

    # Define the prompt template for the LLM
    template = [
        ChatMessage.from_system(
            "You are a helpful AI assistant. Answer the following question based on the given context information only. If the context is empty or just a '\n' answer with None, example: 'None'."
        ),
        ChatMessage.from_user(
            """
            Context:
            {% for document in documents %}
                {{ document.content }}
            {% endfor %}
    
            Question: {{question}}
            """
        )
    ]

    
    # Build the RAG pipeline
    rag_pipeline = Pipeline()
    
    # Add components to the pipeline
    rag_pipeline.add_component(instance=InMemoryBM25Retriever(document_store=doc_store), name="bm25_retriever")
    rag_pipeline.add_component(instance=query_embedder, name="query_embedder")
    rag_pipeline.add_component(instance=InMemoryEmbeddingRetriever(document_store=doc_store), name="embedding_retriever")
    rag_pipeline.add_component(instance=DocumentJoiner(sort_by_score=False), name="doc_joiner")
    rag_pipeline.add_component(instance=TransformersSimilarityRanker(model="intfloat/simlm-msmarco-reranker", top_k=5), name="ranker")    
    rag_pipeline.add_component(instance=ChatPromptBuilder(template=template, required_variables=["question", "documents"]), name="prompt_builder")
    rag_pipeline.add_component(instance=OpenAIChatGenerator(), name="llm")
    rag_pipeline.add_component(instance=AnswerBuilder(), name="answer_builder")

    # Connect the components
    rag_pipeline.connect("query_embedder", "embedding_retriever.query_embedding")
    rag_pipeline.connect("embedding_retriever", "doc_joiner.documents")
    rag_pipeline.connect("bm25_retriever", "doc_joiner.documents")
    rag_pipeline.connect("doc_joiner", "ranker.documents")
    rag_pipeline.connect("ranker", "prompt_builder.documents")
    rag_pipeline.connect("prompt_builder", "llm")
    rag_pipeline.connect("llm.replies", "answer_builder.replies")    
    rag_pipeline.connect("doc_joiner", "answer_builder.documents")

    return rag_pipeline
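
Both retrievers can return the same document, so the joining step has to merge their result lists without duplicates. The following is a minimal plain-Python sketch of that idea, assuming a "keep the best-scoring copy" rule. ScoredDoc and join_keep_best are hypothetical names used only for illustration; this is not DocumentJoiner's actual implementation.

```python
from dataclasses import dataclass


@dataclass
class ScoredDoc:
    """Hypothetical stand-in for a retrieved document (not Haystack's Document)."""
    id: str
    content: str
    score: float


def join_keep_best(*result_lists):
    """Merge several result lists, keeping the best-scoring copy of each id."""
    best = {}
    for results in result_lists:
        for doc in results:
            current = best.get(doc.id)
            if current is None or doc.score > current.score:
                best[doc.id] = doc
    # Dicts preserve insertion order, so documents come out in first-seen order.
    return list(best.values())


# Made-up scores for illustration only.
bm25_hits = [
    ScoredDoc("berlin", "Mark lives in Berlin.", 1.2),
    ScoredDoc("paris", "Jean lives in Paris.", 0.4),
]
embedding_hits = [
    ScoredDoc("berlin", "Mark lives in Berlin.", 0.9),
    ScoredDoc("rome", "Giorgio lives in Rome.", 0.7),
]

merged = join_keep_best(bm25_hits, embedding_hits)
print([(doc.id, doc.score) for doc in merged])
```

Scores from a keyword retriever and an embedding retriever are generally not on the same scale, which is one reason a ranker that re-scores the merged list against the query is useful.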

Running the pipeline with breakpoints

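Now we demonstrate how to set breakpoints in a Haystack pipeline to inspect and debug its execution at specific points. A breakpoint lets you pause execution, save the current state of the pipeline, and later resume from where it stopped.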
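
To make the pause, save, and resume cycle concrete, here is a toy model in plain Python. It only illustrates the concept and is not how Haystack implements breakpoints: ToyBreakpoint, STEPS, run, and resume are all hypothetical names invented for this sketch.

```python
import json
import tempfile
from pathlib import Path


class ToyBreakpoint(Exception):
    """Hypothetical exception used only in this sketch."""


# A toy "pipeline": named steps applied in order to a single value.
STEPS = [
    ("lowercase", str.lower),
    ("strip", str.strip),
    ("exclaim", lambda text: text + "!"),
]


def run(value, start_at=0, break_before=None, snapshot_path=None):
    """Run the steps from index start_at, optionally pausing before a named step."""
    for index in range(start_at, len(STEPS)):
        name, func = STEPS[index]
        if name == break_before:
            # Save everything needed to continue later, then stop.
            state = {"next_step": index, "value": value}
            Path(snapshot_path).write_text(json.dumps(state))
            raise ToyBreakpoint(f"Breaking at step {name}")
        value = func(value)
    return value


def resume(snapshot_path):
    """Continue a paused run from a saved (possibly hand-edited) snapshot."""
    state = json.loads(Path(snapshot_path).read_text())
    return run(state["value"], start_at=state["next_step"])


snapshot_file = Path(tempfile.mkdtemp()) / "exclaim.json"
try:
    run("  Hello World  ", break_before="exclaim", snapshot_path=snapshot_file)
except ToyBreakpoint as exc:
    print(exc)

final = resume(snapshot_file)
print(final)
```

Because the saved state is an ordinary JSON file, it can be opened and edited between the two calls, which is the property the rest of this notebook relies on.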

We'll run the pipeline with a breakpoint set at the query_embedder component. This saves the pipeline state just before query_embedder executes and raises a BreakpointException to stop the run.

from haystack.dataclasses.breakpoints import Breakpoint

break_point = Breakpoint(component_name="query_embedder", visit_count=0, snapshot_file_path="snapshots/")
# Initialize document store and pipeline
doc_store = indexing()
pipeline = hybrid_retrieval(doc_store)

# Define the query
question = "Where does Mark live?"
data = {
    "query_embedder": {"text": question},
    "bm25_retriever": {"query": question},
    "ranker": {"query": question, "top_k": 10},
    "prompt_builder": {"question": question},
    "answer_builder": {"query": question},
}

pipeline.run(data, break_point=break_point)
Indexing documents...


TransformersSimilarityRanker is considered legacy and will no longer receive updates. It may be deprecated in a future release, with removal following after a deprecation period. Consider using SentenceTransformersSimilarityRanker instead, which provides the same functionality along with additional features.



---------------------------------------------------------------------------

BreakpointException                       Traceback (most recent call last)

Cell In[6], line 15
      6 question = "Where does Mark live?"
      7 data = {
      8     "query_embedder": {"text": question},
      9     "bm25_retriever": {"query": question},
   (...)
     12     "answer_builder": {"query": question},
     13 }
---> 15 pipeline.run(data, break_point=break_point)


File ~/haystack-cookbook/.venv/lib/python3.12/site-packages/haystack/core/pipeline/pipeline.py:378, in Pipeline.run(self, data, include_outputs_from, break_point, pipeline_snapshot)
    376         # trigger the breakpoint if needed
    377         if should_trigger_breakpoint:
--> 378             _trigger_break_point(
    379                 pipeline_snapshot=new_pipeline_snapshot, pipeline_outputs=pipeline_outputs
    380             )
    382 component_outputs = self._run_component(
    383     component_name=component_name,
    384     component=component,
   (...)
    387     parent_span=span,
    388 )
    390 # Updates global input state with component outputs and returns outputs that should go to
    391 # pipeline outputs.


File ~/haystack-cookbook/.venv/lib/python3.12/site-packages/haystack/core/pipeline/breakpoint.py:299, in _trigger_break_point(pipeline_snapshot, pipeline_outputs)
    297 component_visits = pipeline_snapshot.pipeline_state.component_visits
    298 msg = f"Breaking at component {component_name} at visit count {component_visits[component_name]}"
--> 299 raise BreakpointException(
    300     message=msg, component=component_name, inputs=pipeline_snapshot.pipeline_state.inputs, results=pipeline_outputs
    301 )


BreakpointException: Breaking at component query_embedder at visit count 0

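This run should be interrupted by BreakpointException: Breaking at component query_embedder at visit count 0. It also writes a JSON file to the "snapshots" directory containing a snapshot of the pipeline taken just before the query_embedder component runs.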

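The snapshot file, named after the component associated with the breakpoint, can be inspected and edited, then injected back into the pipeline to resume execution from the point where the breakpoint was triggered.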

!ls snapshots/
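
Snapshot filenames include a timestamp, so the exact name changes on every run. As a small convenience, the sketch below picks the most recently modified snapshot for a component instead of hard-coding the name. latest_snapshot is a hypothetical helper, not part of Haystack, and it assumes the component-name prefix and .json suffix seen in this notebook.

```python
import os
import tempfile
from pathlib import Path


def latest_snapshot(component_name, directory="snapshots"):
    """Return the most recently modified snapshot file for a component, or None."""
    candidates = sorted(
        Path(directory).glob(f"{component_name}_*.json"),
        key=lambda path: path.stat().st_mtime,
    )
    return candidates[-1] if candidates else None


# Demo with throwaway files that mimic the naming pattern (the names are made up).
demo_dir = Path(tempfile.mkdtemp())
older = demo_dir / "query_embedder_2025_07_26_12_58_26.json"
newer = demo_dir / "query_embedder_2025_07_26_13_05_00.json"
for age_seconds, file in ((100, older), (0, newer)):
    file.write_text("{}")
    timestamp = file.stat().st_mtime - age_seconds
    os.utime(file, (timestamp, timestamp))  # make "older" look 100 seconds older

found = latest_snapshot("query_embedder", demo_dir)
print(found.name)
```

In this notebook, the path returned by latest_snapshot("query_embedder") could then be passed to load_pipeline_snapshot in place of a hard-coded filename.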

Resuming from a breakpoint

We can then resume the pipeline by passing the saved pipeline_snapshot to the Pipeline.run() method. This runs the pipeline to the end.

# Load the pipeline_snapshot and continue execution
from haystack.core.pipeline.breakpoint import load_pipeline_snapshot

snapshot = load_pipeline_snapshot("snapshots/query_embedder_2025_07_26_12_58_26.json")
result = pipeline.run(data={}, pipeline_snapshot=snapshot)
    
# Print the results
print(result['answer_builder']['answers'][0].data)
print(result['answer_builder']['answers'][0].meta)
Mark lives in Berlin.
{'model': 'gpt-4o-mini-2024-07-18', 'index': 0, 'finish_reason': 'stop', 'usage': {'completion_tokens': 5, 'prompt_tokens': 124, 'total_tokens': 129, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'all_messages': [ChatMessage(_role=<ChatRole.ASSISTANT: 'assistant'>, _content=[TextContent(text='Mark lives in Berlin.')], _name=None, _meta={'model': 'gpt-4o-mini-2024-07-18', 'index': 0, 'finish_reason': 'stop', 'usage': {'completion_tokens': 5, 'prompt_tokens': 124, 'total_tokens': 129, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}})]}

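Advanced use cases for pipeline breakpoints

Here are some advanced scenarios where pipeline breakpoints can be particularly valuable:

  1. Set a breakpoint at the LLM to try out the results of different prompts and iterate in real time.

  2. Place a breakpoint after the document retriever to inspect and modify the retrieved documents.

  3. Set a breakpoint before a component to inject gold-standard inputs and isolate whether a problem stems from input quality or from downstream logic.

To demonstrate the use case in point 1, we reuse the same query pipeline with a new question. First, we run the pipeline with the prompt originally passed to prompt_builder. Then we define a breakpoint at prompt_builder to try an alternative prompt. This lets us compare the results produced by different prompts without running the whole pipeline again.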

# Initialize document store and pipeline
doc_store = indexing()
pipeline = hybrid_retrieval(doc_store)

# Define the query
question = "What's the temperature difference between the warmest and coldest city?"
data = {
    "query_embedder": {"text": question},
    "bm25_retriever": {"query": question},
    "ranker": {"query": question, "top_k": 10},
    "prompt_builder": {"question": question},
    "answer_builder": {"query": question},
}


break_point = Breakpoint(component_name="prompt_builder", visit_count=0, snapshot_file_path="snapshots/")

pipeline.run(data, break_point=break_point)
TransformersSimilarityRanker is considered legacy and will no longer receive updates. It may be deprecated in a future release, with removal following after a deprecation period. Consider using SentenceTransformersSimilarityRanker instead, which provides the same functionality along with additional features.


Indexing documents...



---------------------------------------------------------------------------

BreakpointException                       Traceback (most recent call last)

Cell In[11], line 18
      7 data = {
      8     "query_embedder": {"text": question},
      9     "bm25_retriever": {"query": question},
   (...)
     12     "answer_builder": {"query": question},
     13 }
     16 break_point = Breakpoint(component_name="prompt_builder", visit_count=0, snapshot_file_path="snapshots/")
---> 18 pipeline.run(data, break_point=break_point)


File ~/haystack-cookbook/.venv/lib/python3.12/site-packages/haystack/core/pipeline/pipeline.py:378, in Pipeline.run(self, data, include_outputs_from, break_point, pipeline_snapshot)
    376         # trigger the breakpoint if needed
    377         if should_trigger_breakpoint:
--> 378             _trigger_break_point(
    379                 pipeline_snapshot=new_pipeline_snapshot, pipeline_outputs=pipeline_outputs
    380             )
    382 component_outputs = self._run_component(
    383     component_name=component_name,
    384     component=component,
   (...)
    387     parent_span=span,
    388 )
    390 # Updates global input state with component outputs and returns outputs that should go to
    391 # pipeline outputs.


File ~/haystack-cookbook/.venv/lib/python3.12/site-packages/haystack/core/pipeline/breakpoint.py:299, in _trigger_break_point(pipeline_snapshot, pipeline_outputs)
    297 component_visits = pipeline_snapshot.pipeline_state.component_visits
    298 msg = f"Breaking at component {component_name} at visit count {component_visits[component_name]}"
--> 299 raise BreakpointException(
    300     message=msg, component=component_name, inputs=pipeline_snapshot.pipeline_state.inputs, results=pipeline_outputs
    301 )


BreakpointException: Breaking at component prompt_builder at visit count 0

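We can now manually insert a different template into prompt_builder and inspect the results. To do so, we update the template input of the prompt_builder component in the snapshot file.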

template = ChatMessage.from_system(
    """You are a mathematical analysis assistant. Follow these steps:
    1. Identify all temperatures mentioned
    2. Find the maximum and minimum values
    3. Calculate their difference
    4. Format response as: 'The temperature difference is X°C (max Y°C in [city] - min Z°C in [city])'
    Use ONLY the information provided in the context."""
)
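
Before editing a snapshot by hand, it can help to print an outline of its keys to locate the entry you want to change. The sketch below does this for any JSON file. outline is a hypothetical helper, and the demo file uses made-up keys because the snapshot schema is not shown in this notebook. A real snapshot will look different.

```python
import json
import tempfile
from pathlib import Path


def outline(node, depth=2, indent=0):
    """Return lines describing the keys of a nested JSON value, down to `depth` levels."""
    lines = []
    if isinstance(node, dict) and depth > 0:
        for key, value in node.items():
            lines.append(f"{' ' * indent}{key}: {type(value).__name__}")
            lines.extend(outline(value, depth - 1, indent + 2))
    return lines


# Stand-in file with made-up keys; this is not the real snapshot layout.
demo_file = Path(tempfile.mkdtemp()) / "demo_snapshot.json"
demo_file.write_text(
    json.dumps({"state": {"inputs": {}, "visits": {"a": 0}}, "timestamp": "n/a"})
)

summary = outline(json.loads(demo_file.read_text()))
print("\n".join(summary))
```

Pointing the same function at a file in snapshots/ and raising depth as needed shows where the prompt_builder inputs are stored.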

Now we just load the snapshot file and resume the pipeline with the updated snapshot.

!ls snapshots/prompt_builder*
snapshots/prompt_builder_2025_07_26_13_01_23.json


huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
To disable this warning, you can either:
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
snapshot = load_pipeline_snapshot("snapshots/prompt_builder_2025_07_26_13_01_23.json")
result = pipeline.run(data={}, pipeline_snapshot=snapshot)
print(result['answer_builder']['answers'][0].data)
The temperature in Rome is 30°C and in Berlin is 15°C. The temperature difference between the warmest (Rome) and the coldest (Berlin) city is 30°C - 15°C = 15°C.
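
As a quick sanity check on the generated answer, we can compute the same difference directly from the three sample documents. This is a minimal sketch that assumes every document follows the exact sentence pattern used in indexing(); the regular expression is written for that pattern only.

```python
import re

# Contents of the three sample documents indexed earlier in this notebook.
contents = [
    "My name is Jean and I live in Paris. The weather today is 25°C.",
    "My name is Mark and I live in Berlin. The weather today is 15°C.",
    "My name is Giorgio and I live in Rome. The weather today is 30°C.",
]

# Capture the city name and the temperature from each sentence pair.
pattern = re.compile(r"live in (\w+)\. The weather today is (-?\d+)°C")
temps = {
    city: int(value)
    for city, value in (pattern.search(text).groups() for text in contents)
}

warmest = max(temps, key=temps.get)
coldest = min(temps, key=temps.get)
difference = temps[warmest] - temps[coldest]
print(f"{warmest} {temps[warmest]}°C - {coldest} {temps[coldest]}°C = {difference}°C")
```

The computed difference agrees with the 15°C reported by the LLM above.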