#!/usr/bin/env python # coding: utf-8 # In[ ]: # ✅ Install dependencies get_ipython().run_line_magic('pip', 'install openai pinecone-client authzed') # In[ ]: # ✅ Load environment and setup clients import os from dotenv import load_dotenv from authzed.api.v1 import Client, LookupResourcesRequest, ObjectReference, SubjectReference from grpcutil import insecure_bearer_token_credentials from pinecone import Pinecone from pinecone import ServerlessSpec from openai import OpenAI load_dotenv(override=True) # In[ ]: # Connect to SpiceDB spicedb_client = Client( os.getenv("SPICEDB_ADDR"), insecure_bearer_token_credentials(os.getenv("SPICEDB_API_KEY")) ) print (os.getenv("SPICEDB_API_KEY")) # Connect to Pinecone pc = Pinecone(api_key=os.getenv("PINECONE_API_KEY")) index_name = "agents" pc.create_index( name=index_name, dimension=1536, metric="cosine", spec=ServerlessSpec( cloud="aws", region="us-east-1" ) ) index = pc.Index(index_name) # Connect to OpenAI openai_client = OpenAI(api_key=os.getenv("OPENAI_API_KEY")) # In[ ]: # ✅ Define SpiceDB Schema from authzed.api.v1 import WriteSchemaRequest SCHEMA = """ definition agent {} definition document { relation reader: agent permission read = reader } """ try: await spicedb_client.WriteSchema(WriteSchemaRequest(schema=SCHEMA)) print("✅ SpiceDB schema applied.") except Exception as e: print(f"❌ Schema error: {type(e).__name__}: {e}") # In[ ]: from authzed.api.v1 import ( ObjectReference, Relationship, RelationshipUpdate, SubjectReference, WriteRelationshipsRequest, ) relationships = [ ("doc-1", "sales-agent"), ("doc-2", "sales-agent"), ("doc-3", "hr-agent"), ] updates = [ RelationshipUpdate( operation=RelationshipUpdate.Operation.OPERATION_TOUCH, relationship=Relationship( resource=ObjectReference(object_type="document", object_id=doc), relation="reader", subject=SubjectReference( object=ObjectReference(object_type="agent", object_id=agent) ), ), ) for doc, agent in relationships ] try: await spicedb_client.WriteRelationships(WriteRelationshipsRequest(updates=updates)) print("✅ Relationships written.") except Exception as e: print(f"❌ Relationship error: {type(e).__name__}: {e}") # In[ ]: # ✅ Upsert example documents to Pinecone def get_query_embedding(text): response = openai_client.embeddings.create( model="text-embedding-ada-002", input=text ) return response.data[0].embedding documents = [ {"doc_id": "doc-1", "text": "Sales reports for Q4 2024, 33% increase in revenue this quarter!"}, {"doc_id": "doc-2", "text": "Customer usecases for sales team"}, {"doc_id": "doc-3", "text": "Employee handbook for company policies and benefits."} ] to_upsert = [] for i, doc in enumerate(documents): embedding = get_query_embedding(doc["text"]) to_upsert.append({ "id": doc["doc_id"], "values": embedding, "metadata": {"doc_id": doc["doc_id"], "text": doc["text"]} }) index.upsert(vectors=to_upsert) print("✅ Documents upserted to Pinecone.") # In[ ]: # ✅ Helper: Get authorized documents for agent async def get_authorized_documents(agent_id: str): subject = SubjectReference( object=ObjectReference(object_type="agent", object_id=agent_id) ) lookup = spicedb_client.LookupResources( LookupResourcesRequest( subject=subject, permission="read", resource_object_type="document", ) ) authorized = [] async for res in lookup: authorized.append(res.resource_object_id) return authorized # ✅ Main query function with secure pre-check def get_query_embedding(text): response = openai_client.embeddings.create( model="text-embedding-ada-002", input=text ) return response.data[0].embedding # In[ ]: import re async def query_rag_with_authz(agent_id, user_query): vector = get_query_embedding(user_query) pinecone_results = index.query(vector=vector, top_k=5, include_metadata=True) allowed_ids = await get_authorized_documents(agent_id) print(f"🎯 Allowed IDs: {allowed_ids}") results_output = [] unauthorized_docs = [] for match in pinecone_results["matches"]: doc_id = match["metadata"].get("doc_id") print(f"🔎 Checking doc_id: {doc_id}") if doc_id in allowed_ids: results_output.append(f"[Authorized: {doc_id}]\n{match['metadata']['text']}") else: results_output.append(f"[Not authorized: {doc_id}]\nYou are not authorized to view the contents of document '{doc_id}'.") unauthorized_docs.append(doc_id) for doc_id in unauthorized_docs: pattern = r'\b' + re.escape(doc_id) + r'\b' if re.search(pattern, user_query): return f"You are not authorized to view the contents of document '{doc_id}'." if not results_output: return "⛔ No matching documents found." prompt = ( "You are an AI assistant. Answer ONLY based on the following context.\n\n" + "\n\n".join(results_output) + f"\n\nQ: {user_query}\nA:" ) chat = openai_client.chat.completions.create( model="gpt-4", messages=[{"role": "user", "content": prompt}], temperature=0 ) return chat.choices[0].message.content # In[ ]: # ✅ Demo test await query_rag_with_authz("sales-agent", "What is the content of project doc-1?") # In[ ]: # Generate a summary of all the documents the AI agent has access to async def summarize_accessible_docs(agent_id: str): from authzed.api.v1 import LookupResourcesRequest, ObjectReference, SubjectReference from openai.types.chat import ChatCompletion # Step 1: Get documents the agent has "read" permission on response = spicedb_client.LookupResources( LookupResourcesRequest( resource_object_type="document", permission="read", subject=SubjectReference( object=ObjectReference(object_type="agent", object_id=agent_id) ), ) ) document_ids = [res.resource_object_id async for res in response] print(f"🔍 {agent_id} can access: {document_ids}") if not document_ids: return "❌ No accessible documents." # ✅ Step 2: Fetch content from Pinecone using fetch docs_to_summarize = [] fetch_response = index.fetch(ids=document_ids) for doc_id in document_ids: try: vector_data = fetch_response.vectors.get(doc_id) if not vector_data: print(f"⚠️ No vector found for {doc_id}") continue content = vector_data.metadata.get("text", "") docs_to_summarize.append(content) except Exception as e: print(f"⚠️ Error fetching {doc_id}: {e}") if not docs_to_summarize: return "❌ No content found to summarize." combined_text = "\n\n".join(docs_to_summarize) # Step 3: Summarize with OpenAI, using grounded prompt summary_prompt = ( "You are an AI assistant. Based ONLY on the following documents, " "generate a concise summary of their contents. Do not use any outside knowledge.\n\n" + combined_text + "\n\nSummary:" ) chat_response: ChatCompletion = openai_client.chat.completions.create( messages=[{"role": "user", "content": summary_prompt}], model="gpt-4", temperature=0 # for factual, deterministic summaries ) return chat_response.choices[0].message.content # In[ ]: import asyncio summary = await summarize_accessible_docs("sales-agent") print(summary) # 🔍 sales-agent can access: ['doc-1', 'doc-2'] # The documents contain sales reports for Q4 2024, indicating a 33% increase in revenue for that quarter. They also include customer use cases for the sales team. # In[ ]: # ❌ Remove permission for sales-agent to view doc-2 try: resp = await spicedb_client.WriteRelationships( WriteRelationshipsRequest( updates=[ RelationshipUpdate( operation=RelationshipUpdate.Operation.OPERATION_DELETE, relationship=Relationship( resource=ObjectReference(object_type="document", object_id="doc-2"), relation="reader", subject=SubjectReference( object=ObjectReference( object_type="agent", object_id="sales-agent", ) ), ), ), ] ) ) except Exception as e: print(f"Write relationships error: {type(e).__name__}: {e}") # In[ ]: # ✅ Demo test await query_rag_with_authz("sales-agent", "What is the content of project doc-2?")