使用 Ray Serve 在生产环境中运行 Haystack Pipelines
一篇关于使用 Ray Serve 扩展 Haystack pipelines 以用于生产环境的指南
2023 年 6 月 14 日Ray Serve 是一个构建在线推理 API 的基于 Ray 框架的库。Serve 被设计成与框架无关,并且其简单的设计可以让你快速集成几乎任何需要部署的 Python 逻辑,但构建复杂的推理服务仍然是可能且直接的。
Haystack pipelines 可能非常复杂,但即使是更简单的 pipeline 也由多个组件组成,这些组件又可能依赖于不同的模型和技术——这一点使得它们成为测试 Ray Serve 功能的良好基准。
服务一个 Retriever - Reader QA pipeline
准备一组文档
让我们从本地部署一个简单但并非微不足道的 Haystack pipeline 开始:一个在现有文档集上工作的问答系统。为了简单起见,我们将使用一个运行 Elasticsearch 并填充了关于国家和首都的文档集的 Docker 镜像,这些文档可以立即进行查询。所以在我们开始之前,先运行这个镜像
docker run -p 9200:9200 -p 9300:9300 -d deepset/elasticsearch-countries-and-capitals:latest
Elasticsearch 实例将通过端口 9200 接受来自 localhost 的连接。
运行 Haystack pipeline
Haystack pipelines 可以通过 Python 代码或 yaml 定义 来定义,yaml 定义最终会被透明地转换为 Python 代码。我们将使用后一种格式,所以打开一个编辑器并将以下定义保存在一个名为 pipeline.yml 的文件中
version: ignore
# define all the building-blocks for a Pipeline
components:
- name: DocumentStore
type: ElasticsearchDocumentStore
params:
host: localhost
- name: Retriever
type: DensePassageRetriever
params:
document_store: DocumentStore
top_k: 5
- name: Reader
type: FARMReader
params:
model_name_or_path: deepset/roberta-base-squad2
context_window_size: 1000
return_no_answer: true
pipelines:
- name: query
nodes:
- name: Retriever
inputs: [Query]
- name: Reader
inputs: [Retriever]
为了执行查询,我们需要一些 Python 代码来读取上面的 yaml 配置并告诉 Haystack 运行生成的 pipeline
from pathlib import Path
from haystack import Pipeline
config = Path("pipeline.yml")
pipeline = Pipeline.load_from_yaml(config, "query")
answer = pipeline.run("What is the capital of France?")
print(f"Answer: {answer}")
正如你所见,每次我们想进行一个问题时,都需要重新构建 pipeline 并调用 run 方法——让我们将其变成一个推理服务并通过 HTTP 查询它。
通过 Ray Serve 运行 Haystack pipeline
从调用者的角度来看,Ray Serve 部署看起来与常规 HTTP 服务器完全相同,并且为了提供“入口”端点,最基本的要求是编写一个类似于以下的 Python 类
@serve.deployment
class MinimalIngress:
async def __call__(self, request: Request) -> str:
name = await request.json()["name"]
return f"Hello {name}"
存在一个 async def __call__ 方法是我们为了启动 Ray Serve 部署需要满足的唯一契约。为了创建一个运行 Haystack pipeline 的部署,让我们创建一个名为 capitals.py 的文件并添加以下代码
from pathlib import Path
from ray import serve
from starlette.requests import Request
from haystack import Pipeline
@serve.deployment
class HaystackQA:
def __init__(self):
# Load the pipeline from file and store it in self._pipeline
# so we can reuse it every time __call__ is invoked.
config = Path("pipeline.yml")
self._pipeline = Pipeline.load_from_yaml(config, "query")
async def __call__(self, request: Request) -> str:
query: str = str(await request.body())
res = self._pipeline.run(query=query)
answers = res.get("answers", [])
if answers:
# Return the first answer
return answers[0].answer
return ""
haystack_deployment = HaystackQA.bind()
关于上面的代码有几点需要注意
- 我们添加了一个
__init__方法,我们在其中为部署的整个生命周期初始化 pipeline 一次。 - 每个用
@serve.deployment装饰的类都有一个bind方法,它告诉 Ray Serve 将我们的HaystackQA类附加到一个我们命名为haystack_deployment的部署对象上。稍后,当我们启动服务器进程时,Ray Serve 将会拾取这个变量的值。
Python 文件 capitals.py 的内容现在是我们“服务”部署所需的所有内容。Ray Serve 可以直接从 Python 运行,但在这个例子中,我们将展示如何从命令行启动进程,这更接近于我们在生产环境中会做的事情。从激活了包含 ray 的 Python 环境的 shell 中,你可以直接运行
serve run capitals:haystack_deployment
你应该会在 shell 中看到几行日志滚动,如果一切顺利,最后的消息 Deployed Serve app successfully. 将会告诉我们部署已准备好接受连接——我们现在可以进行查询了。我们将使用 curl,但实际上任何 HTTP 客户端都会以相同的方式工作。从命令行
curl -X POST -d "What is the capital of Italy?" https://:8000
第一次收到响应可能需要一些时间,因为 Haystack 会在本地下载模型,但最终你应该会看到响应 Rome。
扩展部署
到目前为止我们所做的与将 pipeline.run() 调用放在 REST API 后面没有太大区别,但当涉及到将我们的 pipeline 投入生产时,Ray Serve 就显示出其优势了。例如,假设我们想水平扩展我们的 HaystackQA 以同时处理多个请求。我们只需要像这样修改 Python 代码
@serve.deployment(num_replicas=3) # this is the only line to change!
class HaystackQA:
def __init__(self):
...
我们现在可以停止服务器并使用相同的 serve run 命令重新启动它:就是这样,我们现在有 3 个 HaystackQA 部署实例,正如你在运行于 http://127.0.0.1:8265/#/actors 的 Ray 控制面板上看到的。
万一我们想根据当前流量调整部署数量以优化成本,Ray Serve 提供了开箱即用的自动伸缩功能。我们只需要像这样再次修改 @serve.deployment 装饰器
@serve.deployment(
autoscaling_config={
"min_replicas": 1,
"initial_replicas": 2,
"max_replicas": 5,
"downscale_delay_s": 30,
}
)
class HaystackQA:
def __init__(self):
...
autoscaling_config 参数是不言自明的,而 文档 在详细介绍其背后的逻辑方面非常有帮助。在这种情况下,我们告诉 Ray Serve 我们想将部署扩展到最多 5 个副本,并至少缩减到 1 个。我们还想从 2 个副本开始,并且在 30 秒没有活动时进行缩减。再次,你可以通过启动服务器并查看 http://127.0.0.1:8265/#/actors 上的仪表板来观察这一切是如何在后台工作的。
下一步
本文介绍的部署策略对于许多用例来说都是可以的,从尝试 Haystack 到演示应用程序和小 POC,但细心的读者会注意到扩展整个 Haystack pipeline 的限制:pipeline 的某些节点可能比其他节点更受益于水平扩展和冗余,这是否可以用 Ray Serve 实现?答案是肯定的,我们将在即将发布的文章中介绍如何将一个 pipeline 分割成多个部署,然后这些部署可以由 Ray Serve 以不同的策略进行管理。
