在本指南中,我们将向您展示如何在Google Drive文件上构建一个“实时”RAG管道。
该管道将索引Google Drive文件并将它们转储到Redis向量存储中。随后,每次重新运行摄取管道时,管道将传播增量更新,这样只有更改的文档才会在向量存储中更新。这意味着我们不需要重新索引所有文档!
我们使用以下数据源 - 您需要复制这些文件并将它们上传到您自己的Google Drive目录中!
注意: 您还需要设置一个服务帐户和credentials.json。有关更多详细信息,请参阅我们的LlamaHub页面上的Google Drive加载程序:https://llamahub.ai/l/readers/llama-index-readers-google?from=readers
我们安装所需的包并启动 Redis Docker 镜像。
%pip install llama-index-storage-docstore-redis
%pip install llama-index-vector-stores-redis
%pip install llama-index-embeddings-huggingface
%pip install llama-index-readers-google
# 如果创建一个新的容器!docker run -d --name redis-stack -p 6379:6379 -p 8001:8001 redis/redis-stack:latest# # 如果启动一个已存在的容器# !docker start -a redis-stack
d32273cc1267d3221afa780db0edcd6ce5eee08ae88886f645410b9a220d4916
import os
os.environ["OPENAI_API_KEY"] = "sk-..."
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.core.ingestion import (
DocstoreStrategy,
IngestionPipeline,
IngestionCache,
)
from llama_index.storage.kvstore.redis import RedisKVStore as RedisCache
from llama_index.storage.docstore.redis import RedisDocumentStore
from llama_index.core.node_parser import SentenceSplitter
from llama_index.vector_stores.redis import RedisVectorStore
from redisvl.schema import IndexSchema
embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5")
model.safetensors: 0%| | 0.00/133M [00:00<?, ?B/s]
tokenizer_config.json: 0%| | 0.00/366 [00:00<?, ?B/s]
vocab.txt: 0%| | 0.00/232k [00:00<?, ?B/s]
tokenizer.json: 0%| | 0.00/711k [00:00<?, ?B/s]
special_tokens_map.json: 0%| | 0.00/125 [00:00<?, ?B/s]
1_Pooling/config.json: 0%| | 0.00/190 [00:00<?, ?B/s]
custom_schema = IndexSchema.from_dict( { "index": {"name": "gdrive", "prefix": "doc"}, # 自定义索引的字段 "fields": [ # llamaindex所需的字段 {"type": "tag", "name": "id"}, {"type": "tag", "name": "doc_id"}, {"type": "text", "name": "text"}, # 用于bge-small-en-v1.5嵌入的自定义向量字段 { "type": "vector", "name": "vector", "attrs": { "dims": 384, "algorithm": "hnsw", "distance_metric": "cosine", }, }, ], })vector_store = RedisVectorStore( schema=custom_schema, redis_url="redis://localhost:6379",)
# 可选:如果存在,清除向量存储if vector_store.index_exists(): vector_store.delete_index()
# 设置摄取缓存层cache = IngestionCache( cache=RedisCache.from_host_and_port("localhost", 6379), collection="redis_cache",)
pipeline = IngestionPipeline(
transformations=[
SentenceSplitter(),
embed_model,
],
docstore=RedisDocumentStore.from_host_and_port(
"localhost", 6379, namespace="document_store"
),
vector_store=vector_store,
cache=cache,
docstore_strategy=DocstoreStrategy.UPSERTS,
)
我们定义索引来包装底层的向量存储。
from llama_index.core import VectorStoreIndex
index = VectorStoreIndex.from_vector_store(
pipeline.vector_store, embed_model=embed_model
)
from llama_index.readers.google import GoogleDriveReader
loader = GoogleDriveReader()
def load_data(folder_id: str): # 加载数据 docs = loader.load_data(folder_id=folder_id) for doc in docs: # 将文档id设置为文件名 doc.id_ = doc.metadata["file_name"] return docsdocs = load_data(folder_id="1RFhr3-KmOZCR5rtp4dlOMNl3LKe1kOA5")# 打印docs# print(docs)
nodes = pipeline.run(documents=docs)
print(f"Ingested {len(nodes)} Nodes")
由于这是我们第一次启动向量存储,我们看到我们已经将所有文档转换/摄入到其中(通过分块,然后通过嵌入)。
在开始分析数据之前,通常需要先了解数据的基本情况。这包括数据的结构、特征、类型等。在这个阶段,可以提出一些问题来帮助我们更好地理解数据。例如:
通过提出这些问题,我们可以更好地准备数据,并为进一步的分析做好准备。
query_engine = index.as_query_engine()
response = query_engine.query("What are the sub-types of question answering?")
print(str(response))
The sub-types of question answering mentioned in the context are semantic search and summarization.
docs = load_data(folder_id="1RFhr3-KmOZCR5rtp4dlOMNl3LKe1kOA5")
nodes = pipeline.run(documents=docs)
print(f"Ingested {len(nodes)} Nodes")
注意只有一个节点被摄入。这是因为只有一个文档发生了变化,而其他文档保持不变。这意味着我们只需要重新转换和重新嵌入一个文档!
在处理新数据时,通常会有一些问题需要解决。这些问题可能涉及数据的质量、特征的含义、数据分布等方面。在处理新数据时,及时提出问题并寻找答案是非常重要的。
query_engine = index.as_query_engine()
response = query_engine.query("What are the sub-types of question answering?")
print(str(response))
The sub-types of question answering mentioned in the context are semantic search, summarization, and structured analytics.