Setting Breakpoints on an Agent in a Pipeline


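This notebook demonstrates how to set breakpoints inside an Agent component within a Haystack Pipeline. Breakpoints can be set on the chat_generator or on any of the tools the Agent uses. This guide shows both approaches.

The Pipeline contains an Agent that acts as a database assistant: it extracts relevant information and writes it to a database.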

Install packages

%%bash

pip install "haystack-ai>=2.16.1"
pip install "transformers[torch,sentencepiece]"
pip install "sentence-transformers>=3.0.0"

Set the OpenAI API key for the chat_generator

import os
from getpass import getpass

if "OPENAI_API_KEY" not in os.environ:
    os.environ["OPENAI_API_KEY"] = getpass("Enter OpenAI API key:")

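Initialization

Now we initialize the components needed to build an agentic Pipeline. We will set up:

  • a chat_generator for the Agent
  • a custom tool that writes structured information to an InMemoryDocumentStore
  • an Agent that uses these components to extract entities from user-provided context and store them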
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.agents.agent import Agent
from haystack.components.generators.chat import OpenAIChatGenerator

from haystack.dataclasses import Document
from haystack.tools import tool
from typing import Optional

# Initialize a document store and a chat_generator
document_store = InMemoryDocumentStore()
chat_generator = OpenAIChatGenerator(
    model="gpt-4o-mini",
)

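# Initialize a tool; the @tool decorator uses the docstring as the tool description shown to the LLM
@tool
def add_database_tool(name: str, surname: str, job_title: Optional[str], other: Optional[str]):
    """Add a person to the knowledge base, with their job title and any other relevant information."""
    document_store.write_documents(
        [Document(content=name + " " + surname + " " + (job_title or ""), meta={"other": other})]
    )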

# Create the Agent
database_assistant = Agent(
    chat_generator=chat_generator,
    tools=[add_database_tool],
    system_prompt="""
    You are a database assistant.
    Your task is to extract the names of people mentioned in the given context and add them to a knowledge base,
    along with additional relevant information about them that can be extracted from the context.
    Do not use your own knowledge, stay grounded to the given context.
    Do not ask the user for confirmation. Instead, automatically update the knowledge base and return a brief
    summary of the people added, including the information stored for each.
    """,
    exit_conditions=["text"],  # stop as soon as the LLM replies with plain text instead of a tool call
    max_agent_steps=100,  # maximum number of agent steps before stopping
    raise_on_tool_invocation_failure=False,  # pass tool errors back to the LLM instead of raising
)
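To see what visit_count will refer to later, it may help to picture the loop the Agent runs. The sketch below is a simplified, stdlib-only model built on our own assumptions. It is not Haystack's implementation, and ToyReply, run_toy_agent, toy_add_person and the scripted replies are all hypothetical. The loop alternates between a chat generator and a tool invoker and counts visits to each. It stops on a plain-text reply (mirroring exit_conditions=["text"]) or after max_agent_steps.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class ToyReply:
    """Hypothetical stand-in for a chat_generator reply (not a Haystack class)."""

    text: Optional[str] = None
    tool_call: Optional[tuple] = None  # (tool_name, arguments) when the model calls a tool


def run_toy_agent(
    generate: Callable[[list], ToyReply], tools: dict, messages: list, max_agent_steps: int = 100
) -> dict:
    """Alternate between a chat generator and a tool invoker until the model returns
    plain text (like exit_conditions=["text"]) or max_agent_steps is reached."""
    component_visits = {"chat_generator": 0, "tool_invoker": 0}
    for _ in range(max_agent_steps):
        # A breakpoint check would sit here: the counter is compared before the call,
        # so visit_count=0 means "before the first chat_generator run".
        reply = generate(messages)
        component_visits["chat_generator"] += 1
        messages.append(reply)
        if reply.tool_call is None:
            break  # plain-text reply: the "text" exit condition is met
        tool_name, arguments = reply.tool_call
        result = tools[tool_name](**arguments)
        component_visits["tool_invoker"] += 1
        messages.append(ToyReply(text=str(result)))  # feed the tool result back to the model
    return {"messages": messages, "component_visits": component_visits}


# Usage: a scripted "model" that first calls the tool, then answers in plain text
toy_store = []


def toy_add_person(name, surname, job_title=None, other=None):
    toy_store.append(f"{name} {surname} {job_title or ''}".strip())
    return "ok"


scripted_replies = iter([
    ToyReply(tool_call=("add_database_tool", {"name": "Milos", "surname": "Rusic", "job_title": "Co-Founder"})),
    ToyReply(text="Added 1 person."),
])
toy_result = run_toy_agent(lambda msgs: next(scripted_replies), {"add_database_tool": toy_add_person}, [])
print(toy_result["component_visits"])  # → {'chat_generator': 2, 'tool_invoker': 1}
```

The counters start at 0 and are checked before each call, so in this model visit_count=0 corresponds to the first run of a component. That matches the "visit count 0" messages in the tracebacks later in this notebook.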

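Initialize the Pipeline

In this step, we build a Haystack Pipeline that:

  • fetches HTML content from the given URL,
  • converts the HTML into Haystack Document objects,
  • builds a prompt from the extracted content, and
  • passes the prompt to the predefined Agent, which processes the context and writes the relevant information to the document store.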
from haystack import Pipeline
from haystack.components.converters import HTMLToDocument
from haystack.components.fetchers import LinkContentFetcher
from haystack.components.builders import ChatPromptBuilder
from haystack.dataclasses import ChatMessage

pipeline_with_agent = Pipeline()
pipeline_with_agent.add_component("fetcher", LinkContentFetcher())
pipeline_with_agent.add_component("converter", HTMLToDocument())
pipeline_with_agent.add_component("builder", ChatPromptBuilder(
    template=[ChatMessage.from_user("""
    {% for doc in docs %}
    {{ doc.content|default|truncate(25000) }}
    {% endfor %}
    """)],
    required_variables=["docs"]
))
pipeline_with_agent.add_component("database_agent", database_assistant)

pipeline_with_agent.connect("fetcher.streams", "converter.sources")
pipeline_with_agent.connect("converter.documents", "builder.docs")
pipeline_with_agent.connect("builder", "database_agent")
<haystack.core.pipeline.pipeline.Pipeline object at 0x107b24da0>
🚅 Components
  - fetcher: LinkContentFetcher
  - converter: HTMLToDocument
  - builder: ChatPromptBuilder
  - database_agent: Agent
🛤️ Connections
  - fetcher.streams -> converter.sources (List[ByteStream])
  - converter.documents -> builder.docs (List[Document])
  - builder.prompt -> database_agent.messages (List[ChatMessage])
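The connections printed above form a simple chain. As an illustration only, the sketch below derives a run order from those connection strings with Python's graphlib.TopologicalSorter. Haystack uses its own scheduling logic internally, and component_order is a hypothetical helper.

```python
from graphlib import TopologicalSorter

# Connections as printed above: (sender.output, receiver.input)
connections = [
    ("fetcher.streams", "converter.sources"),
    ("converter.documents", "builder.docs"),
    ("builder.prompt", "database_agent.messages"),
]


def component_order(connections):
    """Return an order in which every component comes after all components that feed it."""
    graph = {}  # component -> set of components it receives input from
    for sender, receiver in connections:
        sender_name = sender.split(".")[0]
        receiver_name = receiver.split(".")[0]
        graph.setdefault(sender_name, set())
        graph.setdefault(receiver_name, set()).add(sender_name)
    return list(TopologicalSorter(graph).static_order())


order = component_order(connections)
print(order)  # → ['fetcher', 'converter', 'builder', 'database_agent']
```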

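Set breakpoints

With the Pipeline in place, we can now configure breakpoints for the Agent. A breakpoint pauses Pipeline execution at a specific step (in this case, during the Agent's run) and saves an intermediate Pipeline snapshot to an external file for inspection or debugging.

We first create a Breakpoint for the chat_generator and then wrap it in an AgentBreakpoint, which explicitly targets the Agent component in the Pipeline. With visit_count=0, the breakpoint triggers before the chat_generator's first run.

Set snapshot_file_path to the directory where the snapshot file should be saved.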

from haystack.dataclasses.breakpoints import AgentBreakpoint, Breakpoint, ToolBreakpoint

agent_generator_breakpoint = Breakpoint(component_name="chat_generator", visit_count=0, snapshot_file_path="snapshots/")
agent_breakpoint = AgentBreakpoint(break_point=agent_generator_breakpoint, agent_name='database_agent')
pipeline_with_agent.run(
    data={"fetcher": {"urls": ["https://en.wikipedia.org/wiki/Deepset"]}},
    break_point=agent_breakpoint,
)
---------------------------------------------------------------------------

BreakpointException                       Traceback (most recent call last)

Cell In[3], line 5
      3 agent_generator_breakpoint = Breakpoint(component_name="chat_generator", visit_count=0, snapshot_file_path="snapshots/")
      4 agent_breakpoint = AgentBreakpoint(break_point=agent_generator_breakpoint, agent_name='database_agent')
----> 5 pipeline_with_agent.run(
      6     data={"fetcher": {"urls": ["https://en.wikipedia.org/wiki/Deepset"]}},
      7     break_point=agent_breakpoint,
      8 )


File ~/haystack-cookbook/.venv/lib/python3.12/site-packages/haystack/core/pipeline/pipeline.py:382, in Pipeline.run(self, data, include_outputs_from, break_point, pipeline_snapshot)
    377         if should_trigger_breakpoint:
    378             _trigger_break_point(
    379                 pipeline_snapshot=new_pipeline_snapshot, pipeline_outputs=pipeline_outputs
    380             )
--> 382 component_outputs = self._run_component(
    383     component_name=component_name,
    384     component=component,
    385     inputs=component_inputs,  # the inputs to the current component
    386     component_visits=component_visits,
    387     parent_span=span,
    388 )
    390 # Updates global input state with component outputs and returns outputs that should go to
    391 # pipeline outputs.
    392 component_pipeline_outputs = self._write_component_outputs(
    393     component_name=component_name,
    394     component_outputs=component_outputs,
   (...)
    397     include_outputs_from=include_outputs_from,
    398 )


File ~/haystack-cookbook/.venv/lib/python3.12/site-packages/haystack/core/pipeline/pipeline.py:75, in Pipeline._run_component(component_name, component, inputs, component_visits, parent_span)
     70     component_output = instance.run(**inputs)
     71 except BreakpointException as error:
     72     # Re-raise BreakpointException to preserve the original exception context
     73     # This is important when Agent components internally use Pipeline._run_component
     74     # and trigger breakpoints that need to bubble up to the main pipeline
---> 75     raise error
     76 except Exception as error:
     77     raise PipelineRuntimeError.from_exception(component_name, instance.__class__, error) from error


File ~/haystack-cookbook/.venv/lib/python3.12/site-packages/haystack/core/pipeline/pipeline.py:70, in Pipeline._run_component(component_name, component, inputs, component_visits, parent_span)
     67 logger.info("Running component {component_name}", component_name=component_name)
     69 try:
---> 70     component_output = instance.run(**inputs)
     71 except BreakpointException as error:
     72     # Re-raise BreakpointException to preserve the original exception context
     73     # This is important when Agent components internally use Pipeline._run_component
     74     # and trigger breakpoints that need to bubble up to the main pipeline
     75     raise error


File ~/haystack-cookbook/.venv/lib/python3.12/site-packages/haystack/components/agents/agent.py:350, in Agent.run(self, messages, streaming_callback, break_point, snapshot, **kwargs)
    337 if (
    338     break_point
    339     and break_point.break_point.component_name == "chat_generator"
    340     and component_visits["chat_generator"] == break_point.break_point.visit_count
    341 ):
    342     agent_snapshot = _create_agent_snapshot(
    343         component_visits=component_visits,
    344         agent_breakpoint=break_point,
   (...)
    348         },
    349     )
--> 350     _check_chat_generator_breakpoint(agent_snapshot=agent_snapshot, parent_snapshot=parent_snapshot)
    352 # 1. Call the ChatGenerator
    353 # We skip the chat generator when restarting from a snapshot where we restart at the ToolInvoker.
    354 if skip_chat_generator:


File ~/haystack-cookbook/.venv/lib/python3.12/site-packages/haystack/core/pipeline/breakpoint.py:377, in _check_chat_generator_breakpoint(agent_snapshot, parent_snapshot)
    372 msg = (
    373     f"Breaking at {break_point.component_name} visit count "
    374     f"{agent_snapshot.component_visits[break_point.component_name]}"
    375 )
    376 logger.info(msg)
--> 377 raise BreakpointException(
    378     message=msg,
    379     component=break_point.component_name,
    380     inputs=agent_snapshot.component_inputs,
    381     results=agent_snapshot.component_inputs["tool_invoker"]["serialized_data"]["state"],
    382 )


BreakpointException: Breaking at chat_generator visit count 0

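This produces a JSON file in the "snapshots" directory, named after the Agent and the component the breakpoint is attached to. It contains a snapshot of the Pipeline the Agent runs in, along with a snapshot of the Agent's state at the breakpoint.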

!ls snapshots/database_agent_chat*
snapshots/database_agent_chat_generator_2025_07_26_12_22_11.json
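The file names in the listing suggest a naming pattern. This is an inference, not documented behavior. A snapshot name appears to combine the agent name, the component name and a timestamp formatted as year_month_day_hour_minute_second. The hypothetical helper below reproduces that pattern, which can help when predicting or searching for snapshot files.

```python
from datetime import datetime


def snapshot_file_name(agent_name: str, component_name: str, when: datetime) -> str:
    """Rebuild the naming pattern seen in the listing above (inferred, not Haystack's code)."""
    # datetime supports strftime codes directly inside an f-string format spec
    return f"{agent_name}_{component_name}_{when:%Y_%m_%d_%H_%M_%S}.json"


example_name = snapshot_file_name("database_agent", "chat_generator", datetime(2025, 7, 26, 12, 22, 11))
print(example_name)  # → database_agent_chat_generator_2025_07_26_12_22_11.json
```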

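We can also set a breakpoint on a tool the Agent uses. This interrupts Pipeline execution at the point where the tool_invoker calls the tool.

To do this, we initialize a ToolBreakpoint with the name of the target tool, wrap it in an AgentBreakpoint, and run the Pipeline with the configured breakpoint.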

agent_tool_breakpoint = ToolBreakpoint(component_name="tool_invoker", visit_count=0, tool_name="add_database_tool", snapshot_file_path="snapshots")
agent_breakpoint = AgentBreakpoint(break_point=agent_tool_breakpoint, agent_name = 'database_agent')

pipeline_with_agent.run(
    data={"fetcher": {"urls": ["https://en.wikipedia.org/wiki/Deepset"]}},
    break_point=agent_breakpoint,
)
---------------------------------------------------------------------------

BreakpointException                       Traceback (most recent call last)

Cell In[6], line 4
      1 agent_tool_breakpoint = ToolBreakpoint(component_name="tool_invoker", visit_count=0, tool_name="add_database_tool", snapshot_file_path="snapshots")
      2 agent_breakpoint = AgentBreakpoint(break_point=agent_tool_breakpoint, agent_name = 'database_agent')
----> 4 pipeline_with_agent.run(
      5     data={"fetcher": {"urls": ["https://en.wikipedia.org/wiki/Deepset"]}},
      6     break_point=agent_breakpoint,
      7 )


File ~/haystack-cookbook/.venv/lib/python3.12/site-packages/haystack/core/pipeline/pipeline.py:382, in Pipeline.run(self, data, include_outputs_from, break_point, pipeline_snapshot)
    377         if should_trigger_breakpoint:
    378             _trigger_break_point(
    379                 pipeline_snapshot=new_pipeline_snapshot, pipeline_outputs=pipeline_outputs
    380             )
--> 382 component_outputs = self._run_component(
    383     component_name=component_name,
    384     component=component,
    385     inputs=component_inputs,  # the inputs to the current component
    386     component_visits=component_visits,
    387     parent_span=span,
    388 )
    390 # Updates global input state with component outputs and returns outputs that should go to
    391 # pipeline outputs.
    392 component_pipeline_outputs = self._write_component_outputs(
    393     component_name=component_name,
    394     component_outputs=component_outputs,
   (...)
    397     include_outputs_from=include_outputs_from,
    398 )


File ~/haystack-cookbook/.venv/lib/python3.12/site-packages/haystack/core/pipeline/pipeline.py:75, in Pipeline._run_component(component_name, component, inputs, component_visits, parent_span)
     70     component_output = instance.run(**inputs)
     71 except BreakpointException as error:
     72     # Re-raise BreakpointException to preserve the original exception context
     73     # This is important when Agent components internally use Pipeline._run_component
     74     # and trigger breakpoints that need to bubble up to the main pipeline
---> 75     raise error
     76 except Exception as error:
     77     raise PipelineRuntimeError.from_exception(component_name, instance.__class__, error) from error


File ~/haystack-cookbook/.venv/lib/python3.12/site-packages/haystack/core/pipeline/pipeline.py:70, in Pipeline._run_component(component_name, component, inputs, component_visits, parent_span)
     67 logger.info("Running component {component_name}", component_name=component_name)
     69 try:
---> 70     component_output = instance.run(**inputs)
     71 except BreakpointException as error:
     72     # Re-raise BreakpointException to preserve the original exception context
     73     # This is important when Agent components internally use Pipeline._run_component
     74     # and trigger breakpoints that need to bubble up to the main pipeline
     75     raise error


File ~/haystack-cookbook/.venv/lib/python3.12/site-packages/haystack/components/agents/agent.py:392, in Agent.run(self, messages, streaming_callback, break_point, snapshot, **kwargs)
    375 if (
    376     break_point
    377     and break_point.break_point.component_name == "tool_invoker"
    378     and break_point.break_point.visit_count == component_visits["tool_invoker"]
    379 ):
    380     agent_snapshot = _create_agent_snapshot(
    381         component_visits=component_visits,
    382         agent_breakpoint=break_point,
   (...)
    390         },
    391     )
--> 392     _check_tool_invoker_breakpoint(
    393         llm_messages=llm_messages, agent_snapshot=agent_snapshot, parent_snapshot=parent_snapshot
    394     )
    396 # 3. Call the ToolInvoker
    397 # We only send the messages from the LLM to the tool invoker
    398 tool_invoker_result = Pipeline._run_component(
    399     component_name="tool_invoker",
    400     component={"instance": self._tool_invoker},
   (...)
    403     parent_span=span,
    404 )


File ~/haystack-cookbook/.venv/lib/python3.12/site-packages/haystack/core/pipeline/breakpoint.py:437, in _check_tool_invoker_breakpoint(llm_messages, agent_snapshot, parent_snapshot)
    434     msg += f" for tool {tool_breakpoint.tool_name}"
    435 logger.info(msg)
--> 437 raise BreakpointException(
    438     message=msg,
    439     component=tool_breakpoint.component_name,
    440     inputs=agent_snapshot.component_inputs,
    441     results=agent_snapshot.component_inputs["tool_invoker"]["serialized_data"]["state"],
    442 )


BreakpointException: Breaking at tool_invoker visit count 0 for tool add_database_tool

Likewise, this produces a JSON file in the "snapshots" directory, named after the Agent and the "tool_invoker" component, which handles the tools the Agent uses.

!ls snapshots/database_agent_tool_invoker*
snapshots/database_agent_tool_invoker_2025_07_26_12_43_03.json
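Both breakpoint types follow the same idea. An AgentBreakpoint names the Agent, and the wrapped breakpoint names the internal component (chat_generator or tool_invoker) and the visit on which to pause. The sketch below models that matching rule with hypothetical Toy* dataclasses, based on the conditions visible in the tracebacks above. It assumes that a ToolBreakpoint with a tool_name only pauses when the model's reply actually calls that tool. Check the Haystack documentation for the exact rule.

```python
from dataclasses import dataclass
from typing import Iterable, Optional, Union


@dataclass
class ToyBreakpoint:
    """Hypothetical stand-in for Breakpoint: which component, on which visit."""

    component_name: str
    visit_count: int = 0


@dataclass
class ToyToolBreakpoint(ToyBreakpoint):
    """Hypothetical stand-in for ToolBreakpoint: optionally narrowed to one tool."""

    tool_name: Optional[str] = None


@dataclass
class ToyAgentBreakpoint:
    """Hypothetical stand-in for AgentBreakpoint: ties a breakpoint to one Agent."""

    break_point: Union[ToyBreakpoint, ToyToolBreakpoint]
    agent_name: str


def should_break(
    agent_bp: ToyAgentBreakpoint,
    agent_name: str,
    component_name: str,
    component_visits: dict,
    called_tools: Iterable[str] = (),
) -> bool:
    """Return True if execution should pause before `component_name` runs (simplified model)."""
    bp = agent_bp.break_point
    if agent_bp.agent_name != agent_name or bp.component_name != component_name:
        return False
    # The visit counter is compared before the component runs
    if component_visits[component_name] != bp.visit_count:
        return False
    if isinstance(bp, ToyToolBreakpoint) and bp.tool_name is not None:
        # Assumption: only pause if the LLM actually asked for this specific tool
        return bp.tool_name in list(called_tools)
    return True


visits = {"chat_generator": 0, "tool_invoker": 0}
gen_bp = ToyAgentBreakpoint(ToyBreakpoint("chat_generator", 0), "database_agent")
tool_bp = ToyAgentBreakpoint(ToyToolBreakpoint("tool_invoker", 0, "add_database_tool"), "database_agent")

print(should_break(gen_bp, "database_agent", "chat_generator", visits))  # → True
print(should_break(tool_bp, "database_agent", "tool_invoker", visits, ["add_database_tool"]))  # → True
print(should_break(tool_bp, "database_agent", "tool_invoker", visits, ["other_tool"]))  # → False
```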

Resume from a breakpoint

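For debugging, you can inspect and edit the snapshot file, then inject it into the Pipeline and resume execution from the point where the breakpoint was triggered.

Once Pipeline execution has been interrupted, we can resume pipeline_with_agent from the saved state.

To do this:

  • Use load_pipeline_snapshot() to load the saved Pipeline snapshot from disk. This function converts the stored JSON file back into a PipelineSnapshot object that represents the intermediate state.
  • Pass this snapshot to the Pipeline.run() method through the pipeline_snapshot parameter.

The Pipeline resumes execution from where it was interrupted and runs to completion.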

from haystack.core.pipeline.breakpoint import load_pipeline_snapshot

# resume the pipeline from the saved state
snapshot = load_pipeline_snapshot("snapshots/database_agent_chat_generator_2025_07_26_12_22_11.json")

result = pipeline_with_agent.run(
    data={},
    pipeline_snapshot=snapshot
)
print(result['database_agent']['last_message'].text)
The following individuals have been added to the knowledge base along with their relevant information:

1. **Milos Rusic**
   - **Job Title:** Co-Founder
   - **Other:** Co-founded deepset in 2018 in Berlin.

2. **Malte Pietsch**
   - **Job Title:** Co-Founder
   - **Other:** Co-founded deepset in 2018 in Berlin.

3. **Timo Möller**
   - **Job Title:** Co-Founder
   - **Other:** Co-founded deepset in 2018 in Berlin.

4. **Alex Ratner**
   - **Job Title:** Founder
   - **Other:** Snorkel AI.

5. **Mustafa Suleyman**
   - **Job Title:** Co-Founder
   - **Other:** Deepmind.

6. **Spencer Kimball**
   - **Job Title:** Co-Founder
   - **Other:** Cockroach Labs.

7. **Jeff Hammerbacher**
   - **Job Title:** Co-Founder
   - **Other:** Cloudera.

8. **Emil Eifrem**
   - **Job Title:** Founder
   - **Other:** Neo4j. 

This information emphasizes their roles in the establishment and growth of deepset as well as their affiliations with other notable companies in the tech industry.
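The resume cell above hard-codes a timestamped file name. The convenience sketch below picks the newest snapshot for a given agent and component and takes a quick look at its top-level JSON keys before loading it. Its helpers are hypothetical and assume the naming pattern seen in the ls output. The demo writes placeholder files to a temporary directory, so the keys it prints say nothing about the real snapshot format.

```python
import json
import tempfile
from datetime import datetime
from pathlib import Path

TIMESTAMP_FORMAT = "%Y_%m_%d_%H_%M_%S"


def snapshot_timestamp(path: Path) -> datetime:
    """Parse the trailing timestamp from a name like ..._2025_07_26_12_22_11.json."""
    return datetime.strptime("_".join(path.stem.split("_")[-6:]), TIMESTAMP_FORMAT)


def latest_snapshot(directory, prefix: str):
    """Return the newest snapshot file whose name starts with `prefix`, or None."""
    candidates = sorted(Path(directory).glob(f"{prefix}_*.json"), key=snapshot_timestamp)
    return candidates[-1] if candidates else None


def top_level_keys(path: Path) -> list:
    """Load a snapshot as plain JSON and list its top-level keys for a quick look."""
    with open(path, encoding="utf-8") as f:
        return sorted(json.load(f).keys())


# Demo with placeholder files (not real snapshots) in a throwaway directory
with tempfile.TemporaryDirectory() as tmp:
    for stamp in ["2025_07_26_12_22_11", "2025_07_26_12_50_00"]:
        Path(tmp, f"database_agent_chat_generator_{stamp}.json").write_text(json.dumps({"demo": stamp}))
    Path(tmp, "database_agent_tool_invoker_2025_07_26_13_00_00.json").write_text("{}")
    newest = latest_snapshot(tmp, "database_agent_chat_generator")
    newest_name = newest.name
    newest_keys = top_level_keys(newest)

print(newest_name)  # → database_agent_chat_generator_2025_07_26_12_50_00.json
print(newest_keys)  # → ['demo']
```

With real snapshots, the path returned by latest_snapshot("snapshots", "database_agent_chat_generator") could be passed to load_pipeline_snapshot in place of the hard-coded name.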