#!/usr/bin/env python # coding: utf-8 # # Wikibase Agent # # 这个笔记本演示了一个非常简单的Wikibase代理,它使用SPARQL生成。尽管这段代码旨在针对任何Wikibase实例工作,但我们在测试中使用http://wikidata.org。 # # 如果您对Wikibase和SPARQL感兴趣,请考虑帮助改进这个代理。请查看[这里](https://github.com/donaldziff/langchain-wikibase)获取更多详细信息和未解决的问题。 # ## 预备知识 # ### API密钥和其他机密信息 # # 我们使用一个`.ini`文件,内容如下: # ``` # [OPENAI] # OPENAI_API_KEY=xyzzy # [WIKIDATA] # WIKIDATA_USER_AGENT_HEADER=argle-bargle # ``` # In[1]: # 导入配置文件模块 import configparser # 创建配置解析器对象 config = configparser.ConfigParser() # 读取指定路径下的配置文件 config.read("./secrets.ini") # ### OpenAI API 密钥 # # 除非您修改下面的代码以使用其他 LLM 供应商,否则需要一个 OpenAI API 密钥。 # In[2]: # 从配置文件中获取OpenAI API密钥 openai_api_key = config["OPENAI"]["OPENAI_API_KEY"] import os # 将OpenAI API密钥添加到环境变量中 os.environ.update({"OPENAI_API_KEY": openai_api_key}) # ### Wikidata用户代理标头 # # Wikidata政策要求提供用户代理标头。请参阅https://meta.wikimedia.org/wiki/User-Agent_policy。然而,目前该政策并没有严格执行。 # In[3]: # 检查配置文件中是否存在"WIKIDATA"部分 # 如果存在,则将"WIKIDATA_USER_AGENT_HEADER"的值赋给wikidata_user_agent_header变量,否则将其赋值为None wikidata_user_agent_header = ( None if not config.has_section("WIKIDATA") else config["WIKIDATA"]["WIKIDATA_USER_AGENT_HEADER"] ) # ### 如果需要,启用跟踪 # In[4]: # 导入os模块 import os # 设置环境变量LANGCHAIN_HANDLER为"langchain" os.environ["LANGCHAIN_HANDLER"] = "langchain" # 设置环境变量LANGCHAIN_SESSION为"default",确保该会话实际存在 os.environ["LANGCHAIN_SESSION"] = "default" # # 工具 # # 为这个简单代理提供了三种工具: # * `ItemLookup`: 用于查找物品的q编号 # * `PropertyLookup`: 用于查找属性的p编号 # * `SparqlQueryRunner`: 用于运行SPARQL查询 # ## 项目和属性查找 # # 项目和属性查找是通过一个单一方法实现的,使用一个弹性搜索终端点。并非所有的维基库实例都有这个功能,但维基数据有,这就是我们将要开始的地方。 # In[5]: def get_nested_value(o: dict, path: list) -> any: current = o for key in path: try: current = current[key] except KeyError: return None return current from typing import Optional import requests def vocab_lookup( search: str, entity_type: str = "item", url: str = "https://www.wikidata.org/w/api.php", user_agent_header: str = wikidata_user_agent_header, srqiprofile: str = None, ) -> Optional[str]: headers = {"Accept": "application/json"} if wikidata_user_agent_header is not None: headers["User-Agent"] = wikidata_user_agent_header if entity_type == "item": srnamespace = 0 srqiprofile = "classic_noboostlinks" if srqiprofile is None else srqiprofile elif entity_type == "property": srnamespace = 120 srqiprofile = "classic" if srqiprofile is None else srqiprofile else: raise ValueError("entity_type must be either 'property' or 'item'") params = { "action": "query", "list": "search", "srsearch": search, "srnamespace": srnamespace, "srlimit": 1, "srqiprofile": srqiprofile, "srwhat": "text", "format": "json", } response = requests.get(url, headers=headers, params=params) if response.status_code == 200: title = get_nested_value(response.json(), ["query", "search", 0, "title"]) if title is None: return f"I couldn't find any {entity_type} for '{search}'. Please rephrase your request and try again" # if there is a prefix, strip it off return title.split(":")[-1] else: return "Sorry, I got an error. Please try again." # In[6]: # 定义一个函数vocab_lookup,用于查找词汇表中的单词 def vocab_lookup(word): # 返回输入单词的大写形式 return word.upper() # 调用vocab_lookup函数,并打印结果 print(vocab_lookup("Malin 1")) # In[7]: # 导入所需的库 from wikidata.client import Client # 定义函数vocab_lookup,接收两个参数:term和entity_type def vocab_lookup(term, entity_type=None): # 创建一个Wikidata客户端对象 client = Client() # 使用客户端对象的search方法搜索term search_results = client.search(term) # 如果指定了entity_type,则过滤搜索结果 if entity_type: search_results = [result for result in search_results if result.entity_type == entity_type] # 返回搜索结果 return search_results # 调用vocab_lookup函数,并打印结果 print(vocab_lookup("instance of", entity_type="property")) 这段代码使用了`wikidata`库来进行词汇查询。首先,导入了`wikidata.client`模块。然后,定义了一个名为`vocab_lookup`的函数,该函数接收一个参数`term`和一个可选参数`entity_type`。在函数内部,创建了一个`Wikidata`客户端对象,并使用客户端对象的`search`方法搜索`term`。如果指定了`entity_type`,则对搜索结果进行过滤。最后,返回搜索结果并打印出来。 # In[8]: # 导入必要的库 import re # 定义函数 vocab_lookup,用于查找给定字符串中的单词 def vocab_lookup(text): # 使用正则表达式将字符串中的非字母字符替换为空格 cleaned_text = re.sub(r'[^a-zA-Z]', ' ', text) # 将字符串拆分为单词列表 words = cleaned_text.split() # 返回单词列表 return words # 调用函数 vocab_lookup,并打印结果 print(vocab_lookup("Ceci n'est pas un q-item")) 这段代码定义了一个名为 `vocab_lookup` 的函数,该函数接受一个字符串作为输入,并返回该字符串中的单词列表。在函数内部,使用正则表达式将字符串中的非字母字符替换为空格,然后将字符串拆分为单词列表。最后,调用 `vocab_lookup` 函数,并打印结果。输出结果将是 `['Ceci', 'n', 'est', 'pas', 'un', 'q', 'item']`。 # ## Sparql运行器 # 这个工具运行sparql - 默认情况下使用wikidata。 # In[9]: import json from typing import Any, Dict, List import requests def run_sparql( query: str, url="https://query.wikidata.org/sparql", user_agent_header: str = wikidata_user_agent_header, ) -> List[Dict[str, Any]]: # 设置请求头 headers = {"Accept": "application/json"} if wikidata_user_agent_header is not None: headers["User-Agent"] = wikidata_user_agent_header # 发送GET请求 response = requests.get( url, headers=headers, params={"query": query, "format": "json"} ) # 检查响应状态码 if response.status_code != 200: return "That query failed. Perhaps you could try a different one?" # 获取嵌套值 results = get_nested_value(response.json(), ["results", "bindings"]) # 将结果转换为JSON格式并返回 return json.dumps(results) 以上是一个运行SPARQL查询的函数。该函数使用requests库发送GET请求到指定的URL,并将查询结果转换为JSON格式返回。函数的参数包括查询字符串、URL和用户代理头。如果请求失败,函数会返回一个错误消息。 # In[10]: # 导入所需的库 from SPARQLWrapper import SPARQLWrapper, JSON def run_sparql(query): # 创建一个SPARQLWrapper对象 sparql = SPARQLWrapper("https://query.wikidata.org/sparql") # 设置查询语句 sparql.setQuery(query) # 设置返回结果的格式为JSON sparql.setReturnFormat(JSON) # 执行查询并获取结果 results = sparql.query().convert() # 返回结果 return results # 调用run_sparql函数并传入查询语句 run_sparql("SELECT (COUNT(?children) as ?count) WHERE { wd:Q1339 wdt:P40 ?children . }") # # 代理人 # ## 包装工具 # In[11]: 导入所需的模块和类 import re # 导入正则表达式模块 from typing import List, Union # 导入类型提示模块 from langchain.agents import ( # 导入langchain.agents模块中的类 AgentExecutor, # 导入AgentExecutor类 AgentOutputParser, # 导入AgentOutputParser类 LLMSingleActionAgent, # 导入LLMSingleActionAgent类 Tool, # 导入Tool类 ) from langchain.chains import LLMChain # 导入langchain.chains模块中的LLMChain类 from langchain.prompts import StringPromptTemplate # 导入langchain.prompts模块中的StringPromptTemplate类 from langchain_core.agents import AgentAction, AgentFinish # 导入langchain_core.agents模块中的AgentAction和AgentFinish类 # In[12]: # 定义代理可以使用的工具来回答用户查询 tools = [ Tool( name="ItemLookup", func=(lambda x: vocab_lookup(x, entity_type="item")), description="用于在需要知道项目的q编号时很有用", ), Tool( name="PropertyLookup", func=(lambda x: vocab_lookup(x, entity_type="property")), description="用于在需要知道属性的p编号时很有用", ), Tool( name="SparqlQueryRunner", func=run_sparql, description="用于从维基库获取结果很有用", ), ] # ## 提示 # In[13]: # 设置基本模板 template = """ 通过对维基库进行sparql查询来回答以下问题,其中p和q项对你完全未知。在生成sparql之前,你需要先发现p和q项。不要假设你知道任何概念的p和q项。始终使用工具来查找所有的p和q项。 生成sparql后,你应该运行它。结果将以json格式返回。用自然语言总结json结果。 你可以假设以下前缀: PREFIX wd: PREFIX wdt: PREFIX p: PREFIX ps: 在生成sparql时: * 尽量避免使用"count"和"filter"查询,如果可能的话 * 永远不要用反引号括起sparql 你可以使用以下工具: {tools} 使用以下格式: 问题:必须提供自然语言答案的输入问题 思考:你应该始终考虑该做什么 行动:要采取的行动,应该是[{tool_names}]中的一个 行动输入:行动的输入 观察:行动的结果 ...(这个思考/行动/行动输入/观察可以重复N次) 思考:我现在知道最终答案 最终答案:原始输入问题的最终答案 问题:{input} {agent_scratchpad}""" # In[14]: # 设置一个提示模板 class CustomPromptTemplate(StringPromptTemplate): # 要使用的模板 template: str # 可用工具的列表 tools: List[Tool] def format(self, **kwargs) -> str: # 获取中间步骤(AgentAction,Observation元组) # 以特定方式格式化它们 intermediate_steps = kwargs.pop("intermediate_steps") thoughts = "" for action, observation in intermediate_steps: thoughts += action.log thoughts += f"\nObservation: {observation}\nThought: " # 将agent_scratchpad变量设置为该值 kwargs["agent_scratchpad"] = thoughts # 从提供的工具列表创建一个tools变量 kwargs["tools"] = "\n".join( [f"{tool.name}: {tool.description}" for tool in self.tools] ) # 为提供的工具创建一个工具名称列表 kwargs["tool_names"] = ", ".join([tool.name for tool in self.tools]) return self.template.format(**kwargs) # In[15]: # 创建一个自定义提示模板 prompt = CustomPromptTemplate( template=template, # 使用指定的模板 tools=tools, # 使用指定的工具 # 由于`agent_scratchpad`、`tools`和`tool_names`变量是动态生成的,因此在此省略 # 需要包含`intermediate_steps`变量,因为这是必需的 input_variables=["input", "intermediate_steps"], # 指定输入变量 ) # ## 输出解析器 # 这与langchain文档中的内容保持不变 # In[16]: class CustomOutputParser(AgentOutputParser): def parse(self, llm_output: str) -> Union[AgentAction, AgentFinish]: # 检查代理是否应该结束 if "Final Answer:" in llm_output: return AgentFinish( # 返回值通常始终是一个带有单个`output`键的字典 # 目前不建议尝试其他操作 :) return_values={"output": llm_output.split("Final Answer:")[-1].strip()}, log=llm_output, ) # 解析出动作和动作输入 regex = r"Action: (.*?)[\n]*Action Input:[\s]*(.*)" match = re.search(regex, llm_output, re.DOTALL) if not match: raise ValueError(f"无法解析LLM输出:`{llm_output}`") action = match.group(1).strip() action_input = match.group(2) # 返回动作和动作输入 return AgentAction( tool=action, tool_input=action_input.strip(" ").strip('"'), log=llm_output ) # In[17]: # 创建一个名为CustomOutputParser的自定义输出解析器对象 output_parser = CustomOutputParser() # ## 指定LLM模型 # In[18]: # 导入langchain_openai库中的ChatOpenAI类 from langchain_openai import ChatOpenAI # 创建一个ChatOpenAI对象,使用"gpt-4"模型,设置温度为0 llm = ChatOpenAI(model="gpt-4", temperature=0) # ## 代理和代理执行者 # In[19]: # 创建一个LLM链,包含LLM和一个提示 llm_chain = LLMChain(llm=llm, prompt=prompt) # In[20]: # 创建一个包含工具名称的列表 tool_names = [tool.name for tool in tools] # 创建一个LLMSingleActionAgent对象,传入llm_chain、output_parser、stop、allowed_tools参数 agent = LLMSingleActionAgent( llm_chain=llm_chain, # llm_chain参数 output_parser=output_parser, # output_parser参数 stop=["\nObservation:"], # stop参数,当输出中包含"\nObservation:"时停止 allowed_tools=tool_names, # allowed_tools参数,允许使用的工具名称列表 ) # In[21]: # 导入AgentExecutor类 from rasa.core.agent import AgentExecutor # 创建AgentExecutor对象,并传入agent和tools参数,verbose设置为True agent_executor = AgentExecutor.from_agent_and_tools( agent=agent, tools=tools, verbose=True ) # ## 运行它! # In[22]: # 如果您喜欢使用内联追踪,请取消注释以下行 # agent_executor.agent.llm_chain.verbose = True # In[23]: # 导入必要的模块 import agent_executor # 调用agent_executor模块的run函数,并传入参数"How many children did J.S. Bach have?" agent_executor.run("How many children did J.S. Bach have?") # In[24]: # 创建一个agent_executor对象,并调用其run方法,传入一个问题作为参数 agent_executor.run( "Hakeem Olajuwon的Basketball-Reference.com NBA球员ID是多少?" ) # In[ ]: