DevOps Support Agent with Human in the Loop

Notebook by Amna Mubashar

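This notebook demonstrates how a Haystack Agent can interactively ask for user input when it is uncertain about the next step. We will build a Human-in-the-Loop tool that the Agent can call dynamically. When the Agent encounters ambiguity or incomplete information, it asks the human for more input so that it can continue with the task.

For this purpose, we will create a DevOps Support Agent.

CI/CD pipelines occasionally fail for reasons that are hard to diagnose, even with manual inspection: broken tests, misconfigured environment variables, flaky integrations, and so on.

The support Agent is built with Haystack Tools and the Agent component, and it will perform the following tasks:

  • Check for any failed CI workflows
  • Fetch build logs and status
  • Look up and analyze git commits
  • Suggest next troubleshooting steps
  • Escalate to a human via the Human-in-the-Loop tool

Install the required dependencies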

!pip install "haystack-ai>=2.14.0"

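Next, we configure our variables. We need to set OPENAI_API_KEY for the OpenAIChatGenerator, and a GITHUB_TOKEN with appropriate permissions to access repository information, CI logs, and commit history. Make sure your GitHub token has sufficient access rights to the repository you want to analyze.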

from getpass import getpass
import os

if not os.environ.get("OPENAI_API_KEY"):
    os.environ["OPENAI_API_KEY"] = getpass("Enter your OpenAI API key:")

if not os.environ.get("GITHUB_TOKEN"):
    os.environ["GITHUB_TOKEN"] = getpass("Enter your GitHub token:")

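Define Tools

We start by defining the tools that our Agent will use.

git_list_commits_tool: Fetches and formats the most recent commits on a given GitHub repository and branch.
Useful for quickly surfacing the latest changes when diagnosing CI failures or code regressions.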

import os, requests, zipfile, io
from collections import defaultdict
from typing import Annotated, List, Dict, Tuple
from haystack.tools import tool

@tool
def git_list_commits_tool(
    repo: Annotated[
        str, 
        "The GitHub repository in 'owner/name' format, e.g. 'my-org/my-repo'"]
    ,
    branch: Annotated[
        str, 
        "The branch name to list commits from"] = "main"
) -> str:
    '''List the most recent commits for a GitHub repository and branch.'''
    token = os.getenv("GITHUB_TOKEN")
    if not token:
        return "Error: GITHUB_TOKEN not set in environment."

    url = f"https://api.github.com/repos/{repo}/commits"
    headers = {"Authorization": f"token {token}"}
    params = {"sha": branch, "per_page": 10}
    resp = requests.get(url, headers=headers, params=params)
    resp.raise_for_status()
    commits = resp.json()

    lines = []
    for c in commits:
        sha = c["sha"][:7]
        msg = c["commit"]["message"].split("\n")[0]
        lines.append(f"- {sha}: {msg}")
    return "\n".join(lines)
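
To make the tool's output format concrete without calling the GitHub API, here is a minimal sketch of the same formatting step applied to a made-up payload. The helper name `format_commits` and the sample commits are hypothetical; only the `sha` and `commit.message` fields that the tool reads are mirrored.

```python
from typing import Any, Dict, List


def format_commits(commits: List[Dict[str, Any]]) -> str:
    """Render one '- <short sha>: <subject line>' entry per commit."""
    lines = []
    for c in commits:
        sha = c["sha"][:7]  # abbreviated SHA, as in the tool above
        subject = c["commit"]["message"].split("\n")[0]  # first line only
        lines.append(f"- {sha}: {subject}")
    return "\n".join(lines)


# Made-up payload containing only the fields the formatter reads
sample_commits = [
    {
        "sha": "0123456789abcdef0123456789abcdef01234567",
        "commit": {"message": "Fix flaky converter test\n\nLonger body text."},
    },
    {
        "sha": "89abcdef0123456789abcdef0123456789abcdef",
        "commit": {"message": "Bump dependency versions"},
    },
]

formatted = format_commits(sample_commits)
print(formatted)
```

Keeping only the subject line keeps the tool result short, which matters because the whole string is passed back to the LLM as context.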

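git_commit_diff_tool: Retrieves the patch (diff) between two commits or branches in a GitHub repository.
Allows side-by-side inspection of the code changes that may have caused test failures.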

@tool
def git_commit_diff_tool(
    repo: Annotated[
        str, 
        "The GitHub repository in 'owner/name' format, e.g. 'my-org/my-repo'"]
    ,
    base: Annotated[
        str, 
        "The base commit SHA or branch name"]
    ,
    head: Annotated[
        str, 
        "The head commit SHA or branch name"]
) -> str:
    '''Get the diff (patch) between two commits or branches in a GitHub repository.'''
    token = os.getenv("GITHUB_TOKEN")
    if not token:
        return "Error: GITHUB_TOKEN not set in environment."

    url = f"https://api.github.com/repos/{repo}/compare/{base}...{head}"
    headers = {"Authorization": f"token {token}"}
    resp = requests.get(url, headers=headers)
    resp.raise_for_status()
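    data = resp.json()
    # Return a string, as annotated: one section per changed file with its patch
    files = data.get("files", [])
    if not files:
        return "No file changes found between the given refs."
    return "\n\n".join(
        f"--- {f['filename']}\n{f.get('patch', '(no patch available)')}" for f in files
    )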

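ci_status_tool: Checks the most recent GitHub Actions workflow runs for failures, downloads their logs, and extracts the failing test names and error messages, grouped by suite. Helps automate root-cause identification by surfacing the tests and errors that caused a pipeline to fail.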

@tool
def ci_status_tool(
    repo: Annotated[
      str,
      "The GitHub repository in 'owner/name' format, e.g. 'my-org/my-repo'"
    ],
    per_page: Annotated[
      int,
      "How many of the most recent workflow runs to check (default 50)"
    ] = 50
) -> str:
    '''Check the most recent GitHub Actions workflow runs for a given repository, list any that failed, download their logs, and extract all failures, grouped by test suite (inferred from log file paths).'''
    token = os.getenv("GITHUB_TOKEN")
    if not token:
        return "Error: GITHUB_TOKEN environment variable not set."
    
    headers = {"Authorization": f"token {token}"}
    params = {"per_page": per_page}
    runs_url = f"https://api.github.com/repos/{repo}/actions/runs"
    
    resp = requests.get(runs_url, headers=headers, params=params)
    resp.raise_for_status()
    runs = resp.json().get("workflow_runs", [])
    
    failed_runs = [r for r in runs if r.get("conclusion") == "failure"]
    if not failed_runs:
        return f"No failed runs in the last {per_page} workflow runs for `{repo}`."

    def extract_all_failures(logs_url: str) -> List[Tuple[str, str, str]]:
        """
        Download and scan logs ZIP for all failure markers.
        Returns a list of tuples: (suite, test_name, error_line).
        """
        r = requests.get(logs_url, headers=headers)
        r.raise_for_status()
        z = zipfile.ZipFile(io.BytesIO(r.content))
        failures = []
        for filepath in z.namelist():
            if not filepath.lower().endswith(('.txt', '.log')):
                continue
            suite = filepath.split('/', 1)[0]  # infer suite as top-level folder or file
            with z.open(filepath) as f:
                for raw in f:
                    # errors='ignore' drops undecodable bytes, so decoding cannot raise
                    line = raw.decode('utf-8', errors='ignore').strip()
                    if any(marker in line for marker in ("FAIL", "ERROR", "Exception", "error")):
                        parts = line.split()
                        test_name = next(
                            (p for p in parts if '.' in p or p.endswith("()")), 
                            parts[0] if parts else ""
                        )
                        failures.append((suite, test_name, line))
        return failures

    output = [f"Found {len(failed_runs)} failed run(s):"]
    for run in failed_runs:
        run_id   = run["id"]
        branch   = run.get("head_branch")
        event    = run.get("event")
        created  = run.get("created_at")
        logs_url = run.get("logs_url")
        html_url = run.get("html_url")

        failures = extract_all_failures(logs_url)
        if not failures:
            detail = "No individual failures parsed from logs."
        else:
            # Group by suite
            by_suite: Dict[str, List[Tuple[str,str]]] = defaultdict(list)
            for suite, test, err in failures:
                by_suite[suite].append((test, err))
            lines = []
            for suite, items in by_suite.items():
                lines.append(f"  ▶ **Suite**: `{suite}`")
                for test, err in items:
                    lines.append(f"    - **{test}**: {err}")
            detail = "\n".join(lines)

        output.append(
            f"- **Run ID**: {run_id}\n"
            f"  **Branch**: {branch}\n"
            f"  **Event**: {event}\n"
            f"  **Created At**: {created}\n"
            f"  **Failures by Suite**:\n{detail}\n"
            f"  **Logs ZIP**: {logs_url}\n"
            f"  **Run URL**: {html_url}\n"
        )

    return "\n\n".join(output)
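
The log-scanning heuristic inside ci_status_tool can be tried offline. The sketch below repeats the same marker matching on a small ZIP archive built in memory; the function name `scan_zip_for_failures` and the archive contents are made up for illustration, and real GitHub Actions logs are noisier.

```python
import io
import zipfile
from typing import List, Tuple

MARKERS = ("FAIL", "ERROR", "Exception", "error")


def scan_zip_for_failures(zip_bytes: bytes) -> List[Tuple[str, str, str]]:
    """Return (suite, test_name, line) for every log line containing a marker."""
    failures = []
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as z:
        for path in z.namelist():
            if not path.lower().endswith((".txt", ".log")):
                continue  # only plain-text log files are scanned
            suite = path.split("/", 1)[0]  # top-level folder (or file) name
            with z.open(path) as f:
                for raw in f:
                    line = raw.decode("utf-8", errors="ignore").strip()
                    if any(m in line for m in MARKERS):
                        parts = line.split()
                        # First token with a dot or a trailing "()" is taken as the test name
                        test_name = next(
                            (p for p in parts if "." in p or p.endswith("()")),
                            parts[0] if parts else "",
                        )
                        failures.append((suite, test_name, line))
    return failures


# Build a tiny, made-up logs archive in memory
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as z:
    z.writestr(
        "unit/1_run.txt",
        "collecting tests\nFAILED tests/test_math.py::test_add - AssertionError\nall done\n",
    )
    z.writestr("unit/notes.md", "ERROR in a file type that is skipped\n")

sample_failures = scan_zip_for_failures(buf.getvalue())
print(sample_failures)
```

Because the test name is taken from the first token containing a dot, log lines that begin with a timestamp report the timestamp as the name. This is visible in the sample tool result later in this notebook.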

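shell_tool: Executes a local shell command with a configurable timeout, capturing stdout or returning detailed error output. Well suited to grepping, filtering, or tailing CI log files before passing only the relevant snippets to the LLM.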

import subprocess
@tool
def shell_tool(
    command: Annotated[
        str,
        "The shell command to execute, e.g. 'grep -E \"ERROR|Exception\" build.log'"
    ],
    timeout: Annotated[
        int,
        "Maximum time in seconds to allow the command to run"
    ] = 30
) -> str:
    '''Executes a shell command with a timeout and returns stdout or an error message.'''
    try:
        output = subprocess.check_output(
            command,
            shell=True,
            stderr=subprocess.STDOUT,
            timeout=timeout,
            universal_newlines=True
        )
        return output
    except subprocess.CalledProcessError as e:
        return f"❌ Command failed (exit code {e.returncode}):\n{e.output}"
    except subprocess.TimeoutExpired:
        return f"❌ Command timed out after {timeout} seconds."
    except Exception as e:
        return f"❌ Unexpected error: {str(e)}"
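
The three outcomes the tool distinguishes (success, non-zero exit code, timeout) can be reproduced with a few harmless commands. This is a minimal sketch, assuming a POSIX shell; `run_with_timeout` is a hypothetical plain-function counterpart of the tool, and the commands only invoke the current Python interpreter.

```python
import shlex
import subprocess
import sys


def run_with_timeout(command: str, timeout: int = 30) -> str:
    """Run a shell command and return its output or a readable error message."""
    try:
        return subprocess.check_output(
            command,
            shell=True,
            stderr=subprocess.STDOUT,  # merge stderr into the captured output
            timeout=timeout,
            text=True,
        )
    except subprocess.CalledProcessError as e:
        return f"Command failed (exit code {e.returncode}):\n{e.output}"
    except subprocess.TimeoutExpired:
        return f"Command timed out after {timeout} seconds."


py = shlex.quote(sys.executable)  # quoting assumes a POSIX shell

ok = run_with_timeout(f'{py} -c "print(42)"')
failed = run_with_timeout(f'{py} -c "import sys; sys.exit(3)"')
slow = run_with_timeout(f'{py} -c "import time; time.sleep(3)"', timeout=1)

print(ok, failed, slow, sep="\n")
```

Since the command string comes from the LLM and runs with shell=True, it is worth running such an Agent only in an environment where arbitrary commands are acceptable, for example a disposable container.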

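human_in_loop_tool: Prompts the user with a clarifying question via input() when the Agent encounters ambiguity or needs additional information. This lets the Agent interrupt for human feedback only when it actually needs it.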

@tool
def human_in_loop_tool(
    question: Annotated[
        str,
        "A clarifying question to prompt the user for more information"
    ]
) -> str:
    '''Ask the user a clarifying question and return their response via input().'''
    user_message = input(f"[Agent needs your input] {question}\n> ")
    return user_message
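
Because the tool blocks on input(), it is awkward to exercise in automated runs. One possible variation, sketched below, injects the prompt function so a scripted answer can stand in for the user. The name `ask_human` and its `ask` and `max_attempts` parameters are assumptions for illustration and are not part of Haystack.

```python
from typing import Callable


def ask_human(
    question: str,
    ask: Callable[[str], str] = input,  # defaults to the built-in input()
    max_attempts: int = 3,
) -> str:
    """Ask a clarifying question, retrying when the answer is empty."""
    for _ in range(max_attempts):
        answer = ask(f"[Agent needs your input] {question}\n> ").strip()
        if answer:
            return answer
    # Give the Agent an explicit signal instead of an empty string
    return "No answer provided by the user."


# Simulate a user who presses Enter once before answering
scripted = iter(["", "deepset-ai/haystack"])
reply = ask_human("Which repository?", ask=lambda prompt: next(scripted))
print(reply)
```

Returning an explicit message after repeated empty answers gives the Agent something to reason about, rather than an empty tool result.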

Create and Configure the Agent

Create a Haystack Agent instance and configure its behavior with a system prompt and tools.

from haystack.components.agents import Agent
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage
from haystack.components.generators.utils import print_streaming_chunk

agent = Agent(
    chat_generator=OpenAIChatGenerator(model="o4-mini"),
    tools=[git_list_commits_tool, git_commit_diff_tool, ci_status_tool, shell_tool, human_in_loop_tool],
    system_prompt=(
        "You are a DevOps support assistant with the following capabilities:\n"
        "1. Check CI failures\n"
        "2. Fetch commits and diffs from GitHub\n"
        "3. Analyze logs\n"
        "4. Ask the user for clarification\n\n"
        "Behavior Guidelines:\n"
        "- Tool-First: Always attempt to resolve the user's request by using the tools provided.\n"
        "- Concise Reasoning: Before calling each tool, think (briefly) about why it's needed, then call it.\n"
        "IMPORTANT: Whenever you are unsure about any details or need clarification, "
        "you MUST use the human_in_loop_tool to ask the user questions. For example, if:\n"
        "  * Required parameters are missing or ambiguous (e.g. repo name, run ID, branch)\n"
        "  * A tool returns an unexpected error that needs human insight\n"
        "- Structured Outputs: After your final tool call, summarize findings in Markdown:\n"
        "  * Run ID, Branch, Test, Error\n"
        "  * Next Steps (e.g., \"Add null-check to line 45\", \"Rerun tests after env fix\")\n"
        "- Error Handling: If a tool fails (e.g. 404, timeout), surface the error and decide whether to retry, choose another tool, or ask for clarification.\n"
        "EXIT CONDITION: Once you've provided a complete, actionable summary and next steps, exit."
        
    ),
    exit_conditions=["text"]
)

# Warm up the Agent
agent.warm_up()

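Agent Introspection with Streaming

Haystack supports streaming of ToolCalls and ToolResults during Agent execution. By passing a streaming_callback to the Agent, you can observe its tool calls and their results in real time.

This allows you to:

  • See which tool the Agent decides to use.
  • Inspect the arguments passed to each tool.
  • View the result returned by each tool before the final answer is generated.

This is especially useful for debugging and for transparency in complex multi-step reasoning workflows.

Note: You can pass any streaming_callback function. The print_streaming_chunk utility function in Haystack is configured by default to stream tool calls and results.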
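
As an illustration of a custom callback, the sketch below collects streamed text while printing it. It is duck-typed and assumes only that each chunk exposes a `content` string, as Haystack's StreamingChunk does. The `ChunkCollector` class is hypothetical, and the chunks here are simulated with SimpleNamespace so the example runs without an API key.

```python
from types import SimpleNamespace
from typing import Any, List


class ChunkCollector:
    """Callable that prints streamed text and keeps a copy of it."""

    def __init__(self) -> None:
        self.parts: List[str] = []

    def __call__(self, chunk: Any) -> None:
        # Chunks without text (for example, tool-call metadata only) are skipped
        content = getattr(chunk, "content", "") or ""
        if content:
            self.parts.append(content)
            print(content, end="", flush=True)

    @property
    def text(self) -> str:
        return "".join(self.parts)


collector = ChunkCollector()

# Simulated chunks standing in for what the chat generator would stream
for piece in ("Checking ", "CI ", "status"):
    collector(SimpleNamespace(content=piece, meta={}))
collector(SimpleNamespace(content="", meta={"tool_calls": []}))
```

Such an object could then be passed as streaming_callback=collector in the agent.run call, after which collector.text holds everything that was streamed as text.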

# Pass the user query to the Agent
message=[ChatMessage.from_user("For the given repo, check for any failed CI workflows. Then debug and analyze the workflow by finding the root cause of the failure.")]

# Run the Agent
response = agent.run(messages=message, streaming_callback=print_streaming_chunk)
[TOOL CALL]
Tool: human_in_loop_tool 
Arguments: {"question":"Please provide the GitHub repository in 'owner/name' format for which you want to check CI failures."}

[TOOL RESULT]
deepset-ai/haystack

[TOOL CALL]
Tool: ci_status_tool 
Arguments: {"repo":"deepset-ai/haystack","per_page":50}

[TOOL RESULT]
Found 2 failed run(s):

- **Run ID**: 14994639315
  **Branch**: update-tools-strict-openai
  **Event**: pull_request
  **Created At**: 2025-05-13T10:45:32Z
  **Failures by Suite**:
  ▶ **Suite**: `Unit  ubuntu-latest`
    - **2025-05-13T10:47:25.4802006Z**: 2025-05-13T10:47:25.4802006Z ERROR    haystack.core.serialization:serialization.py:263 Failed to import class 'Nonexisting.DocumentStore'
    - **2025-05-13T10:47:26.4669164Z**: 2025-05-13T10:47:26.4669164Z test/components/converters/test_csv_to_document.py::TestCSVToDocument::test_run_error_handling
    - **2025-05-13T10:47:26.6024332Z**: 2025-05-13T10:47:26.6024332Z test/components/converters/test_docx_file_to_document.py::TestDOCXToDocument::test_run_error_wrong_file_type

    [... additional test failure details omitted for brevity ...]

    [... agent analysis and recommendations continue below ...]
print(response["messages"][-1].text)
Here’s what I found for the most recent failing run:

**Run ID**: 14994639315  
**Branch**: update-tools-strict-openai  

| Test Suite                    | Test / Step                                              | Error                                                                       |
|-------------------------------|-----------------------------------------------------------|-----------------------------------------------------------------------------|
| 0_lint.txt (→ `lint`)         | `Process completed with exit code 4`                       | Linter returned code 4 (no details in output)                               |
| Integration · ubuntu-latest   | TestMarkdownToDocument::test_run_error_handling            | “Task exception was never retrieved” / `RuntimeError('Event loop is closed')` |
| Integration · windows-latest  | TestMarkdownToDocument::test_run_error_handling            | `RuntimeError('Event loop is closed')` (unraisableexception warnings)       |
| Integration · macos-latest    | TestMarkdownToDocument::test_run_error_handling            | `RuntimeError('Event loop is closed')`                                      |

Next Steps:

1. **Fix Lint Exit Code 4**  
   - Run the lint job locally (`flake8` or `hatch run lint`) to reproduce the exit-code and capture missing details.  
   - Adjust `.flake8`/`pre-commit` config or fix any new style violations so that the lint step exits zero.

2. **Resolve “Event loop is closed” in Markdown converter tests**  
   - In `haystack/components/converters/markdown.py` (or its test), ensure any `asyncio` event loop is properly created/closed.  
   - Switch to `pytest-asyncio` fixtures (e.g. use `@pytest.mark.asyncio` and the provided `event_loop` fixture) so that the loop remains open for the duration of `test_run_error_handling`.

3. Rerun CI to confirm both the lint and integration failures are resolved.