使用 LangChain 和 Elasticsearch 实现隐私至上的 AI 搜索

02-prism.jpeg

在过去的几个周末,我一直沉浸在“提示工程”世界中并为之着迷,了解了像 Elasticsearch® 这样的矢量数据库如何通过充当长期记忆和语义知识存储,为 ChatGPT 等大型语言模型 (LLM) 提供动力。不过,有一点让我和许多其他经验丰富的数据架构师感到困扰,那就是,许多教程和演示完全依赖于向大型网络公司和基于云的 AI 公司发送用户的私有数据。 

私有数据形式多样,受到保护的原因也不止一个。无论是初创公司还是企业型公司,都知道私有数据有时就是他们的竞争优势。内部数据和客户数据通常包含个人身份信息,如果不加以保护,在法律和现实生活中都会带来影响。在可观测性和安全性领域,如果在利用第三方服务时不够谨慎,可能会导致数据泄露。我们甚至听说过与使用 AI 聊天工具有关的网络安全漏洞指控。

没有一种设计是完全没有风险或者完全私有的,即使是与 Elastic 这样力保隐私安全的公司合作,或者在真正的气隙环境中进行部署,也都是如此。不过,我已经处理过足够多的敏感数据用例,知道采用隐私至上的方法来实现 AI 搜索是非常有价值的。我很喜欢我的同事 Jeff Vestal 对如何在 Elasticsearch 中使用 OpenAI 工具的精彩讲解,但本文将另辟蹊径。

对于这个项目所采用的方法,我有两个目标:

  • 私有 — 对于这个目标,我是认真的。虽然我会使用云托管的 Elasticsearch,但如果用例需要的话,我希望它能完全在气隙环境中运行。我们来证明一下,在不向第三方发送私有数据的情况下可以实现 AI 搜索。
  • 趣味性 — 另外,实现 AI 搜索时不能缺少趣味性。我们将使用 Wookieepedia(一个在数据科学实践中很受欢迎的星球大战社区 Wiki)中抓取的一小段数据,创建一个私有 AI 问答助手。写这篇文章的时候已经快 5 月 4 日(星球大战日)了,虽然在这篇文章发表的时候早已经过了这个日期,但星战影迷全年无休。

要自行尝试,最简单的方法是在 Elastic Cloud 上启动一个 Elasticsearch 实例,然后运行所提供的 Python 记事本,小规模地实现该项目。如果您想对 Wookieepedia 中 18 万段星球大战知识进行全文抓取,并创建一个精通星球大战知识的搜索应用程序,可以按照 GitHub 存储库中的代码操作。

全部完成后,它应该如下所示:

“may the fourth be with you”字样

本着开放的精神,我们引入两种开源技术来辅助 Elasticsearch:Hugging Face 转换器库和新颖有趣的 Python 库 LangChain,它们可以加快作为矢量数据库的 Elasticsearch 的工作速度。此外,LangChain 还有一个优势,我们的 LLM 一旦设置好,就可以通过编程实现互换,让我们可以自由地试验各种模型。

工作原理

LangChain 是什么?LangChain 是一个 Python 和 JavaScript 框架,用于开发由大型语言模型提供支持的应用程序。LangChain 将与 OpenAI 的 API 配合使用,但它也擅长于抽象化数据库和 AI 工具之间的差异。

ChatGPT 本身在星球大战问答方面表现不错。不过,它的训练数据集已经是几年前的数据,而我们需要的是最新电视节目和活动中有关星球大战宇宙的答案。还有一点要注意,我们是假装这些数据过于私密,不能与云中的大型 LLM 共享。我们可以自己用较新的数据调整大型语言模型,但有一种简单得多的方法可以实现这一点,而且还能让我们始终使用最新的数据。

现在介绍一种规模较小、易于自托管的 LLM。我用 Google 的 flan-t5-large 模型得到了不错的结果,它可以很好地从注入的上下文中解析出答案,弥补了训练不足的缺陷。我们将使用语义搜索对私有知识进行检索,然后将上下文和一个问题注入到我们的私有 LLM 中。

私有 AI 搜索

1.从 Wookieepedia 中抓取所有规则文章,并将数据放入暂存的 Python Pickle 文件中。

2A.使用 LangChain 内置的 Vectorstore 库将这些文章的每个段落加载到 Elasticsearch 中。

2B.或者,我们可以将 LangChain 与在 Elasticsearch 中托管 pytorch 转换器的新方法进行比较。我们将把文本嵌入模型部署到 Elasticsearch 中,以利用分布式计算并加快这个过程。

3.问题传入后,我们将使用 Elasticsearch 的矢量搜索找到与问题在语义上最相似的段落。然后,选取该段落并将其添加到一个本地小型 LLM 的提示中,作为这个问题的上下文,然后让神奇的生成式 AI 针对我们的问答题目给出简短回答。

设置 Python 和 Elasticsearch 环境

确保您的计算机上安装了 Python 3.9 或类似版本。我使用 3.9 版本是为了更容易与 GPU 加速库兼容,但这个项目不需要这样做。任何最近的 3.X 版本的 Python 都可以。

python3 -m venv venv
source venv/bin/activate
pip install --upgrade pip
pip install beautifulsoup4 eland elasticsearch huggingface-hub langchain tqdm torch requests sentence_transformers

如果您已经下载了示例代码,可以使用下面的 pip install 命令拉取我使用的代码的确切版本。

pip install -r requirements.txt

您可以按照此处的说明设置 Elasticsearch 集群。最简单的入门方法是使用免费的云试用版。

在文件夹中创建一个 .env 文件,然后为 Elasticsearch 加载连接详细信息。

export ES_SERVER="YOURDESSERVERNAME.es.us-central1.gcp.cloud.es.io"
export ES_USERNAME="YOUR READ WRITE AND INDEX CREATING USER"
export ES_PASSWORD="YOUR PASSWORD"

第 1 步.抓取数据

代码存储库中有一个小型数据集,位于 Dataset/starwars_small_sample_data.pickle。如果您能接受小规模的任务,可以跳过这一步。

抓取代码改编自 Dennis Bakhuis 的优秀数据科学博客项目,去看看吧!他只拉取了每篇文章的第一段,而我修改了代码,拉取了全部内容。他可能需要将数据保持在适合主内存的大小,但我们没有这方面的问题,因为我们有 Elasticsearch,它可以将内存扩展到 PB 级别。

您也可以在这里轻松地插入自己的私有数据源。LangChain 有一些优秀的实用程序库,可以将文本数据拆分成更短的块。

抓取不是本文的重点,如果您想自己小规模运行,可以看看 Python 记事本,或者下载源代码并运行以下内容:

source .env
python3 step-1A-scrape-urls.py
python3 step-1B-scrape-content.py

完成后,您应该能够查看如下所示的已保存的 Pickle 文件,确保它正常工作。

from pathlib import Path
import pickle


bookFilePath = "starwars_*_data*.pickle"
files = sorted(Path('./Dataset').glob(bookFilePath))
for fn in files:
   with open(fn,'rb') as f:
       part = pickle.load(f)
       for key, value in part.items():
           title = value['title'].strip()
           print(title)

如果您跳过了网页抓取,只需将 bookFilePath 更改为“starwars_small_sample_data.pickle”,即可使用我在 GitHub 存储库中提供的示例。

第 2A 步.在 Elasticsearch 中加载嵌入模型

完整的代码展示了我是如何只使用 LangChain 来实现这一操作的。代码的关键部分是循环遍历已保存的 Pickle 文件(如上面的示例),提取出作为段落的字符串列表,然后将它们传递给 LangChain Vectorstorefrom_texts() 函数。

from langchain.vectorstores import ElasticVectorSearch
from langchain.embeddings import HuggingFaceEmbeddings
from pathlib import Path
import pickle
import os
from tqdm import tqdm


model_name = "sentence-transformers/all-mpnet-base-v2"
hf = HuggingFaceEmbeddings(model_name=model_name)


index_name = "book_wookieepedia_mpnet"
endpoint = os.getenv('ES_SERVER', 'ERROR')
username = os.getenv('ES_USERNAME', 'ERROR')
password = os.getenv('ES_PASSWORD', 'ERROR')
url = f"https://{username}:{password}@{endpoint}:443"
db = ElasticVectorSearch(embedding=hf, elasticsearch_url=url, index_name=index_name)


batchtext = []
bookFilePath = "starwars_*_data*.pickle"
files = sorted(Path('./Dataset').glob(bookFilePath))
for fn in files:
    with open(fn,'rb') as f:
       part = pickle.load(f)
       for ix, (key, value) in tqdm(enumerate(part.items()), total=len(part)):
           paragraphs = value['paragraph']
           for p in paragraphs:
               batchtext.append(p)
       db.from_texts(batchtext,
                     embedding=hf,
                     elasticsearch_url=url,
                     index_name=index_name)

第 2B 步.使用托管式训练模型节省时间和成本

我发现,在我的旧版 Intel Macbook 上,创建嵌入模型需要处理好几个小时。我还是把时间说短了,好像需要好几天才能完成。我觉得使用 Elastic 托管服务中可动态扩展的 Machine Learning (ML) 节点,可以更快、更经济地完成这项工作。免费试用版中的集群不支持扩展该层,因此,这一步对某些用户可能更有意义。

最后的结果是:这种方法在 Elastic Cloud 中运行的节点上耗时 40 分钟,成本为 5 美元/小时,这比我在本地完成的速度要快得多,而且与 OpenAI(目前按 token 计费)处理嵌入模型的成本不相上下。如何高效地完成是一个更大的话题,但有一点我非常满意,那就是我可以快速得到一个在 Elastic Cloud 中运行的并行推理管道,而不需要学习新技能或将数据交给非私有 API。

在这一步中,我们将把嵌入生成的任务转移给 Elasticsearch 集群,它可以托管嵌入模型,并以分布式的方式嵌入文本段落。为此,我们必须加载数据并使用采集管道,确保最终的形式与 LangChain 所用的索引映射一致。在 Kibana 的开发工具中运行以下 REST 命令:

PUT /book_wookieepedia_mpnet
{
 "settings": {
   "number_of_shards": 4
 },
 "mappings": {
   "properties": {
     "metadata": {
       "type": "object"
     },
     "text": {
       "type": "text"
     },
     "vector": {
       "type": "dense_vector",
       "dims": 768
     }
   }
 }
}

接下来,使用 eland Python 库将嵌入模型上传到 Elasticsearch。

source .env
python3 step-3A-upload-model.py

下面,我们进入 Elastic Cloud 控制台,将我们的 ML 层扩展到总共 64 个 vCPU(是我目前笔记本电脑性能的 8 倍)。

Machine Learning 实例

现在,我们将在 Kibana 中部署训练好的 ML 模型。性能测试表明,在大规模部署时,用户应该从每个模型分配 1 个线程开始,然后逐步增加分配数量以提高吞吐量。可在此处找到相关文档和指南。我进行了试验,对于这个较小的数据集,我用 32 个实例(每个实例 2 个线程)得到了最佳结果。要设置这一项,请前往“堆栈管理”>“Machine Learning”。使用同步保存的对象功能,让 Kibana 看到我们用 Python 代码推送到 Elasticsearch 中的模型。然后,在单击模型后出现的菜单中进行部署。 

起始句转换器

现在,我们再次使用开发工具创建一个新的索引和采集管道,该管道用于处理文档中的文本段落,将结果放入名为“vector”的密集矢量字段中,并将该段落复制到预期的“text”字段中。

PUT /book_wookieepedia_mpnet
{
 "settings": {
   "number_of_shards": 4
 },
 "mappings": {
   "properties": {
     "metadata": {
       "type": "object"
     },
     "text": {
       "type": "text"
     },
     "vector": {
       "type": "dense_vector",
       "dims": 768
     }
   }
 }
}


PUT _ingest/pipeline/sw-embeddings
{
 "description": "Text embedding pipeline",
 "processors": [
   {
     "inference": {
       "model_id": "sentence-transformers__all-mpnet-base-v2",
       "target_field": "text_embedding",
       "field_map": {
         "text": "text_field"
       }
     }
   },
   {
     "set":{
       "field": "vector",
       "copy_from": "text_embedding.predicted_value"
     }
   },
   {
     "remove": {
       "field": "text_embedding"
     }
   }
 ],
 "on_failure": [
   {
     "set": {
       "description": "Index document to 'failed-<index>'",
       "field": "_index",
       "value": "failed-{{{_index}}}"
     }
   },
   {
     "set": {
       "description": "Set error message",
       "field": "ingest.failure",
       "value": "{{_ingest.on_failure_message}}"
     }
   }
 ]
}

测试管道,确保它能正常运行。

POST _ingest/pipeline/sw-embeddings/_simulate
{
 "docs": [
   {
     "_source": {
       "text": "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.",
       "metadata": {
         "a": "b"
       }
     }
   }
 ]
}

现在,我们已经准备好使用 Elasticsearch 的普通 Python 库批量加载数据,目标是我们的采集管道正确地创建矢量嵌入并对数据进行转换,以符合 LangChain 的预期。

source .env
python3 step-3B-batch-hosted-vectorize.py

成功!按照 OpenAI 的标准,这些数据约为 1300 万个 token,因此在 OpenAI 或类似云服务中生成这些矢量的费用约为 5.40 美元。使用 Elastic Cloud,处理这些数据需要 40 分钟,成本为 5 美元/小时。

加载完数据后,请记得使用云控制台将 Cloud ML 的规模缩减到零或更合理的值。

第 3 步.在星球大战问答游戏中获胜

接下来我们来试一试 LLM 和 LangChain。我创建了一个 lib_llm.py 库文件来保存这些代码。

from langchain import PromptTemplate, HuggingFaceHub, LLMChain
from langchain.llms import HuggingFacePipeline
from transformers import AutoTokenizer, pipeline, AutoModelForSeq2SeqLM
from langchain.vectorstores import ElasticVectorSearch
from langchain.embeddings import HuggingFaceEmbeddings
import os


cache_dir = "./cache"
def getFlanLarge():
  
   model_id = 'google/flan-t5-large'
   print(f">> Prep. Get {model_id} ready to go")
   tokenizer = AutoTokenizer.from_pretrained(model_id)
   model = AutoModelForSeq2SeqLM.from_pretrained(model_id, cache_dir=cache_dir)
  
   pipe = pipeline(
       "text2text-generation",
       model=model,
       tokenizer=tokenizer,
       max_length=100
   )
   llm = HuggingFacePipeline(pipeline=pipe)
   return llm


local_llm = getFlanLarge()


def make_the_llm():
   template_informed = """
   I am a helpful AI that answers questions.
   When I don't know the answer I say I don't know.
   I know context: {context}
   when asked: {question}
   my response using only information in the context is: """
   prompt_informed = PromptTemplate(
       template=template_informed,
       input_variables=["context", "question"])
   return LLMChain(prompt=prompt_informed, llm=local_llm)


## continued below

template_informed 是其中非常关键但也很容易理解的部分。我们要为提示模板设置格式,它将接受两个参数:上下文和用户提出的问题。

最后的主代码(接上文)如下所示:

## continued from above


topic = "Star Wars"
index_name = "book_wookieepedia_mpnet"


# Create the HuggingFace Transformer like before
model_name = "sentence-transformers/all-mpnet-base-v2"
hf = HuggingFaceEmbeddings(model_name=model_name)


## Elasticsearch as a vector db, just like before
endpoint = os.getenv('ES_SERVER', 'ERROR')
username = os.getenv('ES_USERNAME', 'ERROR')
password = os.getenv('ES_PASSWORD', 'ERROR')
url = f"https://{username}:{password}@{endpoint}:443"
db = ElasticVectorSearch(embedding=hf, elasticsearch_url=url, index_name=index_name)


## set up the conversational LLM
llm_chain_informed= make_the_llm()


def ask_a_question(question):
   ## get the relevant chunk from Elasticsearch for a question
   similar_docs = db.similarity_search(question)
   print(f'The most relevant passage: \n\t{similar_docs[0].page_content}')
   informed_context= similar_docs[0].page_content
   informed_response = llm_chain_informed.run(
       context=informed_context,
       question=question)
   return informed_response




# The conversational loop
print(f'I am a trivia chat bot, ask me any question about {topic}')
while True:
   command = input("User Question >> ")
   response= ask_a_question(command)
   print(f"\tAnswer  : {response}")

结论

通过执行一些数据整理,我们现在已经使用了 AI,不会将我们的数据向第三方托管式 LLM 公开。AI 的世界瞬息万变,但对于私有数据的安全性和控制权的保护,我们所有人都应该严肃对待,因为数据泄露会对监管、财务和人员等方面造成不良后果。这种情况不太可能改变。我们与客户合作,通过搜索来调查欺诈行为、保卫国家安全并改善弱势患者群体的治疗效果。隐私是非常重要的。要进一步了解 Elastic 在这些领域的用途,请参阅以下内容:

您是否和我一样爱上了 LangChain?一位睿智的老绝地武士曾说过:“很好。你已经迈出了通往更广阔世界的第一步。” 从这里出发,有很多条路可以走。LangChain 让 AI 提示工程变得不再复杂。我知道 Elasticsearch 在这里还能发挥许多其他作用,比如充当生成式 AI 的长期记忆,我非常期待看到这个瞬息万变的领域中层出不穷的新事物。

在本博文中,我们可能使用了第三方生成式 AI 工具,这些工具由其各自所有者拥有和运营。Elastic 对第三方工具没有任何控制权,对其内容、操作或使用不承担任何责任或义务,对您使用此类工具可能造成的任何损失或损害也不承担任何责任或义务。在 AI 工具中使用个人、敏感或机密信息时,请务必谨慎。您提交的任何数据都可能用于 AI 训练或其他目的。Elastic 不保证您所提供信息的安全性或保密性。在使用任何生成式 AI 工具之前,您应自行熟悉隐私惯例和使用条款。

Elastic、Elasticsearch 及相关标志为 Elasticsearch N.V. 在美国和其他国家/地区的商标、徽标或注册商标。所有其他公司和产品名称均为其相应所有者的商标、徽标或注册商标。