
Conversational RAG using Memory


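In this notebook, we'll explore how to incorporate memory into a RAG pipeline so that you can have a conversation with your documents. We'll use InMemoryChatMessageStore, ChatMessageRetriever, and ChatMessageWriter.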

Installation

Install Haystack, haystack-experimental, and datasets with pip.

!pip install -U haystack-ai haystack-experimental datasets

Enter your OpenAI API key

import os
from getpass import getpass

if "OPENAI_API_KEY" not in os.environ:
    os.environ["OPENAI_API_KEY"] = getpass("Enter OpenAI API key:")
Enter OpenAI API key:··········

Create a DocumentStore and index documents

Create an index from the seven-wonders dataset.

from haystack import Document
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.document_stores.in_memory import InMemoryDocumentStore
from datasets import load_dataset

dataset = load_dataset("bilgeyucel/seven-wonders", split="train")
docs = [Document(content=doc["content"], meta=doc["meta"]) for doc in dataset]

document_store = InMemoryDocumentStore()
document_store.write_documents(documents=docs)

151

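Create Memory

Memory, that is, the conversation history, is stored as ChatMessage objects in an InMemoryChatMessageStore. Whenever you need it, you can retrieve the conversation history from the chat message store with a ChatMessageRetriever.

To store memory, initialize an InMemoryChatMessageStore, a ChatMessageRetriever, and a ChatMessageWriter. Import these components from the haystack-experimental package.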

from haystack_experimental.chat_message_stores.in_memory import InMemoryChatMessageStore
from haystack_experimental.components.retrievers import ChatMessageRetriever
from haystack_experimental.components.writers import ChatMessageWriter

# Memory components
memory_store = InMemoryChatMessageStore()
memory_retriever = ChatMessageRetriever(memory_store)
memory_writer = ChatMessageWriter(memory_store)
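
To make the role of the three memory components more concrete, here is a minimal plain-Python sketch of the same idea: one object holds the messages, one operation appends to it, and one reads it back. The names ToyMessage and ToyMessageStore are hypothetical stand-ins invented for this illustration. They are not the haystack-experimental classes, and their methods are not claimed to match that package's API.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class ToyMessage:
    # Hypothetical stand-in for a chat message: a role plus its text.
    role: str
    text: str


@dataclass
class ToyMessageStore:
    # Keeps messages in insertion order, in process memory only.
    messages: List[ToyMessage] = field(default_factory=list)

    def write(self, new_messages: List[ToyMessage]) -> int:
        # Plays the "writer" role: append messages, report how many were added.
        self.messages.extend(new_messages)
        return len(new_messages)

    def retrieve(self) -> List[ToyMessage]:
        # Plays the "retriever" role: return a copy so callers
        # cannot mutate the stored history by accident.
        return list(self.messages)


store = ToyMessageStore()
store.write([ToyMessage("user", "What does Rhodes Statue look like?")])
store.write([ToyMessage("assistant", "The exact appearance of the statue is not known.")])

history = store.retrieve()
print([message.role for message in history])
```

Because everything lives in a Python list, the history in this sketch disappears when the process exits.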

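Prompt template for RAG with memory

Prepare a prompt template for RAG with an additional section for memory. The memory is retrieved from the InMemoryChatMessageStore by the ChatMessageRetriever and injected into the prompt through the memories prompt variable.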

from haystack.dataclasses import ChatMessage

system_message = ChatMessage.from_system("You are a helpful AI assistant using provided supporting documents and conversation history to assist humans")

user_message_template ="""Given the conversation history and the provided supporting documents, give a brief answer to the question.
Note that supporting documents are not part of the conversation. If question can't be answered from supporting documents, say so.

    Conversation history:
    {% for memory in memories %}
        {{ memory.text }}
    {% endfor %}

    Supporting documents:
    {% for doc in documents %}
        {{ doc.content }}
    {% endfor %}

    \nQuestion: {{query}}
    \nAnswer:
"""
user_message = ChatMessage.from_user(user_message_template)
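
As a rough picture of what the rendered user message looks like once the variables are filled in, the sketch below builds a similar prompt with plain string formatting. The function render_user_prompt is a hypothetical helper written for this illustration. The pipeline itself renders the template through ChatPromptBuilder, so whitespace and details will differ from what is printed here.

```python
from typing import List


def render_user_prompt(memories: List[str], documents: List[str], query: str) -> str:
    # Indent each history line and each document, mirroring the template's two loops.
    history_block = "\n".join(f"    {memory}" for memory in memories)
    documents_block = "\n".join(f"    {document}" for document in documents)
    return (
        "Conversation history:\n"
        f"{history_block}\n\n"
        "Supporting documents:\n"
        f"{documents_block}\n\n"
        f"Question: {query}\n"
        "Answer:"
    )


prompt = render_user_prompt(
    memories=["What does Rhodes Statue look like?"],
    documents=["(text of a retrieved document would go here)"],
    query="Who built it?",
)
print(prompt)
```

Keeping the history and the supporting documents in separate, labeled sections is what lets the instruction "supporting documents are not part of the conversation" make sense to the model.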

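Build the pipeline

Add the components for RAG and for memory to your pipeline. Integrate the custom ListJoiner component into the pipeline to collect the messages from the user and from the LLM and write them to the memory store.

Note: the ListJoiner component will be available starting with Haystack 2.8.0!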

from itertools import chain
from typing import Any

from haystack import component
from haystack.core.component.types import Variadic


@component
class ListJoiner:
    def __init__(self, _type: Any):
        component.set_output_types(self, values=_type)

    def run(self, values: Variadic[Any]):
        result = list(chain(*values))
        return {"values": result}
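
The core of ListJoiner.run is the expression list(chain(*values)), which flattens several lists into one. The sketch below shows that step on its own with plain strings instead of ChatMessage objects. The message strings are placeholders made up for the illustration.

```python
from itertools import chain

# Each sender contributes its own list, as the user input and the LLM do in the pipeline.
user_turn = ["user: Who built it?"]
llm_turn = ["assistant: (reply text)"]

# A variadic input arrives as a collection of lists; chain(*values) concatenates them.
values = [user_turn, llm_turn]
merged = list(chain(*values))
print(merged)
```

The result is a single flat list, which is the shape the memory writer expects for its messages input.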
from typing import List
from haystack import Pipeline
from haystack.components.builders import ChatPromptBuilder, PromptBuilder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.generators import OpenAIGenerator
from haystack.components.converters import OutputAdapter

pipeline = Pipeline()

# components for RAG
pipeline.add_component("retriever", InMemoryBM25Retriever(document_store=document_store, top_k=3))
pipeline.add_component("prompt_builder", ChatPromptBuilder(variables=["query", "documents", "memories"], required_variables=["query", "documents", "memories"]))
pipeline.add_component("llm", OpenAIChatGenerator())

# components for memory
pipeline.add_component("memory_retriever", memory_retriever)
pipeline.add_component("memory_writer", memory_writer)
pipeline.add_component("memory_joiner", ListJoiner(List[ChatMessage]))

# connections for RAG
pipeline.connect("retriever.documents", "prompt_builder.documents")
pipeline.connect("prompt_builder.prompt", "llm.messages")
pipeline.connect("llm.replies", "memory_joiner")

# connections for memory
pipeline.connect("memory_joiner", "memory_writer")
pipeline.connect("memory_retriever", "prompt_builder.memories")
<haystack.core.pipeline.pipeline.Pipeline object at 0x7f9b2dfaceb0>
🚅 Components
  - retriever: InMemoryBM25Retriever
  - prompt_builder: ChatPromptBuilder
  - llm: OpenAIChatGenerator
  - memory_retriever: ChatMessageRetriever
  - memory_writer: ChatMessageWriter
  - memory_joiner: ListJoiner
🛤️ Connections
  - retriever.documents -> prompt_builder.documents (List[Document])
  - prompt_builder.prompt -> llm.messages (List[ChatMessage])
  - llm.replies -> memory_joiner.values (List[ChatMessage])
  - memory_retriever.messages -> prompt_builder.memories (List[ChatMessage])
  - memory_joiner.values -> memory_writer.messages (List[ChatMessage])

Visualize the pipeline

Use the show() method to visualize the pipeline and confirm that the connections are correct.

pipeline.show()

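Run the pipeline

Test the pipeline with a few queries. Make sure every user query is also sent to memory_joiner so that both the user queries and the LLM responses are stored together in the memory store.

Here are some example queries you can try:

  • What does Rhodes Statue look like?
  • Who built it?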
while True:
    messages = [system_message, user_message]
    question = input("Enter your question or Q to exit.\n🧑 ")
    if question=="Q":
        break

    res = pipeline.run(data={"retriever": {"query": question},
                             "prompt_builder": {"template": messages, "query": question},
                             "memory_joiner": {"values": [ChatMessage.from_user(question)]}},
                            include_outputs_from=["llm"])
    assistant_resp = res['llm']['replies'][0]
    print(f"🤖 {assistant_resp.text}")
Enter your question or Q to exit.
🧑 What does Rhodes Statue look like?
🤖 The Rhodes statue, known as the Colossus of Rhodes, would have featured a head with curly hair and spikes of bronze or silver flame radiating from it, similar to the depictions found on contemporary Rhodian coins. However, the exact appearance of the statue is not known, as there are no surviving images of it.
Enter your question or Q to exit.
🧑 Who built it?
🤖 The Hanging Gardens of Babylon are said to have been built by the Neo-Babylonian King Nebuchadnezzar II for his Median wife, Queen Amytis.
Enter your question or Q to exit.
🧑 Q

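⚠️ If you followed the example queries, you'll notice that the answer to the second question is wrong. The retrieved documents were not relevant to the user's question: retrieval was based on the query "Who built it?", which does not carry enough context to find the right documents. Let's fix this by rephrasing the query for search.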
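
The failure can be reproduced in miniature. The sketch below uses a deliberately crude scorer that counts shared words between the query and each document. It is not BM25 and not what InMemoryBM25Retriever does internally; it only illustrates why a query with no topical keywords can land on the wrong document. The two document sentences are made up for this example and are not taken from the seven-wonders dataset.

```python
import re
from typing import List, Set


def tokens(text: str) -> Set[str]:
    # Lowercase the text and split it into a set of alphanumeric words.
    return set(re.findall(r"[a-z0-9]+", text.lower()))


def best_match(query: str, documents: List[str]) -> str:
    # Toy scorer: pick the document sharing the most words with the query.
    return max(documents, key=lambda doc: len(tokens(query) & tokens(doc)))


documents = [
    "The Colossus of Rhodes was a statue constructed by Chares of Lindos.",
    "The Hanging Gardens of Babylon were built by Nebuchadnezzar II.",
]

# The only overlapping word is "built", which appears in the Babylon sentence.
vague = best_match("Who built it?", documents)

# Naming the subject adds keywords that point back to the right document.
rephrased = best_match("Who built the Colossus of Rhodes?", documents)

print(vague)
print(rephrased)
```

With the vague query, the match is decided by the incidental word "built"; once the subject is spelled out, the Rhodes sentence scores higher.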

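Prompt template for rephrasing the user query

In conversational systems, simply injecting memory into the prompt is not enough to perform RAG effectively. There must be a mechanism that rephrases the user's query based on the conversation history so that relevant documents are retrieved. For example, if the first user query is "What's the full name of Einstein?" and the second is "Where was he born?", the system should understand that "he" refers to Einstein. The rephrasing mechanism should then turn the second query into "Where was Einstein born?" to retrieve the correct documents.

We can use an LLM to rephrase the user's query. Let's create a prompt that instructs the LLM to rephrase the query, taking the conversation history into account, so that it is suitable for retrieving relevant documents.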

query_rephrase_template = """
        Rewrite the question for search while keeping its meaning and key terms intact.
        If the conversation history is empty, DO NOT change the query.
        Use conversation history only if necessary, and avoid extending the query with your own knowledge.
        If no changes are needed, output the current question as is.

        Conversation history:
        {% for memory in memories %}
            {{ memory.text }}
        {% endfor %}

        User Query: {{query}}
        Rewritten Query:
"""

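Build the conversational RAG pipeline

Now, let's integrate query rephrasing into our pipeline by adding a new PromptBuilder, an OpenAIGenerator, and an OutputAdapter. The OpenAIGenerator rephrases the user's query for search, and the OutputAdapter converts the output of the OpenAIGenerator into the input of the InMemoryBM25Retriever. The rest of the pipeline stays the same.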

from typing import List
from haystack import Pipeline
from haystack.components.builders import ChatPromptBuilder, PromptBuilder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.generators import OpenAIGenerator
from haystack.components.converters import OutputAdapter

conversational_rag = Pipeline()

# components for query rephrasing
conversational_rag.add_component("query_rephrase_prompt_builder", PromptBuilder(query_rephrase_template))
conversational_rag.add_component("query_rephrase_llm", OpenAIGenerator())
conversational_rag.add_component("list_to_str_adapter", OutputAdapter(template="{{ replies[0] }}", output_type=str))

# components for RAG
conversational_rag.add_component("retriever", InMemoryBM25Retriever(document_store=document_store, top_k=3))
conversational_rag.add_component("prompt_builder", ChatPromptBuilder(variables=["query", "documents", "memories"], required_variables=["query", "documents", "memories"]))
conversational_rag.add_component("llm", OpenAIChatGenerator())

# components for memory
conversational_rag.add_component("memory_retriever", ChatMessageRetriever(memory_store))
conversational_rag.add_component("memory_writer", ChatMessageWriter(memory_store))
conversational_rag.add_component("memory_joiner", ListJoiner(List[ChatMessage]))

# connections for query rephrasing
conversational_rag.connect("memory_retriever", "query_rephrase_prompt_builder.memories")
conversational_rag.connect("query_rephrase_prompt_builder.prompt", "query_rephrase_llm")
conversational_rag.connect("query_rephrase_llm.replies", "list_to_str_adapter")
conversational_rag.connect("list_to_str_adapter", "retriever.query")

# connections for RAG
conversational_rag.connect("retriever.documents", "prompt_builder.documents")
conversational_rag.connect("prompt_builder.prompt", "llm.messages")
conversational_rag.connect("llm.replies", "memory_joiner")

# connections for memory
conversational_rag.connect("memory_joiner", "memory_writer")
conversational_rag.connect("memory_retriever", "prompt_builder.memories")
<haystack.core.pipeline.pipeline.Pipeline object at 0x7f9b2deeee60>
🚅 Components
  - query_rephrase_prompt_builder: PromptBuilder
  - query_rephrase_llm: OpenAIGenerator
  - list_to_str_adapter: OutputAdapter
  - retriever: InMemoryBM25Retriever
  - prompt_builder: ChatPromptBuilder
  - llm: OpenAIChatGenerator
  - memory_retriever: ChatMessageRetriever
  - memory_writer: ChatMessageWriter
  - memory_joiner: ListJoiner
🛤️ Connections
  - query_rephrase_prompt_builder.prompt -> query_rephrase_llm.prompt (str)
  - query_rephrase_llm.replies -> list_to_str_adapter.replies (List[str])
  - list_to_str_adapter.output -> retriever.query (str)
  - retriever.documents -> prompt_builder.documents (List[Document])
  - prompt_builder.prompt -> llm.messages (List[ChatMessage])
  - llm.replies -> memory_joiner.values (List[ChatMessage])
  - memory_retriever.messages -> query_rephrase_prompt_builder.memories (List[ChatMessage])
  - memory_retriever.messages -> prompt_builder.memories (List[ChatMessage])
  - memory_joiner.values -> memory_writer.messages (List[ChatMessage])
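
The connection list above can be read as a per-turn data flow: rephrase the question using the history, retrieve with the rephrased query, answer the original question with the retrieved documents and the history, then append both the question and the reply to memory. The sketch below expresses that flow in plain Python with stub functions. conversational_turn and the three stubs are hypothetical names invented for this illustration. They stand in for the LLM and retriever components and are not how Haystack schedules components internally.

```python
from typing import Callable, List, Tuple


def conversational_turn(
    question: str,
    history: List[str],
    rephrase: Callable[[str, List[str]], str],
    retrieve: Callable[[str], List[str]],
    answer: Callable[[str, List[str], List[str]], str],
) -> Tuple[str, str]:
    # 1. Rephrase for search only; the original question still goes to the answer step.
    search_query = rephrase(question, history)
    # 2. Retrieve with the rephrased query.
    documents = retrieve(search_query)
    # 3. Answer using the documents and the history as it was before this turn.
    reply = answer(question, documents, history)
    # 4. Store the user question and the reply together.
    history.extend([f"user: {question}", f"assistant: {reply}"])
    return search_query, reply


def stub_rephrase(question: str, history: List[str]) -> str:
    # Stub: leave the question unchanged when there is no history yet.
    if not history:
        return question
    return f"{question} (context: {history[0]})"


def stub_retrieve(search_query: str) -> List[str]:
    return [f"document matching: {search_query}"]


def stub_answer(question: str, documents: List[str], history: List[str]) -> str:
    return f"answer based on {len(documents)} document(s)"


history: List[str] = []
first_query, _ = conversational_turn(
    "Where is Gardens of Babylon?", history, stub_rephrase, stub_retrieve, stub_answer
)
second_query, _ = conversational_turn(
    "When was it built?", history, stub_rephrase, stub_retrieve, stub_answer
)
print(first_query)
print(second_query)
```

The point of the sketch is the separation of concerns: the rephrased text is used only as the search query, while the prompt that produces the final answer still receives the user's original wording.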

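Let's have a conversation 😀

Now, run the pipeline with the relevant inputs. This time, instead of sending the query directly to the retriever, pass it to query_rephrase_prompt_builder so that it can be rephrased.

Here are some example queries and follow-up questions you can try:

  • What does Rhodes Statue look like? - Who built it? - Did he destroy it?
  • Where is Gardens of Babylon? - When was it built?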
while True:
    messages = [system_message, user_message]
    question = input("Enter your question or Q to exit.\n🧑 ")
    if question=="Q":
        break

    res = conversational_rag.run(data={"query_rephrase_prompt_builder": {"query": question},
                             "prompt_builder": {"template": messages, "query": question},
                             "memory_joiner": {"values": [ChatMessage.from_user(question)]}},
                            include_outputs_from=["llm","query_rephrase_llm"])
    search_query = res['query_rephrase_llm']['replies'][0]
    print(f"   🔎 Search Query: {search_query}")
    assistant_resp = res['llm']['replies'][0]
    print(f"🤖 {assistant_resp.text}")
Enter your question or Q to exit.
🧑 Where is Gardens of Babylon? 
   🔎 Search Query: Where are the Hanging Gardens of Babylon located?
🤖 The Hanging Gardens of Babylon were said to be located in the ancient city of Babylon, near present-day Hillah, Babil province, in Iraq.
Enter your question or Q to exit.
🧑 When was it built?
   🔎 Search Query: When was the Hanging Gardens of Babylon built?
🤖 The Hanging Gardens of Babylon are said to have been built during the reign of King Nebuchadnezzar II, who ruled between 605 and 562 BC.
Enter your question or Q to exit.
🧑 Q

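✅ Note that this time, with the help of query rephrasing, we've built a conversational RAG pipeline that can handle follow-up queries and retrieve the relevant documents.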