📘 **TELUS Agriculture & Consumer Goods** 如何通过 **Haystack Agents** 转变促销交易

使用 KEDA 和 Haystack 扩展 NLP 索引管道 — 第二部分:部署

通过 LLM 实现检索增强生成问答

在本系列文章的第一部分,我们讨论了检索增强生成的强大功能。我们还通过 Haystack 管道探索了如何创建一个将文件转换为带有嵌入的搜索文档的 Python 应用程序。但是,仅仅拥一个能在单机上将文件转换为文本片段和嵌入的 Python 程序,对于生产级部署来说是远远不够的。

在本部分,我们将探讨如何将索引使用者部署到 Kubernetes,以及如何使用 KEDA 对其进行自动扩展。这将使我们能够有效地向向量数据库添加文本和嵌入,从而为检索增强型 LLM 搜索引擎提供支持

我们将使用第一部分所述的架构,该架构涉及将待索引的文件排队到 AWS SQS,并使用 Haystack 管道并行消费这些文件。让我们开始部署和扩展吧!

在 Kubernetes 上部署并使用 KEDA 进行扩展

在本节中,我们将学习如何在 Kubernetes 上设置 KEDA,并配置自动扩展,以根据 Kubernetes 中待处理文件的数量来扩展我们的使用者。我们将使用以下工具:

  • k3d 用于创建本地 Kubernetes 集群以部署我们的使用者
  • KEDA 用于部署后扩展使用者
  • localstack 作为本地 AWS 云栈,用于测试我们的应用程序

为了与我们的本地 Kubernetes 集群通信,我们将使用 kubectl

⚠️ 某些类型的索引管道需要 GPU。这尤其适用于在同一台机器上生成嵌入(而不是通过外部推理服务)或管道内使用模型的情况。在这种情况下,Kubernetes 集群中必须有 GPU pod 来运行模型。

安装和设置

在开始部署和扩展我们的应用程序之前,我们需要设置一个本地 Kubernetes 集群并部署一系列服务。

创建本地 Kubernetes 集群

首先,使用 k3d 创建一个名为 haystack-keda-cluster 的新 Kubernetes 集群。

k3d cluster create haystack-keda-cluster  
# check the the status via: kubectl cluster-info

接下来,我们将创建一个名为 indexing 的命名空间,用于部署我们的服务。

kubectl create namespace indexing

安装服务 — KEDA 和 LocalStack

要设置 LocalStack,请添加 Helm chart 并将 LocalStack 安装在 indexing 命名空间中。

helm repo add localstack https://localstack.github.io/helm-charts  
helm install localstack localstack/localstack --namespace indexing

我们将对 KEDA 执行相同的步骤。

helm repo add kedacore https://kedacore.github.io/charts  
helm install keda kedacore/keda --namespace indexing

我们可以通过运行 indexing kubectl get pods -n indexing 来验证设置。

创建 SQS 队列和 S3 存储桶

我们的索引使用者将连接到 LocalStack 上的队列并从 S3 下载文件。因此,在部署应用程序之前,我们需要创建必要的资源。

为了创建队列和存储桶,我们将使用与我们在 Docker Compose 开发环境中使用过的相同的 shell 脚本。要从容器内部运行脚本,请将脚本通过管道传递给 kubectl exec 命令。

cat ./scripts/sqs_bucket_bootstrap.sh | kubectl exec -i -n indexing deployment/localstack -- /bin/bash

如果我们通过 kubectl logs -f deployment/localstack -c localstack -n indexing 获取日志,应该会看到队列和存储桶已被创建。

2023-04-22T15:19:34.166  INFO --- [   asgi_gw_1] localstack.request.aws     : AWS sqs.CreateQueue => 200  
2023-04-22T15:19:34.533  INFO --- [   asgi_gw_0] localstack.request.aws     : AWS s3.CreateBucket => 200

部署索引使用者

现在我们已经将 LocalStack 和 KEDA 部署到我们的 Kubernetes 集群,我们可以开始部署我们的索引使用者了。索引使用者通过使用部署文件 deployment-consumer.yaml 来部署为 Kubernetes deployments。

# deployment-consumer.yaml  
# link to file: <https://github.com/ArzelaAscoIi/haystack-keda-indexing/tree/main/kubernetes>  
  
kind: Deployment  
apiVersion: apps/v1  
metadata:  
  name: indexing-consumer  
  labels:  
    k8s-app: indexing-consumer  
spec:  
  # we want to start with 0 replicas and scale up on demand  
  replicas: 0  
  selector:  
    matchLabels:  
      k8s-app: indexing-consumer  
  template:  
    metadata:  
      name: indexing-consumer  
      labels:  
        k8s-app: indexing-consumer  
    spec:  
      containers:  
        - name: indexing-consumer  
          command: ["python3", "consumer.py"]  
          # public consumer image  
          image: arzelaascoli/keda-haystack-consumer:latest  
          env:  
            # localstack configuration  
            - name: AWS_ENDPOINT  
              value: http://localstack:4566  
            - name: AWS_REGION  
              value: eu-central-1  
            - name: AWS_ACCESS_KEY_ID  
              value: test  
            - name: AWS_SECRET_ACCESS_KEY  
              value: test  
          # Resource estimations  
          # TODO: adjust these to our needs and the load we expect  
          resources:  
            requests:  
              memory: 1000Mi  
              cpu: 750m  
            limits:  
              memory: 2500Mi  
              cpu: 2000m

我们可以使用 kubectl 将此 YAML 应用到我们的 indexing 命名空间。

kubectl apply -f ./kubernetes/deployment-consumer.yaml --namespace indexing

为了验证使用者能否成功启动并连接到队列,我们可以将副本数量扩展到 1,并检查正在运行的 pod。

# scale deployment   
kubectl scale deployment indexing-consumer --namespace=indexing --replicas=1  
# get pods   
kubectl get pods -n indexing  
# check logs   
kubectl logs -f deployment/indexing-consumer -c indexing-consumer -n indexing

系统将日志显示未找到要处理的文件。

│ 2023-04-23 15:43:14 [info     ] No files to process                                                                                                       │  
│ 2023-04-23 15:43:19 [info     ] No files to process                                                                                                       │  
│ 2023-04-23 15:43:24 [info     ] No files to process                                                                                                       │  
│ 2023-04-23 15:43:29 [info     ] No files to process

接下来,我们将设置基于 SQS 队列长度的自动扩展,以实现自动扩展并在没有待处理文件时缩减到零。

基于队列长度配置自动扩展

在成功创建所有必需的文件索引服务后,我们现在可以配置 KEDA 来检查队列长度并相应地扩展索引使用者。

要设置 SQS 上的 KEDA 触发器,我们需要通过创建 Kubernetes 密钥对象来配置身份验证。

# secrets-localstack.yaml  
# link to file: <https://github.com/ArzelaAscoIi/haystack-keda-indexing/blob/main/kubernetes/keda/secrets-localstack.yaml>  
  
apiVersion: v1  
kind: Secret  
metadata:  
  name: aws-secrets  
  namespace: indexing  
data:  
  AWS_ACCESS_KEY_ID: dGVzdA== # base64 encoded string for "test"  
  AWS_SECRET_ACCESS_KEY: dGVzdA== # base64 encoded string for "test"

该密钥随后通过 TriggerAuthentication 对象映射到 KEDA,KEDA 将使用基于凭证的身份验证

# trigger-authentication.yaml  
# link to file: <https://github.com/ArzelaAscoIi/haystack-keda-indexing/blob/main/kubernetes/keda/trigger-authentication.yaml>  
  
apiVersion: keda.sh/v1alpha1  
kind: TriggerAuthentication  
metadata:  
  name: keda-trigger-auth-aws-credentials  
  namespace: indexing  
spec:  
  secretTargetRef:  
    - parameter: awsAccessKeyID # Required.  
      name: aws-secrets # Required.  
      key: AWS_ACCESS_KEY_ID # Required.  
    - parameter: awsSecretAccessKey # Required.  
      name: aws-secrets # Required.  
      key: AWS_SECRET_ACCESS_KEY # Required.

现在操作员被允许访问 LocalStack 资源,我们可以创建一个带有 aws-sqs-queue触发器的 scaled object。

apiVersion: keda.sh/v1alpha1  
kind: ScaledObject  
metadata:  
  name: indexing-consumer-scaled-object  
spec:  
  scaleTargetRef:  
    apiVersion: apps/v1  
    kind: Deployment  
    name: indexing-consumer # Mandatory. Must be in the same namespace as the ScaledObject  
  pollingInterval: 30  
  cooldownPeriod: 300  
  idleReplicaCount: 0  
  minReplicaCount: 0  
  maxReplicaCount: 2  
  fallback: # defines a number of replicas to fallback to if a scaler is in an error state.  
    failureThreshold: 3  
    replicas: 4  
  triggers:  
    - type: aws-sqs-queue  
      authenticationRef:  
        name: keda-trigger-auth-aws-credentials  
      metadata:  
        # KEDA will use the value of the environment variable of the `consumer-file-ingestion` containers  
        queueURL: https://:4566/000000000000/test-queue  
        queueLength: "10" # Should roughly equal the number of messages that can be processed in 1 minute  
        awsRegion: "eu-central-1"  
        awsEndpoint: "http://localstack:4566"  
        scaleOnInFlight: "false" # Exclude in-flight messages from the queue length calculation

在通过 kubectl apply --f ./kubernetes/keda --namespace indexing 应用这三个 YAML 文件后,我们可以转发端口以允许上传文件到 LocalStack。

 kubectl port-forward deployment/localstack 4566:4566 -n indexing

我们可以使用上传脚本来添加一个文件 test.txt,运行 python3 upload.py

# upload.py   
# link to file: <https://github.com/ArzelaAscoIi/haystack-keda-indexing/blob/main/upload.py>  
aws_service = AWSService(SQS_QUEUE, S3_BUCKET, LOCAL_DOWNLOAD_DIR)  
aws_service.upload_file(Path("./data/test.txt"))

一旦文件成功上传并排队,KEDA 将负责将部署从 0 个副本扩展到 1 个副本。Kubernetes 将列出一个 indexing-consumer pod。

NAME                                               READY   STATUS                
localstack-8fc647d9d-xkrsk                         1/1     Running               
keda-operator-metrics-apiserver-7bcfdd7c9b-7pbkp   1/1     Running              
keda-operator-6857fbc758-xtc44                     1/1     Running               
keda-admission-webhooks-59978445df-q85jr           1/1     Running              
indexing-consumer-656d98db6f-psz6q                 0/1     ContainerCreating 

启动后,文件将被获取并从队列中移除,然后进行索引。

结论和后续步骤

本文介绍了如何创建一个可扩展的应用程序,将文本和 PDF 文件转换为包含文本和嵌入的文档。KEDA 允许按需扩展每个应用程序。通过这种简单的架构,我们能够水平扩展嵌入的创建。

KEDA 有什么用? — KEDA 使我们能够基于队列长度扩展使用者。替代方案是使用基于 CPU 使用率的水平 pod 自动扩展,这将在从队列中获取元素后触发。但是,这种方法不允许缩减到零。由于这些任务需要 GPU,一个持续运行的空闲机器可能很昂贵。

如何在没有 k3d 的情况下部署? — 本教程适用于任何 Kubernetes 集群。只需按照提供的说明操作即可。

我需要什么资源? — 在生产环境中运行此程序时,集群需要 GPU 节点,这可能需要额外的配置。

有没有简单的方法来部署多个管道? — 在下一篇文章中,我将分享一篇关于如何使用用 Python 编写的 Kubernetes Operator Framework(Kopf)来动态创建这些资源的教程。