📘 **TELUS Agriculture & Consumer Goods** 如何通过 **Haystack Agents** 转变促销交易

使用 Ray Serve 在生产环境中运行 Haystack Pipelines

一篇关于使用 Ray Serve 扩展 Haystack pipelines 以用于生产环境的指南

Ray Serve 是一个构建在线推理 API 的基于 Ray 框架的库。Serve 被设计成与框架无关,并且其简单的设计可以让你快速集成几乎任何需要部署的 Python 逻辑,但构建复杂的推理服务仍然是可能且直接的。

Haystack pipelines 可能非常复杂,但即使是更简单的 pipeline 也由多个组件组成,这些组件又可能依赖于不同的模型和技术——这一点使得它们成为测试 Ray Serve 功能的良好基准。

服务一个 Retriever - Reader QA pipeline

准备一组文档

让我们从本地部署一个简单但并非微不足道的 Haystack pipeline 开始:一个在现有文档集上工作的问答系统。为了简单起见,我们将使用一个运行 Elasticsearch 并填充了关于国家和首都的文档集的 Docker 镜像,这些文档可以立即进行查询。所以在我们开始之前,先运行这个镜像

docker run -p 9200:9200 -p 9300:9300 -d deepset/elasticsearch-countries-and-capitals:latest

Elasticsearch 实例将通过端口 9200 接受来自 localhost 的连接。

运行 Haystack pipeline

Haystack pipelines 可以通过 Python 代码或 yaml 定义 来定义,yaml 定义最终会被透明地转换为 Python 代码。我们将使用后一种格式,所以打开一个编辑器并将以下定义保存在一个名为 pipeline.yml 的文件中

version: ignore

# define all the building-blocks for a Pipeline
components:    
  - name: DocumentStore
    type: ElasticsearchDocumentStore
    params:
      host: localhost

  - name: Retriever
    type: DensePassageRetriever
    params:
      document_store: DocumentStore
      top_k: 5

  - name: Reader
    type: FARMReader
    params:
      model_name_or_path: deepset/roberta-base-squad2
      context_window_size: 1000
      return_no_answer: true

pipelines:
  - name: query
    nodes:
    - name: Retriever
      inputs: [Query]
    - name: Reader
      inputs: [Retriever]

为了执行查询,我们需要一些 Python 代码来读取上面的 yaml 配置并告诉 Haystack 运行生成的 pipeline

from pathlib import Path

from haystack import Pipeline

config = Path("pipeline.yml")
pipeline = Pipeline.load_from_yaml(config, "query")
answer = pipeline.run("What is the capital of France?")

print(f"Answer: {answer}")

正如你所见,每次我们想进行一个问题时,都需要重新构建 pipeline 并调用 run 方法——让我们将其变成一个推理服务并通过 HTTP 查询它。

通过 Ray Serve 运行 Haystack pipeline

从调用者的角度来看,Ray Serve 部署看起来与常规 HTTP 服务器完全相同,并且为了提供“入口”端点,最基本的要求是编写一个类似于以下的 Python 类

@serve.deployment
class MinimalIngress:
  async def __call__(self, request: Request) -> str:
      name = await request.json()["name"]
      return f"Hello {name}"

存在一个 async def __call__ 方法是我们为了启动 Ray Serve 部署需要满足的唯一契约。为了创建一个运行 Haystack pipeline 的部署,让我们创建一个名为 capitals.py 的文件并添加以下代码

from pathlib import Path

from ray import serve
from starlette.requests import Request
from haystack import Pipeline

@serve.deployment
class HaystackQA:
    def __init__(self):
        # Load the pipeline from file and store it in self._pipeline
        # so we can reuse it every time __call__ is invoked.
        config = Path("pipeline.yml")
        self._pipeline = Pipeline.load_from_yaml(config, "query")

    async def __call__(self, request: Request) -> str:
        query: str = str(await request.body())

        res = self._pipeline.run(query=query)
        answers = res.get("answers", [])
        if answers:
            # Return the first answer
            return answers[0].answer
        return ""

haystack_deployment = HaystackQA.bind()

关于上面的代码有几点需要注意

  • 我们添加了一个 __init__ 方法,我们在其中为部署的整个生命周期初始化 pipeline 一次。
  • 每个用 @serve.deployment 装饰的类都有一个 bind 方法,它告诉 Ray Serve 将我们的 HaystackQA 类附加到一个我们命名为 haystack_deployment 的部署对象上。稍后,当我们启动服务器进程时,Ray Serve 将会拾取这个变量的值。

Python 文件 capitals.py 的内容现在是我们“服务”部署所需的所有内容。Ray Serve 可以直接从 Python 运行,但在这个例子中,我们将展示如何从命令行启动进程,这更接近于我们在生产环境中会做的事情。从激活了包含 ray 的 Python 环境的 shell 中,你可以直接运行

serve run capitals:haystack_deployment

你应该会在 shell 中看到几行日志滚动,如果一切顺利,最后的消息 Deployed Serve app successfully. 将会告诉我们部署已准备好接受连接——我们现在可以进行查询了。我们将使用 curl,但实际上任何 HTTP 客户端都会以相同的方式工作。从命令行

curl -X POST -d "What is the capital of Italy?" https://:8000 

第一次收到响应可能需要一些时间,因为 Haystack 会在本地下载模型,但最终你应该会看到响应 Rome

扩展部署

到目前为止我们所做的与将 pipeline.run() 调用放在 REST API 后面没有太大区别,但当涉及到将我们的 pipeline 投入生产时,Ray Serve 就显示出其优势了。例如,假设我们想水平扩展我们的 HaystackQA 以同时处理多个请求。我们只需要像这样修改 Python 代码

@serve.deployment(num_replicas=3)  # this is the only line to change!
class HaystackQA:
    def __init__(self):
			...

我们现在可以停止服务器并使用相同的 serve run 命令重新启动它:就是这样,我们现在有 3 个 HaystackQA 部署实例,正如你在运行于 http://127.0.0.1:8265/#/actors 的 Ray 控制面板上看到的。

万一我们想根据当前流量调整部署数量以优化成本,Ray Serve 提供了开箱即用的自动伸缩功能。我们只需要像这样再次修改 @serve.deployment 装饰器

@serve.deployment(
    autoscaling_config={
        "min_replicas": 1,
        "initial_replicas": 2,
        "max_replicas": 5,
        "downscale_delay_s": 30,
    }
)
class HaystackQA:
    def __init__(self):
			...

autoscaling_config 参数是不言自明的,而 文档 在详细介绍其背后的逻辑方面非常有帮助。在这种情况下,我们告诉 Ray Serve 我们想将部署扩展到最多 5 个副本,并至少缩减到 1 个。我们还想从 2 个副本开始,并且在 30 秒没有活动时进行缩减。再次,你可以通过启动服务器并查看 http://127.0.0.1:8265/#/actors 上的仪表板来观察这一切是如何在后台工作的。

下一步

本文介绍的部署策略对于许多用例来说都是可以的,从尝试 Haystack 到演示应用程序和小 POC,但细心的读者会注意到扩展整个 Haystack pipeline 的限制:pipeline 的某些节点可能比其他节点更受益于水平扩展和冗余,这是否可以用 Ray Serve 实现?答案是肯定的,我们将在即将发布的文章中介绍如何将一个 pipeline 分割成多个部署,然后这些部署可以由 Ray Serve 以不同的策略进行管理。