Build a GitHub Issue Resolver Agent


In this tutorial, we'll create a GitHub issue resolver agent using Anthropic's Claude Sonnet 4. Given an issue URL, the agent will:

  • Fetch and parse the issue description and comments
  • Identify the relevant repository, directories, and files
  • Retrieve and process file contents
  • Determine the resolution steps and post them as a comment

For this, we'll use the new Agent component. The Agent is a Haystack component that implements tool-calling functionality and works with provider-agnostic chat models. We can use the Agent as a standalone component or within a pipeline.

Here's what our GitHub issue resolver pipeline will look like:

issue_resolver_pipeline.png

Install dependencies

!pip install anthropic-haystack github-haystack -q
import os
from getpass import getpass
from typing import List

from haystack import logging, Document, Pipeline
from haystack.components.agents import Agent
from haystack.components.builders import ChatPromptBuilder
from haystack.dataclasses import ChatMessage
from haystack.tools.from_function import tool
from haystack_integrations.components.generators.anthropic.chat.chat_generator import AnthropicChatGenerator
logger = logging.getLogger(__name__)

Initialize a GitHubIssueViewer component

The GitHubIssueViewer component takes a GitHub issue URL and returns a list of Haystack documents. The first document contains the main issue content, while subsequent documents contain the issue comments.

from haystack_integrations.components.connectors.github import GitHubIssueViewer
issue_viewer = GitHubIssueViewer()
issue_viewer.run(url="https://github.com/deepset-ai/haystack/issues/8903")["documents"]
[Document(id=401aeab38ff82756caddcf20be6191917e0a8d262347f4acc2adb24869c842e9, content: '**Is your feature request related to a problem? Please describe.**
 Most of our components require so...', meta: {'type': 'issue', 'title': 'Proposal to make input variables to `PromptBuilder` and `ChatPromptBuilder` required by default', 'number': 8903, 'state': 'closed', 'created_at': '2025-02-21T14:03:22Z', 'updated_at': '2025-03-21T14:53:27Z', 'author': 'sjrl', 'url': 'https://github.com/deepset-ai/haystack/issues/8903'}),
 Document(id=463748463715f2c4f988273caf73d5006e5a95beeecd04c91a142fa93ce78354, content: 'Old related issue: https://github.com/deepset-ai/haystack/issues/7441', meta: {'type': 'comment', 'issue_number': 8903, 'created_at': '2025-02-21T14:07:54Z', 'updated_at': '2025-02-21T14:07:54Z', 'author': 'anakin87', 'url': 'https://github.com/deepset-ai/haystack/issues/8903#issuecomment-2674648879'}),
 Document(id=d7eb9351f9c74a0d8eac616bfc92f97e06bdb6276c54b3e6ec437e3fc7378cb2, content: '@sjrl with the new run-logic released in 2.10 the component will not always trigger anymore. It need...', meta: {'type': 'comment', 'issue_number': 8903, 'created_at': '2025-02-21T21:32:12Z', 'updated_at': '2025-02-21T21:32:12Z', 'author': 'mathislucka', 'url': 'https://github.com/deepset-ai/haystack/issues/8903#issuecomment-2675585679'}),
 Document(id=1c5ea7c3f07f3db061bf2169b11d59ea675bd8269b7168598e88f5a072ed3a5e, content: '@mathislucka thanks for the additional info. I'll need to talk with @ju-gu again about how exactly h...', meta: {'type': 'comment', 'issue_number': 8903, 'created_at': '2025-02-24T07:02:02Z', 'updated_at': '2025-02-24T07:02:02Z', 'author': 'sjrl', 'url': 'https://github.com/deepset-ai/haystack/issues/8903#issuecomment-2677577510'}),
 Document(id=3994326a4e33ee897938e8cff215d6e407d63d9b800fa088df27aec2cb24ad03, content: '> PromptBuilder with documents (pipeline provided) and query (user provided) will trigger even if it...', meta: {'type': 'comment', 'issue_number': 8903, 'created_at': '2025-02-25T14:55:26Z', 'updated_at': '2025-02-25T14:55:26Z', 'author': 'ju-gu', 'url': 'https://github.com/deepset-ai/haystack/issues/8903#issuecomment-2682266205'}),
 Document(id=630987248d564b2a538b56a1bc2ada63f66d4d53031708b9c004bfc9e1bf9346, content: '> I think this can still cause problems as it can run before the correct input is created inside the...', meta: {'type': 'comment', 'issue_number': 8903, 'created_at': '2025-02-26T08:01:12Z', 'updated_at': '2025-02-26T08:01:12Z', 'author': 'mathislucka', 'url': 'https://github.com/deepset-ai/haystack/issues/8903#issuecomment-2684214013'}),
 Document(id=622bc0e5219da00bfcb908519611a649f7eaa28eebedb3c30ad90d66cc0191ab, content: '> for the PromptBuilder, and ChatPromptBuilder we set all Jinja2 variables as optional by default.
 
 ...', meta: {'type': 'comment', 'issue_number': 8903, 'created_at': '2025-03-11T10:11:54Z', 'updated_at': '2025-03-11T10:12:29Z', 'author': 'LastRemote', 'url': 'https://github.com/deepset-ai/haystack/issues/8903#issuecomment-2713495950'}),
 Document(id=22d69cc52cb789306cf54f56a568d1526cdfe766d42818d18f6fc7ec2f9163ad, content: '@sjrl and I decided that we don't want to make breaking changes to the current behavior of the `Prom...', meta: {'type': 'comment', 'issue_number': 8903, 'created_at': '2025-03-21T14:53:24Z', 'updated_at': '2025-03-21T14:53:24Z', 'author': 'julian-risch', 'url': 'https://github.com/deepset-ai/haystack/issues/8903#issuecomment-2743603353'})]

Initialize a GitHubRepoViewer Tool

This tool fetches content from a GitHub repository for a given repo and path:

  • If path points to a directory, it returns a list of documents (one per item), where each document contains the item's name (file or directory), plus the full path and metadata in Document.meta.
  • If path points to a file, it returns a single document with the file's content, plus the full path and metadata in Document.meta.
  • If an error occurs, it returns a single document containing the error message, with type="error" in Document.meta.

from haystack_integrations.tools.github import GitHubRepoViewerTool
github_repo_viewer_tool = GitHubRepoViewerTool()
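
If you want a quick look at what the tool returns before handing it to the agent, you can invoke it directly. The following is only a sketch, assuming the tool exposes Haystack's standard Tool.invoke interface with the repo and path parameters described above; the repo and path values are illustrative.

# Sketch: direct invocation, assuming Tool.invoke with the repo/path
# parameters described above. Print the result to inspect the documents.
result = github_repo_viewer_tool.invoke(repo="deepset-ai/haystack", path="haystack/components/agents")
print(result)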

Mock the GitHubIssueCommenter Tool with a function

With the @tool decorator, we can easily turn a function into a tool, using its docstring as the description.

Now, let's create a tool (a mock) that lets the agent write a comment on a GitHub issue. This tool will also serve as the agent's exit condition, signaling when its task is complete.

If you like, you can later replace this mock with the GitHubIssueCommenterTool. You'll need a GitHub personal access token to enable commenting on GitHub. See the GitHub-Haystack integration page for details.

@tool
def write_github_comment(comment: str) -> str:
    """
    Use this to create a comment on GitHub once you finished your exploration.
    """
    return comment
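
The decorator has already assembled the tool's metadata: the function name becomes the tool name, and the docstring becomes the description the model sees. A quick sanity check:

# The @tool decorator derives the tool's name from the function name
# and its description from the docstring.
print(write_github_comment.name)
print(write_github_comment.description)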

Create an Issue Resolver Agent with Tools

To initialize the Agent, we need:

  1. A list of tools (✅)
  2. A chat generator
  3. A system prompt

We'll start by creating the ChatGenerator. In this example, we'll use the AnthropicChatGenerator with the claude-sonnet-4-20250514 model.

os.environ["ANTHROPIC_API_KEY"] = getpass("Anthropic Key: ")
Anthropic Key: ··········
chat_generator = AnthropicChatGenerator(model="claude-sonnet-4-20250514", generation_kwargs={"max_tokens": 8000})

In this example, we'll use a predefined system prompt that instructs the agent to analyze GitHub issues, explore relevant files in the repository, and generate a detailed comment with resolution steps. Of course, you can also use your own custom prompt.

from haystack_integrations.prompts.github import SYSTEM_PROMPT
print(SYSTEM_PROMPT[:100]+"...")
The assistant is Haystack-Agent, created by deepset.
Haystack-Agent helps developers to develop soft...

Finally, we create the Agent with the chat_generator, the SYSTEM_PROMPT, and the tools. We set exit_conditions=["write_github_comment"] to ensure the agent stops after the write_github_comment tool is used. For state_schema, we define {"documents": {"type": List[Document]}}, allowing the agent to accumulate documents retrieved from tools such as github_repo_viewer_tool.

issue_resolver_agent = Agent(
    chat_generator=chat_generator,
    system_prompt=SYSTEM_PROMPT,
    tools=[github_repo_viewer_tool, write_github_comment],
    exit_conditions=["write_github_comment"],
    state_schema={"documents": {"type": List[Document]}},
)
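
Since the Agent is a regular Haystack component, you can also run it standalone by passing chat messages directly. The sketch below uses a placeholder issue URL and text; the pipeline we build next automates fetching and formatting the real issue.

# Standalone sketch: call the agent directly with a user message.
# The issue URL and text here are placeholders.
standalone_result = issue_resolver_agent.run(
    messages=[ChatMessage.from_user("Issue from: https://github.com/owner/repo/issues/1\n<issue text>")]
)
print(standalone_result["last_message"].tool_call_result.result)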

💡 Tip: You can pass the built-in print_streaming_chunk, or a custom function of your own, to the Agent to enable streaming and watch tool calls and results in real time.

from haystack.components.generators.utils import print_streaming_chunk

issue_resolver_agent = Agent(
    chat_generator=chat_generator,
    system_prompt=SYSTEM_PROMPT,
    tools=[github_repo_viewer_tool, write_github_comment],
    exit_conditions=["write_github_comment"],
    state_schema={"documents": {"type": List[Document]}},
    streaming_callback=print_streaming_chunk
)

Build the Issue Resolver Pipeline

With all the components in place, we can now assemble the issue resolver pipeline.

issue_viewer = GitHubIssueViewer()
issue_template = """
Issue from: {{ url }}
{% for document in documents %}
{% if loop.index == 1 %}
**Title: {{ document.meta.title }}**
{% endif %}
<issue-comment>
{{document.content}}
</issue-comment>
{% endfor %}
"""

issue_builder = ChatPromptBuilder(template=[ChatMessage.from_user(issue_template)], required_variables="*")

issue_resolver = Pipeline()
issue_resolver.add_component("issue_viewer", issue_viewer)
issue_resolver.add_component("issue_builder", issue_builder)
issue_resolver.add_component("issue_resolver_agent", issue_resolver_agent)

issue_resolver.connect("issue_viewer.documents", "issue_builder.documents")
issue_resolver.connect("issue_builder.prompt", "issue_resolver_agent.messages")
<haystack.core.pipeline.pipeline.Pipeline object at 0x7c205115ab90>
🚅 Components
  - issue_viewer: GitHubIssueViewer
  - issue_builder: ChatPromptBuilder
  - issue_resolver_agent: Agent
🛤️ Connections
  - issue_viewer.documents -> issue_builder.documents (List[Document])
  - issue_builder.prompt -> issue_resolver_agent.messages (List[ChatMessage])

Let's Try Our Pipeline

Now, let's run the pipeline with an issue URL and see how the agent performs.

issue_url = "https://github.com/deepset-ai/haystack-core-integrations/issues/1819"
result = issue_resolver.run({"url": issue_url})

Let's look at the comment our pipeline generated to resolve the given issue.

print(result["issue_resolver_agent"]["last_message"].tool_call_result.result)
# Implementation: Adding `component_info` Support to `AmazonBedrockChatGenerator`

I've analyzed the codebase and the PR mentioned in the issue. Here's my proposed implementation to add `component_info` support to `AmazonBedrockChatGenerator`, following the same pattern used in the main Haystack repository.

## Changes Required

### 1. Update Imports in `utils.py`

The `ComponentInfo` class needs to be imported:

```python
from haystack.dataclasses import (
    AsyncStreamingCallbackT,
    ChatMessage,
    ChatRole,
    ComponentInfo,  # Add this import
    StreamingChunk,
    SyncStreamingCallbackT,
    ToolCall,
)
```

### 2. Update `_convert_event_to_streaming_chunk` Function

Modify the function signature to accept an optional `component_info` parameter and include it in all `StreamingChunk` creations:

```python
def _convert_event_to_streaming_chunk(
    event: Dict[str, Any], 
    model: str, 
    component_info: Optional[ComponentInfo] = None
) -> StreamingChunk:
    """
    Convert a Bedrock streaming event to a Haystack StreamingChunk.

    Handles different event types (contentBlockStart, contentBlockDelta, messageStop, metadata) and extracts relevant
    information to create StreamingChunk objects in the same format used by Haystack's OpenAIChatGenerator.

    :param event: Dictionary containing a Bedrock streaming event.
    :param model: The model ID used for generation, included in chunk metadata.
    :param component_info: An optional `ComponentInfo` object containing information about the component that
        generated the chunk, such as the component name and type.
    :returns: StreamingChunk object containing the content and metadata extracted from the event.
    """
    # Initialize an empty StreamingChunk to return if no relevant event is found
    # (e.g. for messageStart and contentBlockStop)
    streaming_chunk = StreamingChunk(
        content="", 
        component_info=component_info,  # Add component_info here
        meta={"model": model, "received_at": datetime.now(timezone.utc).isoformat()}
    )

    if "contentBlockStart" in event:
        # contentBlockStart always has the key "contentBlockIndex"
        block_start = event["contentBlockStart"]
        block_idx = block_start["contentBlockIndex"]
        if "start" in block_start and "toolUse" in block_start["start"]:
            tool_start = block_start["start"]["toolUse"]
            streaming_chunk = StreamingChunk(
                content="",
                component_info=component_info,  # Add component_info here
                meta={
                    "model": model,
                    # This is always 0 b/c it represents the choice index
                    "index": 0,
                    # We follow the same format used in the OpenAIChatGenerator
                    "tool_calls": [  # Optional[List[ChoiceDeltaToolCall]]
                        {
                            "index": block_idx,  # int
                            "id": tool_start["toolUseId"],  # Optional[str]
                            "function": {  # Optional[ChoiceDeltaToolCallFunction]
                                # Will accumulate deltas as string
                                "arguments": "",  # Optional[str]
                                "name": tool_start["name"],  # Optional[str]
                            },
                            "type": "function",  # Optional[Literal["function"]]
                        }
                    ],
                    "finish_reason": None,
                    "received_at": datetime.now(timezone.utc).isoformat(),
                },
            )

    elif "contentBlockDelta" in event:
        # contentBlockDelta always has the key "contentBlockIndex" and "delta"
        block_idx = event["contentBlockDelta"]["contentBlockIndex"]
        delta = event["contentBlockDelta"]["delta"]
        # This is for accumulating text deltas
        if "text" in delta:
            streaming_chunk = StreamingChunk(
                content=delta["text"],
                component_info=component_info,  # Add component_info here
                meta={
                    "model": model,
                    # This is always 0 b/c it represents the choice index
                    "index": 0,
                    "tool_calls": None,
                    "finish_reason": None,
                    "received_at": datetime.now(timezone.utc).isoformat(),
                },
            )
        # This only occurs when accumulating the arguments for a toolUse
        # The content_block for this tool should already exist at this point
        elif "toolUse" in delta:
            streaming_chunk = StreamingChunk(
                content="",
                component_info=component_info,  # Add component_info here
                meta={
                    "model": model,
                    # This is always 0 b/c it represents the choice index
                    "index": 0,
                    "tool_calls": [  # Optional[List[ChoiceDeltaToolCall]]
                        {
                            "index": block_idx,  # int
                            "id": None,  # Optional[str]
                            "function": {  # Optional[ChoiceDeltaToolCallFunction]
                                # Will accumulate deltas as string
                                "arguments": delta["toolUse"].get("input", ""),  # Optional[str]
                                "name": None,  # Optional[str]
                            },
                            "type": "function",  # Optional[Literal["function"]]
                        }
                    ],
                    "finish_reason": None,
                    "received_at": datetime.now(timezone.utc).isoformat(),
                },
            )

    elif "messageStop" in event:
        finish_reason = event["messageStop"].get("stopReason")
        streaming_chunk = StreamingChunk(
            content="",
            component_info=component_info,  # Add component_info here
            meta={
                "model": model,
                # This is always 0 b/c it represents the choice index
                "index": 0,
                "tool_calls": None,
                "finish_reason": finish_reason,
                "received_at": datetime.now(timezone.utc).isoformat(),
            },
        )

    elif "metadata" in event and "usage" in event["metadata"]:
        metadata = event["metadata"]
        streaming_chunk = StreamingChunk(
            content="",
            component_info=component_info,  # Add component_info here
            meta={
                "model": model,
                # This is always 0 b/c it represents the choice index
                "index": 0,
                "tool_calls": None,
                "finish_reason": None,
                "received_at": datetime.now(timezone.utc).isoformat(),
                "usage": {
                    "prompt_tokens": metadata["usage"].get("inputTokens", 0),
                    "completion_tokens": metadata["usage"].get("outputTokens", 0),
                    "total_tokens": metadata["usage"].get("totalTokens", 0),
                },
            },
        )

    return streaming_chunk
```

### 3. Update Streaming Response Parsing Functions

Update both sync and async versions to create `ComponentInfo` and pass it to the conversion function:

```python
def _parse_streaming_response(
    response_stream: EventStream,
    streaming_callback: SyncStreamingCallbackT,
    model: str,
    component_info: Optional[ComponentInfo] = None,  # Add this parameter
) -> List[ChatMessage]:
    """
    Parse a streaming response from Bedrock.

    :param response_stream: EventStream from Bedrock API
    :param streaming_callback: Callback for streaming chunks
    :param model: The model ID used for generation
    :param component_info: An optional `ComponentInfo` object containing information about the component that
        generated the chunk, such as the component name and type.
    :return: List of ChatMessage objects
    """
    chunks: List[StreamingChunk] = []
    for event in response_stream:
        streaming_chunk = _convert_event_to_streaming_chunk(
            event=event, 
            model=model, 
            component_info=component_info  # Pass component_info here
        )
        streaming_callback(streaming_chunk)
        chunks.append(streaming_chunk)
    replies = [_convert_streaming_chunks_to_chat_message(chunks=chunks)]
    return replies


async def _parse_streaming_response_async(
    response_stream: EventStream,
    streaming_callback: AsyncStreamingCallbackT,
    model: str,
    component_info: Optional[ComponentInfo] = None,  # Add this parameter
) -> List[ChatMessage]:
    """
    Parse a streaming response from Bedrock.

    :param response_stream: EventStream from Bedrock API
    :param streaming_callback: Callback for streaming chunks
    :param model: The model ID used for generation
    :param component_info: An optional `ComponentInfo` object containing information about the component that
        generated the chunk, such as the component name and type.
    :return: List of ChatMessage objects
    """
    chunks: List[StreamingChunk] = []
    async for event in response_stream:
        streaming_chunk = _convert_event_to_streaming_chunk(
            event=event, 
            model=model, 
            component_info=component_info  # Pass component_info here
        )
        await streaming_callback(streaming_chunk)
        chunks.append(streaming_chunk)
    replies = [_convert_streaming_chunks_to_chat_message(chunks=chunks)]
    return replies
```

### 4. Update Chat Generator Import

In `chat_generator.py`, add the `ComponentInfo` import:

```python
from haystack.dataclasses import ChatMessage, StreamingCallbackT, ComponentInfo, select_streaming_callback
```

### 5. Update Chat Generator Methods

Update both `run` and `run_async` methods to create and pass `ComponentInfo`:

```python
# In the run method, update the streaming section:
if callback:
    response = self.client.converse_stream(**params)
    response_stream: EventStream = response.get("stream")
    if not response_stream:
        msg = "No stream found in the response."
        raise AmazonBedrockInferenceError(msg)
    
    # Create ComponentInfo from this component instance
    component_info = ComponentInfo.from_component(self)
    replies = _parse_streaming_response(response_stream, callback, self.model, component_info)
else:
    response = self.client.converse(**params)
    replies = _parse_completion_response(response, self.model)

# In the run_async method, update the streaming section:
if callback:
    response = await async_client.converse_stream(**params)
    response_stream: EventStream = response.get("stream")
    if not response_stream:
        msg = "No stream found in the response."
        raise AmazonBedrockInferenceError(msg)
    
    # Create ComponentInfo from this component instance
    component_info = ComponentInfo.from_component(self)
    replies = await _parse_streaming_response_async(response_stream, callback, self.model, component_info)
else:
    response = await async_client.converse(**params)
    replies = _parse_completion_response(response, self.model)
```

## Testing

The existing tests should continue to pass with these changes. The `ComponentInfo` will be automatically populated for streaming chunks, making the component information available to any streaming callback functions.

You can test the implementation by running streaming inference and checking that `chunk.component_info` is properly populated:

```python
def test_streaming_callback(chunk):
    assert chunk.component_info is not None
    assert "AmazonBedrockChatGenerator" in chunk.component_info.type
    # component name will be None unless the component is added to a pipeline with a specific name

generator = AmazonBedrockChatGenerator(model="anthropic.claude-3-5-sonnet-20240620-v1:0", streaming_callback=test_streaming_callback)
```

This implementation follows the exact same pattern as the OpenAI chat generator and ensures consistency across all chat generators in the Haystack ecosystem.
# Render it in markdown format
from IPython.display import Markdown, display

display(Markdown("# Comment from Agent\n\n" + result["issue_resolver_agent"]["last_message"].tool_call_result.result))

By looking at the other messages, you can observe the iterative process of our issue resolver agent as it progressively builds the GitHub comment, making tool calls and processing their results.

result["issue_resolver_agent"]["messages"]

We can also check which files the agent looked at:

for document in result["issue_resolver_agent"]["documents"]:
    if document.meta["type"] in ["file_content"]:
        display(Markdown(f"[{document.meta['url']}]({document.meta['url']})"))

https://github.com/deepset-ai/haystack-core-integrations/blob/main/integrations/amazon_bedrock/src/haystack_integrations/components/generators/amazon_bedrock/chat/chat_generator.py

https://github.com/deepset-ai/haystack-core-integrations/blob/main/integrations/amazon_bedrock/src/haystack_integrations/components/generators/amazon_bedrock/chat/utils.py

https://github.com/deepset-ai/haystack/blob/main/haystack/dataclasses/streaming_chunk.py

https://github.com/deepset-ai/haystack/blob/main/haystack/components/generators/chat/openai.py

https://github.com/deepset-ai/haystack-core-integrations/blob/main/integrations/amazon_bedrock/tests/test_chat_generator.py