A DevOps Support Agent with Human-in-the-Loop
Last updated: May 28, 2025
Notebook author: Amna Mubashar
This notebook demonstrates how a Haystack Agent can interactively request user input when it is unsure about the next step. We will build a human-in-the-loop tool that the Agent can invoke dynamically: whenever the Agent encounters ambiguity or incomplete information, it asks a human for additional input before continuing with the task.
To that end, we will create a DevOps support agent.
CI/CD pipelines occasionally fail for reasons that are hard to diagnose without manual inspection: failing tests, misconfigured variables, flaky integrations, and so on.
The support agent is built with Haystack Tools and an Agent, and performs the following tasks:
- Check for failed CI workflows
- Fetch build logs and status
- Look up and analyze git commits
- Suggest next troubleshooting steps
- Escalate to a human via the human-in-the-loop tool
Install the required dependencies:
!pip install "haystack-ai>=2.14.0"
Next, we configure our variables. We need to set OPENAI_API_KEY for OpenAIChatGenerator, and GITHUB_TOKEN with appropriate permissions to access repository information, CI logs, and commit history. Make sure your GitHub token has sufficient access to the repositories you want to analyze.
from getpass import getpass
import os

if not os.environ.get("OPENAI_API_KEY"):
    os.environ["OPENAI_API_KEY"] = getpass("Enter your OpenAI API key:")
if not os.environ.get("GITHUB_TOKEN"):
    os.environ["GITHUB_TOKEN"] = getpass("Enter your GitHub token:")
Define the Tools
We start by defining the tools the Agent will use.
git_list_commits_tool: fetches and formats the most recent commits for a given GitHub repository and branch.
Useful for quickly surfacing the latest changes when diagnosing CI failures or code regressions.
import io
import os
import zipfile
from collections import defaultdict
from typing import Annotated, Dict, List, Tuple

import requests

from haystack.tools import tool


@tool
def git_list_commits_tool(
    repo: Annotated[str, "The GitHub repository in 'owner/name' format, e.g. 'my-org/my-repo'"],
    branch: Annotated[str, "The branch name to list commits from"] = "main",
) -> str:
    '''List the most recent commits for a GitHub repository and branch.'''
    token = os.getenv("GITHUB_TOKEN")
    if not token:
        return "Error: GITHUB_TOKEN not set in environment."
    url = f"https://api.github.com/repos/{repo}/commits"
    headers = {"Authorization": f"token {token}"}
    params = {"sha": branch, "per_page": 10}
    resp = requests.get(url, headers=headers, params=params)
    resp.raise_for_status()
    commits = resp.json()
    lines = []
    for c in commits:
        sha = c["sha"][:7]
        msg = c["commit"]["message"].split("\n")[0]
        lines.append(f"- {sha}: {msg}")
    return "\n".join(lines)
git_commit_diff_tool: retrieves the patch (diff) between two commits or branches in a GitHub repository.
Lets you inspect, side by side, the code changes that may have caused a test failure.
@tool
def git_commit_diff_tool(
    repo: Annotated[str, "The GitHub repository in 'owner/name' format, e.g. 'my-org/my-repo'"],
    base: Annotated[str, "The base commit SHA or branch name"],
    head: Annotated[str, "The head commit SHA or branch name"],
) -> str:
    '''Get the diff (patch) between two commits or branches in a GitHub repository.'''
    token = os.getenv("GITHUB_TOKEN")
    if not token:
        return "Error: GITHUB_TOKEN not set in environment."
    url = f"https://api.github.com/repos/{repo}/compare/{base}...{head}"
    headers = {"Authorization": f"token {token}"}
    resp = requests.get(url, headers=headers)
    resp.raise_for_status()
    files = resp.json().get("files", [])
    # Format each changed file with its patch so the function returns a string, as annotated.
    return "\n\n".join(
        f"--- {f['filename']}\n{f.get('patch', '(no patch available)')}" for f in files
    )
ci_status_tool: checks recent GitHub Actions workflow runs for failures, downloads their logs, and extracts the names of failing tests along with their error messages. It automates root-cause identification by surfacing the exact tests and errors that broke the pipeline.
@tool
def ci_status_tool(
    repo: Annotated[str, "The GitHub repository in 'owner/name' format, e.g. 'my-org/my-repo'"],
    per_page: Annotated[int, "How many of the most recent workflow runs to check (default 50)"] = 50,
) -> str:
    '''Check the most recent GitHub Actions workflow runs for a given repository, list any that
    failed, download their logs, and extract all failures, grouped by test suite (inferred from
    log file paths).'''
    token = os.getenv("GITHUB_TOKEN")
    if not token:
        return "Error: GITHUB_TOKEN environment variable not set."
    headers = {"Authorization": f"token {token}"}
    params = {"per_page": per_page}
    runs_url = f"https://api.github.com/repos/{repo}/actions/runs"
    resp = requests.get(runs_url, headers=headers, params=params)
    resp.raise_for_status()
    runs = resp.json().get("workflow_runs", [])
    failed_runs = [r for r in runs if r.get("conclusion") == "failure"]
    if not failed_runs:
        return f"No failed runs in the last {per_page} workflow runs for `{repo}`."

    def extract_all_failures(logs_url: str) -> List[Tuple[str, str, str]]:
        """
        Download and scan the logs ZIP for all failure markers.
        Returns a list of tuples: (suite, test_name, error_line).
        """
        r = requests.get(logs_url, headers=headers)
        r.raise_for_status()
        z = zipfile.ZipFile(io.BytesIO(r.content))
        failures = []
        for filepath in z.namelist():
            if not filepath.lower().endswith((".txt", ".log")):
                continue
            suite = filepath.split("/", 1)[0]  # infer suite from the top-level folder or file
            with z.open(filepath) as f:
                for raw in f:
                    # decode with errors="ignore" never raises, so no try/except is needed
                    line = raw.decode("utf-8", errors="ignore").strip()
                    if any(marker in line for marker in ("FAIL", "ERROR", "Exception", "error")):
                        parts = line.split()
                        test_name = next(
                            (p for p in parts if "." in p or p.endswith("()")),
                            parts[0] if parts else "",
                        )
                        failures.append((suite, test_name, line))
        return failures

    output = [f"Found {len(failed_runs)} failed run(s):"]
    for run in failed_runs:
        run_id = run["id"]
        branch = run.get("head_branch")
        event = run.get("event")
        created = run.get("created_at")
        logs_url = run.get("logs_url")
        html_url = run.get("html_url")
        failures = extract_all_failures(logs_url)
        if not failures:
            detail = "No individual failures parsed from logs."
        else:
            # Group failures by suite
            by_suite: Dict[str, List[Tuple[str, str]]] = defaultdict(list)
            for suite, test, err in failures:
                by_suite[suite].append((test, err))
            lines = []
            for suite, items in by_suite.items():
                lines.append(f"  ▶ **Suite**: `{suite}`")
                for test, err in items:
                    lines.append(f"    - **{test}**: {err}")
            detail = "\n".join(lines)
        output.append(
            f"- **Run ID**: {run_id}\n"
            f"  **Branch**: {branch}\n"
            f"  **Event**: {event}\n"
            f"  **Created At**: {created}\n"
            f"  **Failures by Suite**:\n{detail}\n"
            f"  **Logs ZIP**: {logs_url}\n"
            f"  **Run URL**: {html_url}\n"
        )
    return "\n\n".join(output)
shell_tool: executes local shell commands with a configurable timeout, capturing stdout or returning detailed error output. Handy for grepping, filtering, or tailing CI log files before passing the relevant snippets to the LLM.
import subprocess


@tool
def shell_tool(
    command: Annotated[str, "The shell command to execute, e.g. 'grep -E \"ERROR|Exception\" build.log'"],
    timeout: Annotated[int, "Maximum time in seconds to allow the command to run"] = 30,
) -> str:
    '''Executes a shell command with a timeout and returns stdout or an error message.'''
    try:
        output = subprocess.check_output(
            command,
            shell=True,
            stderr=subprocess.STDOUT,
            timeout=timeout,
            universal_newlines=True,
        )
        return output
    except subprocess.CalledProcessError as e:
        return f"❌ Command failed (exit code {e.returncode}):\n{e.output}"
    except subprocess.TimeoutExpired:
        return f"❌ Command timed out after {timeout} seconds."
    except Exception as e:
        return f"❌ Unexpected error: {str(e)}"
human_in_loop_tool: asks the user a clarifying question via input() when the Agent hits ambiguity or needs additional information. It ensures the Agent only interrupts for human feedback when strictly necessary.
@tool
def human_in_loop_tool(
    question: Annotated[str, "A clarifying question to prompt the user for more information"],
) -> str:
    '''Ask the user a clarifying question and return their response via input().'''
    user_message = input(f"[Agent needs your input] {question}\n> ")
    return user_message
Create and Configure the Agent
We create a Haystack Agent instance and configure its behavior with a system prompt and the tools defined above.
from haystack.components.agents import Agent
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.generators.utils import print_streaming_chunk
from haystack.dataclasses import ChatMessage

agent = Agent(
    chat_generator=OpenAIChatGenerator(model="o4-mini"),
    tools=[git_list_commits_tool, git_commit_diff_tool, ci_status_tool, shell_tool, human_in_loop_tool],
    system_prompt=(
        "You are a DevOps support assistant with the following capabilities:\n"
        "1. Check CI failures\n"
        "2. Fetch commits and diffs from GitHub\n"
        "3. Analyze logs\n"
        "4. Ask the user for clarification\n\n"
        "Behavior Guidelines:\n"
        "- Tool-First: Always attempt to resolve the user's request by using the tools provided.\n"
        "- Concise Reasoning: Before calling each tool, think (briefly) about why it's needed—then call it.\n"
        "- IMPORTANT: Whenever you are unsure about any details or need clarification, "
        "you MUST use the human_in_loop tool to ask the user questions. For example if:\n"
        "  * Required parameters are missing or ambiguous (e.g. repo name, run ID, branch)\n"
        "  * A tool returns an unexpected error that needs human insight\n"
        "- Structured Outputs: After your final tool call, summarize findings in Markdown:\n"
        "  * Run ID, Branch, Test, Error\n"
        "  * Next Steps (e.g., \"Add null-check to line 45\", \"Rerun tests after env fix\")\n"
        "- Error Handling: If a tool fails (e.g. 404, timeout), surface the error and decide whether to retry, "
        "choose another tool, or ask for clarification.\n"
        "EXIT CONDITION: Once you've provided a complete, actionable summary and next steps, exit."
    ),
    exit_conditions=["text"],
)

# Warm up the Agent
agent.warm_up()
Agent Introspection and Streaming
Haystack supports streaming of ToolCalls and ToolResults during Agent execution. By passing a streaming_callback to the Agent, you can observe the internal reasoning process in real time.
This allows you to:
- See which tools the Agent decides to use.
- Inspect the arguments passed to each tool.
- View the result returned from each tool before the final answer is generated.
This is especially useful for debugging and for transparency in complex multi-step reasoning workflows.
Note: You can pass any streaming_callback function. Haystack's print_streaming_chunk utility function is configured to stream tool calls and results by default.
# Pass the user query to the Agent
message = [ChatMessage.from_user("For the given repo, check for any failed CI workflows. Then debug and analyze the workflow by finding the root cause of the failure.")]

# Run the Agent
response = agent.run(messages=message, streaming_callback=print_streaming_chunk)
[TOOL CALL]
Tool: human_in_loop_tool
Arguments: {"question":"Please provide the GitHub repository in 'owner/name' format for which you want to check CI failures."}
[TOOL RESULT]
deepset-ai/haystack
[TOOL CALL]
Tool: ci_status_tool
Arguments: {"repo":"deepset-ai/haystack","per_page":50}
[TOOL RESULT]
Found 2 failed run(s):
- **Run ID**: 14994639315
**Branch**: update-tools-strict-openai
**Event**: pull_request
**Created At**: 2025-05-13T10:45:32Z
**Failures by Suite**:
▶ **Suite**: `Unit ubuntu-latest`
- **2025-05-13T10:47:25.4802006Z**: 2025-05-13T10:47:25.4802006Z ERROR haystack.core.serialization:serialization.py:263 Failed to import class 'Nonexisting.DocumentStore'
- **2025-05-13T10:47:26.4669164Z**: 2025-05-13T10:47:26.4669164Z test/components/converters/test_csv_to_document.py::TestCSVToDocument::test_run_error_handling
- **2025-05-13T10:47:26.6024332Z**: 2025-05-13T10:47:26.6024332Z test/components/converters/test_docx_file_to_document.py::TestDOCXToDocument::test_run_error_wrong_file_type
[... additional test failure details omitted for brevity ...]
[... agent analysis and recommendations continue below ...]
print(response["messages"][-1].text)
Here’s what I found for the most recent failing run:
**Run ID**: 14994639315
**Branch**: update-tools-strict-openai
| Test Suite | Test / Step | Error |
|-------------------------------|-----------------------------------------------------------|-----------------------------------------------------------------------------|
| 0_lint.txt (→ `lint`) | `Process completed with exit code 4` | Linter returned code 4 (no details in output) |
| Integration · ubuntu-latest | TestMarkdownToDocument::test_run_error_handling | “Task exception was never retrieved” / `RuntimeError('Event loop is closed')` |
| Integration · windows-latest | TestMarkdownToDocument::test_run_error_handling | `RuntimeError('Event loop is closed')` (unraisableexception warnings) |
| Integration · macos-latest | TestMarkdownToDocument::test_run_error_handling | `RuntimeError('Event loop is closed')` |
Next Steps:
1. **Fix Lint Exit Code 4**
- Run the lint job locally (`flake8` or `hatch run lint`) to reproduce the exit-code and capture missing details.
- Adjust `.flake8`/`pre-commit` config or fix any new style violations so that the lint step exits zero.
2. **Resolve “Event loop is closed” in Markdown converter tests**
- In `haystack/components/converters/markdown.py` (or its test), ensure any `asyncio` event loop is properly created/closed.
- Switch to `pytest-asyncio` fixtures (e.g. use `@pytest.mark.asyncio` and the provided `event_loop` fixture) so that the loop remains open for the duration of `test_run_error_handling`.
3. Rerun CI to confirm both the lint and integration failures are resolved.
