#!/usr/bin/env python # coding: utf-8 # In[ ]: # 安装所需的包 get_ipython().system(' pip install langchain_community tiktoken langchain-openai langchainhub chromadb langchain langgraph') # # LangGraph 检索代理 # # 我们可以在 [LangGraph](https://python.langchain.com/docs/langgraph) 中实现 [检索代理](https://python.langchain.com/docs/use_cases/question_answering/conversational_retrieval_agents)。 # # ## 检索器 # In[1]: from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain_community.document_loaders import WebBaseLoader from langchain_community.vectorstores import Chroma from langchain_openai import OpenAIEmbeddings urls = [ "https://lilianweng.github.io/posts/2023-06-23-agent/", "https://lilianweng.github.io/posts/2023-03-15-prompt-engineering/", "https://lilianweng.github.io/posts/2023-10-25-adv-attack-llm/", ] docs = [WebBaseLoader(url).load() for url in urls] docs_list = [item for sublist in docs for item in sublist] text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder( chunk_size=100, chunk_overlap=50 ) doc_splits = text_splitter.split_documents(docs_list) # Add to vectorDB vectorstore = Chroma.from_documents( documents=doc_splits, collection_name="rag-chroma", embedding=OpenAIEmbeddings(), ) retriever = vectorstore.as_retriever() # In[7]: from langchain.tools.retriever import create_retriever_tool # 使用create_retriever_tool函数创建一个工具对象 tool = create_retriever_tool( retriever, # 检索器对象 "retrieve_blog_posts", # 工具名称 "Search and return information about Lilian Weng blog posts." # 工具描述 ) # 将工具对象添加到工具列表中 tools = [tool] from langgraph.prebuilt import ToolExecutor # 创建ToolExecutor对象,并将工具列表传递给它 tool_executor = ToolExecutor(tools) # ## 代理状态 # # 我们将定义一个图。 # # 一个`state`对象会在每个节点之间传递。 # # 我们的状态将是一个`messages`列表。 # # 我们的图中的每个节点都会向其附加。 # In[6]: # 导入必要的库 import operator from typing import Annotated, Sequence, TypedDict # 导入自定义模块 from langchain_core.messages import BaseMessage # 定义一个名为AgentState的TypedDict类型 class AgentState(TypedDict): # 定义AgentState中的键值对,键为'messages',值为一个注解类型为Annotated的序列,序列元素为BaseMessage类型,注解为operator.add messages: Annotated[Sequence[BaseMessage], operator.add] # ## 节点和边 # # 每个节点将会 - # # 1/ 要么是一个函数,要么是一个可运行的对象。 # # 2/ 修改 `state`。 # # 边决定了下一个要调用的节点。 # # 我们可以像这样布置一个代理 RAG 图: # # ![Screenshot 2024-02-02 at 1.36.50 PM.png](attachment:f886806c-0aec-4c2a-8027-67339530cb60.png) # In[12]: import json import operator from typing import Annotated, Sequence, TypedDict from langchain.output_parsers import PydanticOutputParser from langchain.prompts import PromptTemplate from langchain.tools.render import format_tool_to_openai_function from langchain_core.messages import BaseMessage, FunctionMessage from langchain_core.pydantic_v1 import BaseModel, Field from langchain_openai import ChatOpenAI from langgraph.prebuilt import ToolInvocation ### Edges def should_retrieve(state): """ Decides whether the agent should retrieve more information or end the process. This function checks the last message in the state for a function call. If a function call is present, the process continues to retrieve information. Otherwise, it ends the process. Args: state (messages): The current state of the agent, including all messages. Returns: str: A decision to either "continue" the retrieval process or "end" it. """ print("---DECIDE TO RETRIEVE---") messages = state["messages"] last_message = messages[-1] # If there is no function call, then we finish if "function_call" not in last_message.additional_kwargs: print("---DECISION: DO NOT RETRIEVE / DONE---") return "end" # Otherwise there is a function call, so we continue else: print("---DECISION: RETRIEVE---") return "continue" def check_relevance(state): """ Determines whether the Agent should continue based on the relevance of retrieved documents. This function checks if the last message in the conversation is of type FunctionMessage, indicating that document retrieval has been performed. It then evaluates the relevance of these documents to the user's initial question using a predefined model and output parser. If the documents are relevant, the conversation is considered complete. Otherwise, the retrieval process is continued. Args: state messages: The current state of the conversation, including all messages. Returns: str: A directive to either "end" the conversation if relevant documents are found, or "continue" the retrieval process. """ print("---CHECK RELEVANCE---") # Output class FunctionOutput(BaseModel): binary_score: str = Field(description="Relevance score 'yes' or 'no'") # Create an instance of the PydanticOutputParser parser = PydanticOutputParser(pydantic_object=FunctionOutput) # Get the format instructions from the output parser format_instructions = parser.get_format_instructions() # Create a prompt template with format instructions and the query prompt = PromptTemplate( template="""You are a grader assessing relevance of retrieved docs to a user question. \n Here are the retrieved docs: \n ------- \n {context} \n ------- \n Here is the user question: {question} If the docs contain keyword(s) in the user question, then score them as relevant. \n Give a binary score 'yes' or 'no' score to indicate whether the docs are relevant to the question. \n Output format instructions: \n {format_instructions}""", input_variables=["question"], partial_variables={"format_instructions": format_instructions}, ) model = ChatOpenAI(temperature=0, model="gpt-4-0125-preview") chain = prompt | model | parser messages = state["messages"] last_message = messages[-1] score = chain.invoke( {"question": messages[0].content, "context": last_message.content} ) # If relevant if score.binary_score == "yes": print("---DECISION: DOCS RELEVANT---") return "yes" else: print("---DECISION: DOCS NOT RELEVANT---") print(score.binary_score) return "no" ### Nodes # Define the function that calls the model def call_model(state): """ Invokes the agent model to generate a response based on the current state. This function calls the agent model to generate a response to the current conversation state. The response is added to the state's messages. Args: state (messages): The current state of the agent, including all messages. Returns: dict: The updated state with the new message added to the list of messages. """ print("---CALL AGENT---") messages = state["messages"] model = ChatOpenAI(temperature=0, streaming=True, model="gpt-4-0125-preview") functions = [format_tool_to_openai_function(t) for t in tools] model = model.bind_functions(functions) response = model.invoke(messages) # We return a list, because this will get added to the existing list return {"messages": [response]} # Define the function to execute tools def call_tool(state): """ Executes a tool based on the last message's function call. This function is responsible for executing a tool invocation based on the function call specified in the last message. The result from the tool execution is added to the conversation state as a new message. Args: state (messages): The current state of the agent, including all messages. Returns: dict: The updated state with the new function message added to the list of messages. """ print("---EXECUTE RETRIEVAL---") messages = state["messages"] # Based on the continue condition # we know the last message involves a function call last_message = messages[-1] # We construct an ToolInvocation from the function_call action = ToolInvocation( tool=last_message.additional_kwargs["function_call"]["name"], tool_input=json.loads( last_message.additional_kwargs["function_call"]["arguments"] ), ) # We call the tool_executor and get back a response response = tool_executor.invoke(action) # print(type(response)) # We use the response to create a FunctionMessage function_message = FunctionMessage(content=str(response), name=action.tool) # We return a list, because this will get added to the existing list return {"messages": [function_message]} # ## 图表 # # * 从一个代理开始,名为 `call_model` # * 代理做出决策,决定是否调用一个函数 # * 如果是的话,就执行 `action` 来调用工具(检索器) # * 然后将工具的输出添加到消息(`state`)中,并将其传递给代理进行调用 # In[13]: from langgraph.graph import END, StateGraph # 导入END和StateGraph类 # 定义一个新的图 workflow = StateGraph(AgentState) # 创建一个状态图对象,AgentState是状态的名称 # 定义我们将循环的节点 workflow.add_node("agent", call_model) # 添加一个名为"agent"的节点,并指定调用call_model函数 workflow.add_node("action", call_tool) # 添加一个名为"action"的节点,并指定调用call_tool函数 # In[14]: # 调用代理节点来决定是否检索 workflow.set_entry_point("agent") # 决定是否检索 workflow.add_conditional_edges( "agent", # 评估代理决定 should_retrieve, { # 调用工具节点 "continue": "action", "end": END, }, ) # 在调用`action`节点后采取的边缘 workflow.add_conditional_edges( "action", # 评估代理决定 check_relevance, { # 调用代理节点 "yes": "agent", "no": END, # 占位符 }, ) # 编译 app = workflow.compile() # In[15]: # 导入 pprint 模块 import pprint # 从 langchain_core.messages 模块中导入 HumanMessage 类 from langchain_core.messages import HumanMessage # 定义输入数据 inputs = { "messages": [ HumanMessage( content="What are the types of agent memory based on Lilian Weng's blog post?" ) ] } # 循环遍历 app.stream(inputs) 的输出 for output in app.stream(inputs): # 遍历输出中的每个键值对 for key, value in output.items(): # 打印输出节点的名称 pprint.pprint(f"Output from node '{key}':") # 打印分隔线 pprint.pprint("---") # 使用 pprint 打印值,设置缩进、宽度和深度 pprint.pprint(value, indent=2, width=80, depth=None) # 打印分隔线 pprint.pprint("\n---\n") # 追踪: # # https://smith.langchain.com/public/6f45c61b-69a0-4b35-bab9-679a8840a2d6/r # In[ ]: