📘 **TELUS Agriculture & Consumer Goods** 如何通过 **Haystack Agents** 转变促销交易

从查询中提取元数据过滤器


笔记本作者:David Batista

这是 **高级用例** 系列的第一部分

1️⃣ 从查询中提取元数据以改进检索和完整文章

2️⃣ 查询扩展 食谱完整文章

3️⃣ 查询分解食谱完整文章

4️⃣ 自动化元数据丰富

在此笔记本中,我们将讨论如何实现一个自定义组件 QueryMetadataExtractor,该组件从查询中提取实体并构建相应的元数据过滤器。

有用资源

设置开发环境

!pip install haystack-ai
!pip install sentence-transformers

输入您的 OPENAI_API_KEY。在此获取您的 OpenAI API 密钥

import os
from getpass import getpass

if "OPENAI_API_KEY" not in os.environ:
    os.environ["OPENAI_API_KEY"] = getpass("Enter OpenAI API key:")
Enter OpenAI API key:··········

实现QueryMetadataExtractor

创建一个自定义组件 QueryMetadataExtractor,它接收 querymetadata_fields 作为输入并输出 filters。该组件封装了一个生成式流水线,由 PromptBuilderOpenAIGenerator 组成。该流水线指示 LLM 从给定查询中提取关键词、短语或实体,然后这些实体可用作元数据过滤器。在提示中,我们包含指令以确保输出格式为 JSON,并提供 metadata_fieldsquery 以确保从查询中提取正确的实体。

一旦流水线在组件的 init 方法中初始化,我们将在 run 方法中后处理 LLM 的输出。此步骤确保提取的元数据格式正确,可以作为元数据过滤器使用。

import json
from typing import Dict, List

from haystack import Pipeline, component
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator

@component()
class QueryMetadataExtractor:

    def __init__(self):
        prompt = """
        You are part of an information system that processes users queries.
        Given a user query you extract information from it that matches a given list of metadata fields.
        The information to be extracted from the query must match the semantics associated with the given metadata fields.
        The information that you extracted from the query will then be used as filters to narrow down the search space
        when querying an index.
        Just include the value of the extracted metadata without including the name of the metadata field.
        The extracted information in 'Extracted metadata' must be returned as a valid JSON structure.
        ###
        Example 1:
        Query: "What was the revenue of Nvidia in 2022?"
        Metadata fields: {"company", "year"}
        Extracted metadata fields: {"company": "nvidia", "year": 2022}
        ###
        Example 2:
        Query: "What were the most influential publications in 2023 regarding Alzheimer's disease?"
        Metadata fields: {"disease", "year"}
        Extracted metadata fields: {"disease": "Alzheimer", "year": 2023}
        ###
        Example 3:
        Query: "{{query}}"
        Metadata fields: "{{metadata_fields}}"
        Extracted metadata fields:
        """
        self.pipeline = Pipeline()
        self.pipeline.add_component(name="builder", instance=PromptBuilder(prompt))
        self.pipeline.add_component(name="llm", instance=OpenAIGenerator(model="gpt-4o-mini"))
        self.pipeline.connect("builder", "llm")

    @component.output_types(filters=Dict[str, str])
    def run(self, query: str, metadata_fields: List[str]):
        result = self.pipeline.run({'builder': {'query': query, 'metadata_fields': metadata_fields}})
        metadata = json.loads(result['llm']['replies'][0])

        # this can be done with specific data structures and in a more sophisticated way
        filters = []
        for key, value in metadata.items():
            field = f"meta.{key}"
            filters.append({f"field": field, "operator": "==", "value": value})

        return {"filters": {"operator": "AND", "conditions": filters}}

首先,让我们单独测试 QueryMetadataExtractor,传入一个查询和一组元数据字段。

extractor = QueryMetadataExtractor()

query = "What were the most influential publications in 2022 regarding Parkinson's disease?"
metadata_fields = {"disease", "year"}

result = extractor.run(query, metadata_fields)
print(result)
{'filters': {'operator': 'AND', 'conditions': [{'field': 'meta.year', 'operator': '==', 'value': 2022}, {'field': 'meta.disease', 'operator': '==', 'value': 'Parkinson'}]}}

请注意,QueryMetadataExtractor 已从查询中提取了元数据字段,并以可直接传递给 Retriever 的格式返回。默认情况下,QueryMetadataExtractor 将所有元数据字段都作为条件,并使用 AND 运算符。

在 Pipeline 中使用 QueryMetadataExtractor

现在,让我们将 QueryMetadataExtractor 插入到 Pipeline 中,该 PipelineRetriever 连接到 DocumentStore,以实际了解其工作原理。

我们首先创建一个 InMemoryDocumentStore 并向其中添加一些文档。我们在每个文档的 “meta” 字段中包含有关 “year” 和 “disease” 的信息。

from haystack import Document
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.document_stores.types import DuplicatePolicy

documents = [
    Document(
        content="some publication about Alzheimer prevention research done over 2023 patients study",
        meta={"year": 2022, "disease": "Alzheimer", "author": "Michael Butter"}),
    Document(
        content="some text about investigation and treatment of Alzheimer disease",
        meta={"year": 2023, "disease": "Alzheimer", "author": "John Bread"}),
    Document(
        content="A study on the effectiveness of new therapies for Parkinson's disease",
        meta={"year": 2022, "disease": "Parkinson", "author": "Alice Smith"}
    ),
    Document(
        content="An overview of the latest research on the genetics of Parkinson's disease and its implications for treatment",
        meta={"year": 2023, "disease": "Parkinson", "author": "David Jones"}
    )
]

document_store = InMemoryDocumentStore(bm25_algorithm="BM25Plus")
document_store.write_documents(documents=documents, policy=DuplicatePolicy.OVERWRITE)
4

然后,我们创建一个包含 QueryMetadataExtractor 和连接到上述 InMemoryDocumentStoreInMemoryBM25Retriever 的 Pipeline。

Docs: Creating Pipelines 中了解连接组件和创建 Pipeline 的信息。

from haystack import Pipeline, Document
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever


retrieval_pipeline = Pipeline()
metadata_extractor = QueryMetadataExtractor()
retriever = InMemoryBM25Retriever(document_store=document_store)

retrieval_pipeline.add_component(instance=metadata_extractor, name="metadata_extractor")
retrieval_pipeline.add_component(instance=retriever, name="retriever")
retrieval_pipeline.connect("metadata_extractor.filters", "retriever.filters")
<haystack.core.pipeline.pipeline.Pipeline object at 0x789b1bba1900>
🚅 Components
  - metadata_extractor: LLMMetadataQueryExtractor
  - retriever: InMemoryBM25Retriever
🛤️ Connections
  - metadata_extractor.filters -> retriever.filters (Dict[str, str])

现在定义一个查询和元数据字段,并将它们传递给 Pipeline

query = "publications 2023 Alzheimer's disease"
metadata_fields = {"year", "author", "disease"}

retrieval_pipeline.run(data={"metadata_extractor": {"query": query, "metadata_fields": metadata_fields}, "retriever":{"query": query}})
Ranking by BM25...:   0%|          | 0/1 [00:00<?, ? docs/s]





{'retriever': {'documents': [Document(id=e3b0bfd497a9f83397945583e77b293429eb5bdead5680cc8f58dd4337372aa3, content: 'some text about investigation and treatment of Alzheimer disease', meta: {'year': 2023, 'disease': 'Alzheimer', 'author': 'John Bread'}, score: 2.772588722239781)]}}