Pathway是一个开放的数据处理框架。它允许您轻松开发数据转换管道和机器学习应用程序,这些应用程序可以与实时数据源和不断变化的数据一起工作。
本笔记本演示了如何使用LlamaIndex
实时数据索引管道。您可以使用提供的PathwayRetriever
从LLM应用程序中查询此管道的结果。然而,在幕后,Pathway在每次数据更改时更新索引,从而为您提供始终最新的答案。
在本笔记本中,我们将使用一个公共演示文档处理管道,该管道可以:
要创建您自己的文档处理管道,请查看托管的提供或按照本笔记本中的步骤构建您自己的。
我们将使用llama_index.retrievers.pathway.PathwayRetriever
检索器连接到索引,该检索器实现了retrieve
接口。
本文档中描述的基本管道允许轻松构建存储在云位置中的文件的简单索引。然而,Pathway提供了构建实时数据管道和应用程序所需的一切,包括类似SQL的操作,如在不同数据源之间进行groupby-reductions和连接,数据的基于时间的分组和窗口处理,以及各种连接器。
有关Pathway数据摄入管道和向量存储的更多详细信息,请访问向量存储管道。
要使用PathwayRetrievier
,您必须安装llama-index-retrievers-pathway
包。
!pip install llama-index-retrievers-pathway
要实例化和配置PathwayRetriever
,您需要提供文档索引管道的url
或host
和port
。在下面的代码中,我们使用一个公开可用的演示管道,其REST API可以在https://demo-document-indexing.pathway.stream
上访问。该演示管道从Google Drive和Sharepoint摄取文档,并维护用于检索文档的索引。
from llama_index.retrievers.pathway import PathwayRetriever
retriever = PathwayRetriever(
url="https://demo-document-indexing.pathway.stream"
)
retriever.retrieve(str_or_query_bundle="what is pathway")
from llama_index.core.query_engine import RetrieverQueryEngine
query_engine = RetrieverQueryEngine.from_args(
retriever,
)
response = query_engine.query("Tell me about Pathway")
print(str(response))
在数据科学和机器学习项目中,数据处理是一个至关重要的步骤。在本教程中,我们将学习如何构建自己的数据处理流程,以便有效地准备数据用于建模和分析。
我们将使用Python中的一些流行的库来构建我们的数据处理流程,包括Pandas和Scikit-learn。我们将按照以下步骤进行:
让我们开始吧!
安装 pathway
包。然后下载示例数据。
%pip install pathway
%pip install llama-index-embeddings-openai
!mkdir -p 'data/'
!wget 'https://gist.githubusercontent.com/janchorowski/dd22a293f3d99d1b726eedc7d46d2fc0/raw/pathway_readme.md' -O 'data/pathway_readme.md'
Pathway可以同时监听许多来源,比如本地文件、S3文件夹、云存储和任何数据流,以便进行数据更改的监控。
更多信息请参见pathway-io。
import pathway as pwdata_sources = []data_sources.append( pw.io.fs.read( "./data", format="binary", mode="streaming", with_metadata=True, ) # 这将创建一个`pathway`连接器,用于跟踪./data目录中的所有文件)# 这将创建一个用于跟踪Google Drive中文件的连接器。# 请按照https://pathway.com/developers/tutorials/connectors/gdrive-connector/上的说明获取凭据# data_sources.append(# pw.io.gdrive.read(object_id="17H4YpBOAKQzEJ93xmC2z170l0bP2npMy", service_user_credentials_file="credentials.json", with_metadata=True))
让我们创建文档索引流水线。transformations
应该是一个以 Embedding
转换结束的 TransformComponent
列表。
在这个例子中,让我们首先使用 TokenTextSplitter
分割文本,然后使用 OpenAIEmbedding
进行嵌入。
from pathway.xpacks.llm.vector_store import VectorStoreServerfrom llama_index.embeddings.openai import OpenAIEmbeddingfrom llama_index.core.node_parser import TokenTextSplitter# 创建OpenAIEmbedding对象并设置embed_batch_size为10embed_model = OpenAIEmbedding(embed_batch_size=10)# 定义transformations_example列表,包括TokenTextSplitter对象和embed_model对象transformations_example = [ TokenTextSplitter( chunk_size=150, chunk_overlap=10, separator=" ", ), embed_model,]# 通过VectorStoreServer.from_llamaindex_components创建processing_pipeline对象processing_pipeline = VectorStoreServer.from_llamaindex_components( *data_sources, transformations=transformations_example,)# 定义Pathway的主机和端口PATHWAY_HOST = "127.0.0.1"PATHWAY_PORT = 8754# `threaded`以分离模式运行pathway,当从终端或容器中运行时,必须将其设置为False# 有关`with_cache`的更多信息,请查看https://pathway.com/developers/api-docs/persistence-apiprocessing_pipeline.run_server( host=PATHWAY_HOST, port=PATHWAY_PORT, with_cache=False, threaded=True)
from llama_index.retrievers.pathway import PathwayRetriever
retriever = PathwayRetriever(host=PATHWAY_HOST, port=PATHWAY_PORT)
retriever.retrieve(str_or_query_bundle="what is pathway")