使用 KEDA 和 Haystack 扩展 NLP 索引管道 — 第二部分:部署
通过 LLM 实现检索增强生成问答
2023 年 5 月 1 日在本系列文章的第一部分,我们讨论了检索增强生成的强大功能。我们还通过 Haystack 管道探索了如何创建一个将文件转换为带有嵌入的搜索文档的 Python 应用程序。但是,仅仅拥一个能在单机上将文件转换为文本片段和嵌入的 Python 程序,对于生产级部署来说是远远不够的。
在本部分,我们将探讨如何将索引使用者部署到 Kubernetes,以及如何使用 KEDA 对其进行自动扩展。这将使我们能够有效地向向量数据库添加文本和嵌入,从而为检索增强型 LLM 搜索引擎提供支持。
我们将使用第一部分所述的架构,该架构涉及将待索引的文件排队到 AWS SQS,并使用 Haystack 管道并行消费这些文件。让我们开始部署和扩展吧!
在 Kubernetes 上部署并使用 KEDA 进行扩展
在本节中,我们将学习如何在 Kubernetes 上设置 KEDA,并配置自动扩展,以根据 Kubernetes 中待处理文件的数量来扩展我们的使用者。我们将使用以下工具:
- k3d 用于创建本地 Kubernetes 集群以部署我们的使用者
- KEDA 用于部署后扩展使用者
- localstack 作为本地 AWS 云栈,用于测试我们的应用程序
为了与我们的本地 Kubernetes 集群通信,我们将使用 kubectl。
⚠️ 某些类型的索引管道需要 GPU。这尤其适用于在同一台机器上生成嵌入(而不是通过外部推理服务)或管道内使用模型的情况。在这种情况下,Kubernetes 集群中必须有 GPU pod 来运行模型。
安装和设置
在开始部署和扩展我们的应用程序之前,我们需要设置一个本地 Kubernetes 集群并部署一系列服务。
创建本地 Kubernetes 集群
首先,使用 k3d 创建一个名为 haystack-keda-cluster 的新 Kubernetes 集群。
k3d cluster create haystack-keda-cluster
# check the the status via: kubectl cluster-info
接下来,我们将创建一个名为 indexing 的命名空间,用于部署我们的服务。
kubectl create namespace indexing
安装服务 — KEDA 和 LocalStack
要设置 LocalStack,请添加 Helm chart 并将 LocalStack 安装在 indexing 命名空间中。
helm repo add localstack https://localstack.github.io/helm-charts
helm install localstack localstack/localstack --namespace indexing
我们将对 KEDA 执行相同的步骤。
helm repo add kedacore https://kedacore.github.io/charts
helm install keda kedacore/keda --namespace indexing
我们可以通过运行 indexing kubectl get pods -n indexing 来验证设置。
创建 SQS 队列和 S3 存储桶
我们的索引使用者将连接到 LocalStack 上的队列并从 S3 下载文件。因此,在部署应用程序之前,我们需要创建必要的资源。
为了创建队列和存储桶,我们将使用与我们在 Docker Compose 开发环境中使用过的相同的 shell 脚本。要从容器内部运行脚本,请将脚本通过管道传递给 kubectl exec 命令。
cat ./scripts/sqs_bucket_bootstrap.sh | kubectl exec -i -n indexing deployment/localstack -- /bin/bash
如果我们通过 kubectl logs -f deployment/localstack -c localstack -n indexing 获取日志,应该会看到队列和存储桶已被创建。
2023-04-22T15:19:34.166 INFO --- [ asgi_gw_1] localstack.request.aws : AWS sqs.CreateQueue => 200
2023-04-22T15:19:34.533 INFO --- [ asgi_gw_0] localstack.request.aws : AWS s3.CreateBucket => 200
部署索引使用者
现在我们已经将 LocalStack 和 KEDA 部署到我们的 Kubernetes 集群,我们可以开始部署我们的索引使用者了。索引使用者通过使用部署文件 deployment-consumer.yaml 来部署为 Kubernetes deployments。
# deployment-consumer.yaml
# link to file: <https://github.com/ArzelaAscoIi/haystack-keda-indexing/tree/main/kubernetes>
kind: Deployment
apiVersion: apps/v1
metadata:
name: indexing-consumer
labels:
k8s-app: indexing-consumer
spec:
# we want to start with 0 replicas and scale up on demand
replicas: 0
selector:
matchLabels:
k8s-app: indexing-consumer
template:
metadata:
name: indexing-consumer
labels:
k8s-app: indexing-consumer
spec:
containers:
- name: indexing-consumer
command: ["python3", "consumer.py"]
# public consumer image
image: arzelaascoli/keda-haystack-consumer:latest
env:
# localstack configuration
- name: AWS_ENDPOINT
value: http://localstack:4566
- name: AWS_REGION
value: eu-central-1
- name: AWS_ACCESS_KEY_ID
value: test
- name: AWS_SECRET_ACCESS_KEY
value: test
# Resource estimations
# TODO: adjust these to our needs and the load we expect
resources:
requests:
memory: 1000Mi
cpu: 750m
limits:
memory: 2500Mi
cpu: 2000m
我们可以使用 kubectl 将此 YAML 应用到我们的 indexing 命名空间。
kubectl apply -f ./kubernetes/deployment-consumer.yaml --namespace indexing
为了验证使用者能否成功启动并连接到队列,我们可以将副本数量扩展到 1,并检查正在运行的 pod。
# scale deployment
kubectl scale deployment indexing-consumer --namespace=indexing --replicas=1
# get pods
kubectl get pods -n indexing
# check logs
kubectl logs -f deployment/indexing-consumer -c indexing-consumer -n indexing
系统将日志显示未找到要处理的文件。
│ 2023-04-23 15:43:14 [info ] No files to process │
│ 2023-04-23 15:43:19 [info ] No files to process │
│ 2023-04-23 15:43:24 [info ] No files to process │
│ 2023-04-23 15:43:29 [info ] No files to process
接下来,我们将设置基于 SQS 队列长度的自动扩展,以实现自动扩展并在没有待处理文件时缩减到零。
基于队列长度配置自动扩展
在成功创建所有必需的文件索引服务后,我们现在可以配置 KEDA 来检查队列长度并相应地扩展索引使用者。
要设置 SQS 上的 KEDA 触发器,我们需要通过创建 Kubernetes 密钥对象来配置身份验证。
# secrets-localstack.yaml
# link to file: <https://github.com/ArzelaAscoIi/haystack-keda-indexing/blob/main/kubernetes/keda/secrets-localstack.yaml>
apiVersion: v1
kind: Secret
metadata:
name: aws-secrets
namespace: indexing
data:
AWS_ACCESS_KEY_ID: dGVzdA== # base64 encoded string for "test"
AWS_SECRET_ACCESS_KEY: dGVzdA== # base64 encoded string for "test"
该密钥随后通过 TriggerAuthentication 对象映射到 KEDA,KEDA 将使用基于凭证的身份验证。
# trigger-authentication.yaml
# link to file: <https://github.com/ArzelaAscoIi/haystack-keda-indexing/blob/main/kubernetes/keda/trigger-authentication.yaml>
apiVersion: keda.sh/v1alpha1
kind: TriggerAuthentication
metadata:
name: keda-trigger-auth-aws-credentials
namespace: indexing
spec:
secretTargetRef:
- parameter: awsAccessKeyID # Required.
name: aws-secrets # Required.
key: AWS_ACCESS_KEY_ID # Required.
- parameter: awsSecretAccessKey # Required.
name: aws-secrets # Required.
key: AWS_SECRET_ACCESS_KEY # Required.
现在操作员被允许访问 LocalStack 资源,我们可以创建一个带有 aws-sqs-queue触发器的 scaled object。
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: indexing-consumer-scaled-object
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: indexing-consumer # Mandatory. Must be in the same namespace as the ScaledObject
pollingInterval: 30
cooldownPeriod: 300
idleReplicaCount: 0
minReplicaCount: 0
maxReplicaCount: 2
fallback: # defines a number of replicas to fallback to if a scaler is in an error state.
failureThreshold: 3
replicas: 4
triggers:
- type: aws-sqs-queue
authenticationRef:
name: keda-trigger-auth-aws-credentials
metadata:
# KEDA will use the value of the environment variable of the `consumer-file-ingestion` containers
queueURL: https://:4566/000000000000/test-queue
queueLength: "10" # Should roughly equal the number of messages that can be processed in 1 minute
awsRegion: "eu-central-1"
awsEndpoint: "http://localstack:4566"
scaleOnInFlight: "false" # Exclude in-flight messages from the queue length calculation
在通过 kubectl apply --f ./kubernetes/keda --namespace indexing 应用这三个 YAML 文件后,我们可以转发端口以允许上传文件到 LocalStack。
kubectl port-forward deployment/localstack 4566:4566 -n indexing
我们可以使用上传脚本来添加一个文件 test.txt,运行 python3 upload.py。
# upload.py
# link to file: <https://github.com/ArzelaAscoIi/haystack-keda-indexing/blob/main/upload.py>
aws_service = AWSService(SQS_QUEUE, S3_BUCKET, LOCAL_DOWNLOAD_DIR)
aws_service.upload_file(Path("./data/test.txt"))
一旦文件成功上传并排队,KEDA 将负责将部署从 0 个副本扩展到 1 个副本。Kubernetes 将列出一个 indexing-consumer pod。
NAME READY STATUS
localstack-8fc647d9d-xkrsk 1/1 Running
keda-operator-metrics-apiserver-7bcfdd7c9b-7pbkp 1/1 Running
keda-operator-6857fbc758-xtc44 1/1 Running
keda-admission-webhooks-59978445df-q85jr 1/1 Running
indexing-consumer-656d98db6f-psz6q 0/1 ContainerCreating
启动后,文件将被获取并从队列中移除,然后进行索引。
结论和后续步骤
本文介绍了如何创建一个可扩展的应用程序,将文本和 PDF 文件转换为包含文本和嵌入的文档。KEDA 允许按需扩展每个应用程序。通过这种简单的架构,我们能够水平扩展嵌入的创建。
KEDA 有什么用? — KEDA 使我们能够基于队列长度扩展使用者。替代方案是使用基于 CPU 使用率的水平 pod 自动扩展,这将在从队列中获取元素后触发。但是,这种方法不允许缩减到零。由于这些任务需要 GPU,一个持续运行的空闲机器可能很昂贵。
如何在没有 k3d 的情况下部署? — 本教程适用于任何 Kubernetes 集群。只需按照提供的说明操作即可。
我需要什么资源? — 在生产环境中运行此程序时,集群需要 GPU 节点,这可能需要额外的配置。
有没有简单的方法来部署多个管道? — 在下一篇文章中,我将分享一篇关于如何使用用 Python 编写的 Kubernetes Operator Framework(Kopf)来动态创建这些资源的教程。
