📘 **TELUS Agriculture & Consumer Goods** 如何通过 **Haystack Agents** 转变促销交易

在异步环境中运行 Haystack 流水线


笔记本由 Madeeswaran KannanMathis Lucka 编写

在本笔记本中,您将学习如何使用 AsyncPipeline 和支持异步的组件来构建和执行 Haystack pipeline,并能在异步环境中运行。本教程基于 这个简短的 Haystack 教程,因此建议您在开始之前先熟悉它。另一个先决条件是您需要具备协程调度和 Python 异步编程方面的实践知识。

动机

默认情况下,haystack 中的 Pipeline 类是一个普通的 Python 对象类,它提供非 async 方法来添加/连接组件并执行 pipeline 逻辑。虽然它可以在异步环境中使用,但这样做并非最优,因为它以“阻塞”的方式执行其逻辑。换句话说,一旦调用 Pipeline.run 方法,它必须运行完毕并返回输出后,下一行代码才能执行1。在典型的异步环境中,这会阻止活动的异步事件循环调度其他 async 协程,从而降低吞吐量。为了缓解这个瓶颈,我们引入了支持异步的 Haystack 组件的概念以及一个 AsyncPipeline 类,该类可以协同调度异步和非异步组件的执行。

1 - 这是一个简化说法,因为 Python 运行时可能调度另一个线程,但在本例中我们可以忽略这个细节。

AsyncPipeline 的优势

  • 并发执行组件以加速 pipeline 执行。
  • 分步执行组件以调试您的 pipeline。
  • 在异步环境中提高吞吐量,例如当在 FastAPI 端点后执行 pipeline 时。
  • 允许单个组件选择支持 async
    • 并非所有组件都受益于支持异步——I/O 密集型组件是最适合的候选。
  • 提供一种向后兼容的方式来执行包含异步和非异步组件的 Haystack pipeline。

现在,让我们开始看看将异步支持添加到原始教程需要哪些步骤,首先是安装 Haystack 和必要的依赖项。

开发环境

%%bash

pip install -U haystack-ai -q
pip install datasets -q
pip install sentence-transformers -q
pip install nest_asyncio -q

提供一个 OpenAI API 密钥,以确保 LLM 生成器可以查询 OpenAI API。

import os
from getpass import getpass

if "OPENAI_API_KEY" not in os.environ:
    os.environ["OPENAI_API_KEY"] = getpass("Enter OpenAI API key:")

# If you're running this notebook on Google Colab, you can do the following instead:
#
# from google.colab import userdata
# if "OPENAI_API_KEY" not in os.environ:
#  os.environ["OPENAI_API_KEY"] = userdata.get('OPENAI_API_KEY')
# The IPython environment is already running an event-loop.
# We will later use a method that tries to create another event-loop which would fail without this snippet.
import nest_asyncio
nest_asyncio.apply()

创建 AsyncPipeline

获取并索引文档

初始化一个 DocumentStore 来索引您的文档。

from haystack.document_stores.in_memory import InMemoryDocumentStore

document_store = InMemoryDocumentStore()

获取数据并将其转换为 Haystack Document

from datasets import load_dataset
from haystack import Document

dataset = load_dataset("bilgeyucel/seven-wonders", split="train")
docs = [Document(content=doc["content"], meta=doc["meta"]) for doc in dataset]

要将数据与嵌入一起存储在 DocumentStore 中,请使用模型名称初始化一个 SentenceTransformersDocumentEmbedder,然后调用 warm_up() 来下载嵌入模型。

然后,我们使用新预热的嵌入器计算文档的嵌入,并将文档写入文档存储。

from haystack.components.embedders import SentenceTransformersDocumentEmbedder

doc_embedder = SentenceTransformersDocumentEmbedder(
    model="sentence-transformers/all-MiniLM-L6-v2"
)
doc_embedder.warm_up()

docs_with_embeddings = doc_embedder.run(docs)
n_docs_written = document_store.write_documents(docs_with_embeddings["documents"])
print(f"Indexed {n_docs_written} documents")
Batches: 100%|██████████| 5/5 [00:00<00:00,  5.37it/s]

Indexed 151 documents

下一步是构建 RAG pipeline 以生成用户查询的答案。我们使用混合检索来构建 RAG pipeline。混合检索使用两个可以并发运行的检索分支。

初始化一个文本嵌入器来创建用户查询的嵌入,以及一个 InMemoryEmbeddingRetriever 和一个 InMemoryBM25Retriever 来与您之前初始化的 InMemoryDocumentStore 一起使用。我们将两个检索器的结果输入到 DocumentJoiner 中,并使用倒数排名融合来获得文档的最终排名。

from haystack.components.embedders import SentenceTransformersTextEmbedder
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever, InMemoryBM25Retriever
from haystack.components.joiners import DocumentJoiner

text_embedder = SentenceTransformersTextEmbedder(model="sentence-transformers/all-MiniLM-L6-v2")
embedding_retriever = InMemoryEmbeddingRetriever(document_store)
bm25_retriever = InMemoryBM25Retriever(document_store)
joiner = DocumentJoiner(join_mode="reciprocal_rank_fusion")

创建一个自定义提示与 ChatPromptBuilder 一起使用,并初始化一个 OpenAIChatGenerator 来消费前者的输出。

from haystack.components.builders import ChatPromptBuilder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage

template = """
Given the following information, answer the question.

Context:
{% for document in documents %}
    {{ document.content }}
{% endfor %}

Question: {{question}}
Answer:
"""

prompt_builder = ChatPromptBuilder(template=[ChatMessage.from_user(template)])
generator = OpenAIChatGenerator(model="gpt-4o-mini")

最后我们来创建 pipeline 实例。我们没有使用 Pipeline 类,而是使用了 AsyncPipeline 类。

其余过程,即添加组件并相互连接,与原始 Pipeline 类相同。

from haystack import AsyncPipeline

async_rag_pipeline = AsyncPipeline()
# Add components to your pipeline
async_rag_pipeline.add_component("text_embedder", text_embedder)
async_rag_pipeline.add_component("embedding_retriever", embedding_retriever)
async_rag_pipeline.add_component("bm25_retriever", bm25_retriever)
async_rag_pipeline.add_component("joiner", joiner)
async_rag_pipeline.add_component("prompt_builder", prompt_builder)
async_rag_pipeline.add_component("llm", generator)

# Now, connect the components to each other
async_rag_pipeline.connect("text_embedder.embedding", "embedding_retriever.query_embedding")
async_rag_pipeline.connect("bm25_retriever.documents", "joiner.documents")
async_rag_pipeline.connect("embedding_retriever.documents", "joiner.documents")
async_rag_pipeline.connect("joiner.documents", "prompt_builder.documents")
async_rag_pipeline.connect("prompt_builder.prompt", "llm.messages")

async_rag_pipeline.show()
# You can see from the visual pipeline representation that embedding retriever and bm25 retriever do not depend on each other; they could run concurrently

现在,我们创建一个协程来用问题查询 pipeline。

我们使用 run_async_generator 方法来执行 AsyncPipelinerun_async_generator 返回一个 AsyncIterator,我们需要迭代它来推动 pipeline 的执行。

本质上,这允许我们逐个组件地逐步执行 pipeline,这对于调试 pipeline 或当您想在任何组件完成后运行自定义逻辑时都很有用。

AsyncPipeline 还公开了

  • 一个 run_async 方法,该方法在返回最终输出之前执行整个 pipeline。
  • 一个 run 方法,可以从非异步环境调用,但仍并发执行组件;run 方法是 Pipeline.run 的直接替代品。

我们迭代 AsyncIterator 并打印来自检索器和 joiner 的中间输出。

question = "Where is Gardens of Babylon?"
inputs = {
    "text_embedder": {"text": question},
    "bm25_retriever": {"query": question},
    "prompt_builder": {"question": question},
}
include_outputs_from = ["embedding_retriever", "bm25_retriever", "joiner"]
consumed_outputs = []
async for output in async_rag_pipeline.run_async_generator(data=inputs, include_outputs_from=include_outputs_from):
    final_output = output
    for component, results in output.items():
        if component not in consumed_outputs:
            consumed_outputs.append(component)
            if "documents" in results:
                print(f"Outputs from `{component}`.")
                for doc in results["documents"][:1]:
                    print(f"Score: {doc.score}")
                    print(f"Content: {doc.content[:500]}...")
                    print("------------")


print("LLM Response:")
print(final_output["llm"]["replies"][0].text)
    
Outputs from `bm25_retriever`.
Score: 13.520340633994273
Content: [21] However, the gardens were said to still exist at the time that later writers described them, and some of these accounts are regarded as deriving from people who had visited Babylon.[2] Herodotus, who describes Babylon in his Histories, does not mention the Hanging Gardens,[22] although it could be that the gardens were not yet well known to the Greeks at the time of his visit.[2]
To date, no archaeological evidence has been found at Babylon for the Hanging Gardens.[6] It is possible that ev...
------------
Outputs from `embedding_retriever`.
Score: 0.6933103186685945
Content: The construction of the Hanging Gardens has also been attributed to the legendary queen Semiramis[4] and they have been called the Hanging Gardens of Semiramis as an alternative name.[5]
The Hanging Gardens are the only one of the Seven Wonders for which the location has not been definitively established.[6] There are no extant Babylonian texts that mention the gardens, and no definitive archaeological evidence has been found in Babylon.[7][8] Three theories have been suggested to account for th...
------------
Outputs from `joiner`.
Score: 0.9919354838709679
Content: [21] However, the gardens were said to still exist at the time that later writers described them, and some of these accounts are regarded as deriving from people who had visited Babylon.[2] Herodotus, who describes Babylon in his Histories, does not mention the Hanging Gardens,[22] although it could be that the gardens were not yet well known to the Greeks at the time of his visit.[2]
To date, no archaeological evidence has been found at Babylon for the Hanging Gardens.[6] It is possible that ev...
------------
LLM Response:
The Hanging Gardens of Babylon are said to have been located in the ancient city of Babylon, near present-day Hillah in the Babil province of Iraq. However, the exact location has not been definitively established, and there is no archaeological evidence confirming their existence in Babylon. Some theories suggest that they could have been confused with the gardens built by the Assyrian king Sennacherib in his capital city of Nineveh, near modern-day Mosul.

顺序执行 vs. 并发执行

现在,让我们比较一下多个查询的顺序执行和并发执行。我们创建了两个运行问题列表的实用函数。两者都使用 AsyncPipeline,但只有一个将每个问题作为协程运行。

import asyncio

def sequential_execution(pipeline: AsyncPipeline, questions: list[str]):
    results = []
    for question in examples:
        inputs = {
            "text_embedder": {"text": question},
            "bm25_retriever": {"query": question},
            "prompt_builder": {"question": question},
        }
        
        results.append(pipeline.run(data=inputs))
    
    return results

async def concurrent_execution(pipeline: AsyncPipeline, questions: list[str]):
    tasks = [pipeline.run_async(data={
        "text_embedder": {"text": question},
        "bm25_retriever": {"query": question},
        "prompt_builder": {"question": question},
    }) for question in questions]
    
    results = await asyncio.gather(*tasks)
    
    return results

我们使用 3 个示例运行 pipeline。

examples = [
    "Where is Gardens of Babylon?",
    "Why did people build Great Pyramid of Giza?",
    "What does Rhodes Statue look like?",
]

让我们先顺序运行问题。

import time

start = time.time()
results = sequential_execution(async_rag_pipeline, examples)
end = time.time()
total_time = end - start
print(f"All tasks completed in {total_time:.2f} seconds")
All tasks completed in 8.48 seconds

让我们检查一下如果并发运行问题需要多长时间。

start = time.time()
results = await concurrent_execution(async_rag_pipeline, examples)
end = time.time()
total_time = end - start
print(f"All tasks completed in {total_time:.2f} seconds")
Batches:   0%|          | 0/1 [00:00<?, ?it/s]

Batches: 100%|██████████| 1/1 [00:00<00:00,  2.11it/s]


All tasks completed in 3.57 seconds

您可以看到,并发执行 pipeline 比顺序执行快一倍以上。

并发组件执行

上面的示例正在并发运行检索器组件。任何可以并发运行的组件,通常因为它们位于 pipeline 的并行分支上,都会被 AsyncPipeline 的运行逻辑自动调度为并发运行。

让我们创建一个包含自定义组件的小示例,以更详细地说明并发执行。

from haystack import component

@component
class WaitingComponent:
    """
    A test component that simulates async operations by waiting for a specified time
    before returning a message.

    ### Usage example
    ```python
    test_comp = AsyncTestComponent(name="TestComponent", wait_time=2)

    # Sync usage
    result = test_comp.run(user_msg="Hello")
    print(result["message"])  # prints after 2 seconds

    # Async usage
    result = await test_comp.run_async(user_msg="Hello")
    print(result["message"])  # prints after 2 seconds
    ```
    """

    def __init__(self, name: str, wait_time: int = 1):
        self.name = name
        self.wait_time = wait_time

    @component.output_types(message=str)
    def run(self, user_msg: str) -> dict:
        """
        Synchronous method that waits for the specified time and returns a message.

        :param user_msg: Input message from the user (unused in output but required for example)
        :return: Dictionary containing the output message
        """
        print(f"Component {self.name} starts running...")
        time.sleep(self.wait_time)
        print(f"Component {self.name} is done!")
        return {"message": f"Message from {self.name}"}


wait_1 = WaitingComponent(name="wait_1", wait_time=1)
wait_2 = WaitingComponent(name="wait_2", wait_time=2)
wait_3 = WaitingComponent(name="wait_3", wait_time=3)
wait_4 = WaitingComponent(name="wait_4", wait_time=4)
wait_5 = WaitingComponent(name="wait_5", wait_time=5)

pp = AsyncPipeline()


pp.add_component("wait_1", wait_1)
pp.add_component("wait_2", wait_2)
pp.add_component("wait_3", wait_3)
pp.add_component("wait_4", wait_4)
pp.add_component("wait_5", wait_5)

pp.connect("wait_1", "wait_2")
pp.connect("wait_3", "wait_4")

pp.show()

您可以看到这个 pipeline 有 3 个并行分支。让我们运行这个 pipeline 来看看它是如何并发执行组件的。

async for output in pp.run_async_generator({"user_msg": "Hello"}, include_outputs_from=["wait_1", "wait_2", "wait_3", "wait_4", "wait_10"]):
    if len(output.keys()) == 1:
        print(output)
{'wait_1': {'message': 'Message from wait_1'}}
{'wait_3': {'message': 'Message from wait_3'}}
{'wait_2': {'message': 'Message from wait_2'}}
{'wait_5': {'message': 'Message from wait_5'}}
{'wait_4': {'message': 'Message from wait_4'}}

自定义异步组件

单个组件可以通过实现一个 run_async 协程来选择支持异步,该协程具有与 run 方法相同的签名(即输入参数和输出)。设置此限制是为了确保无论组件是否支持异步执行,pipeline 的连接方式都相同,从而允许与现有 pipeline 进行即插即用式的向后兼容。

from typing import Dict, Any
from haystack import component

@component
class MyCustomComponent:
    def __init__(self, my_custom_param: str):
        self.my_custom_param = my_custom_param

    @component.output_types(original=str, concatenated=str)
    def run(self, input: str) -> Dict[str, Any]:
        return {
            "original": input,
            "concatenated": input + self.my_custom_param
        }

    async def do_io_bound_op(self, input: str) -> str:
        # Do some IO-bound operation here
        return input + self.my_custom_param

    @component.output_types(original=str, concatenated=str)
    async def run_async(self, input: str) -> Dict[str, Any]:
        return {
            "original": input,
            "concatenated": await self.do_io_bound_op(input)
        }