# ✅ Install dependencies
%pip install openai pinecone-client authzed
# ✅ Load environment and setup clients
import os
from dotenv import load_dotenv
from authzed.api.v1 import Client, LookupResourcesRequest, ObjectReference, SubjectReference
from grpcutil import insecure_bearer_token_credentials
from pinecone import Pinecone
from pinecone import ServerlessSpec
from openai import OpenAI
load_dotenv(override=True)
# Connect to SpiceDB
# NOTE(review): "insecure" credentials presumably mean the bearer token is sent
# without TLS — fine for a local SpiceDB, confirm before pointing at a remote one.
spicedb_client = Client(
    os.getenv("SPICEDB_ADDR"),
    insecure_bearer_token_credentials(os.getenv("SPICEDB_API_KEY")),
)
# Never echo the secret itself into notebook output; only report whether it loaded.
print("SPICEDB_API_KEY loaded:", bool(os.getenv("SPICEDB_API_KEY")))

# Connect to Pinecone
pc = Pinecone(api_key=os.getenv("PINECONE_API_KEY"))
index_name = "agents"
# Create the index only when it does not exist yet, so re-running this cell
# does not fail on an "already exists" error.
if index_name not in pc.list_indexes().names():
    pc.create_index(
        name=index_name,
        dimension=1536,  # must equal the length of the embedding vectors upserted later
        metric="cosine",
        spec=ServerlessSpec(
            cloud="aws",
            region="us-east-1",
        ),
    )
index = pc.Index(index_name)

# Connect to OpenAI
openai_client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
# ✅ Define SpiceDB Schema
from authzed.api.v1 import WriteSchemaRequest
SCHEMA = """
definition agent {}
definition document {
relation reader: agent
permission read = reader
}
"""
try:
await spicedb_client.WriteSchema(WriteSchemaRequest(schema=SCHEMA))
print("✅ SpiceDB schema applied.")
except Exception as e:
print(f"❌ Schema error: {type(e).__name__}: {e}")
# Output: ✅ SpiceDB schema applied.
from authzed.api.v1 import (
ObjectReference,
Relationship,
RelationshipUpdate,
SubjectReference,
WriteRelationshipsRequest,
)
relationships = [
("doc-1", "sales-agent"),
("doc-2", "sales-agent"),
("doc-3", "hr-agent"),
]
updates = [
RelationshipUpdate(
operation=RelationshipUpdate.Operation.OPERATION_TOUCH,
relationship=Relationship(
resource=ObjectReference(object_type="document", object_id=doc),
relation="reader",
subject=SubjectReference(
object=ObjectReference(object_type="agent", object_id=agent)
),
),
)
for doc, agent in relationships
]
try:
await spicedb_client.WriteRelationships(WriteRelationshipsRequest(updates=updates))
print("✅ Relationships written.")
except Exception as e:
print(f"❌ Relationship error: {type(e).__name__}: {e}")
# Output: ✅ Relationships written.
# ✅ Upsert example documents to Pinecone
def get_query_embedding(text):
    """Return the OpenAI embedding vector for ``text``.

    Sends ``text`` to the ``text-embedding-ada-002`` model through the
    module-level ``openai_client`` and returns the ``embedding`` of the first
    entry in the response.
    """
    embedding_response = openai_client.embeddings.create(
        input=text,
        model="text-embedding-ada-002",
    )
    first_entry = embedding_response.data[0]
    return first_entry.embedding
# Example corpus; each `doc_id` matches a document object ID used in the
# SpiceDB relationships.
documents = [
    {"doc_id": "doc-1", "text": "Sales reports for Q4 2024, 33% increase in revenue this quarter!"},
    {"doc_id": "doc-2", "text": "Customer usecases for sales team"},
    {"doc_id": "doc-3", "text": "Employee handbook for company policies and benefits."},
]

# One Pinecone record per document. The vector ID is the doc_id, and the doc_id
# and raw text are duplicated into metadata so query results carry them back.
to_upsert = [
    {
        "id": doc["doc_id"],
        "values": get_query_embedding(doc["text"]),
        "metadata": {"doc_id": doc["doc_id"], "text": doc["text"]},
    }
    for doc in documents
]

index.upsert(vectors=to_upsert)
print("✅ Documents upserted to Pinecone.")
# Output: ✅ Documents upserted to Pinecone.
# ✅ Helper: Get authorized documents for agent
async def get_authorized_documents(agent_id: str):
    """Return the IDs of all documents ``agent_id`` has ``read`` permission on.

    Issues a SpiceDB ``LookupResources`` request for ``document`` resources
    with the agent as subject and collects the ``resource_object_id`` of every
    streamed result into a list.
    """
    request = LookupResourcesRequest(
        subject=SubjectReference(
            object=ObjectReference(object_type="agent", object_id=agent_id)
        ),
        permission="read",
        resource_object_type="document",
    )
    # The call yields results as an async stream; drain it into a list.
    return [
        hit.resource_object_id
        async for hit in spicedb_client.LookupResources(request)
    ]
# ✅ Main query function with secure pre-check
def get_query_embedding(text):
    """Return the OpenAI embedding vector for ``text``.

    This repeats the definition given earlier in the file with the same
    behavior — presumably so this notebook cell can be run on its own.
    """
    embedding_response = openai_client.embeddings.create(
        input=text,
        model="text-embedding-ada-002",
    )
    first_entry = embedding_response.data[0]
    return first_entry.embedding
import re
async def query_rag_with_authz(agent_id, user_query):
    """Answer ``user_query`` using only documents ``agent_id`` may read.

    Flow:
      1. Embed the query and fetch the 5 nearest vectors from Pinecone.
      2. Ask SpiceDB which documents the agent has ``read`` permission on.
      3. For each match, keep its text if authorized; otherwise substitute a
         refusal notice so the document text never reaches the chat model.
      4. If the query mentions an unauthorized document by ID, return a
         refusal immediately without calling the chat model.
      5. Otherwise send the assembled context and question to the chat model.

    Note that authorization is applied as a post-filter: Pinecone is queried
    without any permission filter, and results are screened afterwards.

    NOTE(review): the OpenAI and Pinecone calls here are not awaited, so they
    block the event loop while running.

    Args:
        agent_id: SpiceDB object ID of the ``agent`` making the request.
        user_query: Natural-language question.

    Returns:
        str: the model's answer, a refusal message, or the "no matching
        documents" message when Pinecone returned nothing usable.
    """
    vector = get_query_embedding(user_query)
    pinecone_results = index.query(vector=vector, top_k=5, include_metadata=True)

    allowed_ids = await get_authorized_documents(agent_id)
    print(f"🎯 Allowed IDs: {allowed_ids}")
    # Set copy for O(1) membership tests; the list is kept for the log line above.
    allowed_lookup = set(allowed_ids)

    results_output = []
    unauthorized_docs = []
    for match in pinecone_results["matches"]:
        doc_id = match["metadata"].get("doc_id")
        print(f"🔎 Checking doc_id: {doc_id}")
        if doc_id is None:
            # A match without a doc_id cannot be checked against SpiceDB, so it
            # is never exposed. Skipping it also keeps None out of re.escape().
            continue
        if doc_id in allowed_lookup:
            results_output.append(f"[Authorized: {doc_id}]\n{match['metadata']['text']}")
        else:
            results_output.append(
                f"[Not authorized: {doc_id}]\nYou are not authorized to view the contents of document '{doc_id}'."
            )
            unauthorized_docs.append(doc_id)

    # An explicit mention of an unauthorized document gets a direct refusal.
    for doc_id in unauthorized_docs:
        pattern = r'\b' + re.escape(doc_id) + r'\b'
        if re.search(pattern, user_query):
            return f"You are not authorized to view the contents of document '{doc_id}'."

    if not results_output:
        return "⛔ No matching documents found."

    prompt = (
        "You are an AI assistant. Answer ONLY based on the following context.\n\n"
        + "\n\n".join(results_output)
        + f"\n\nQ: {user_query}\nA:"
    )
    chat = openai_client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}],
        temperature=0,
    )
    return chat.choices[0].message.content
# ✅ Demo test
# sales-agent asks about doc-1 by name. As the last expression of the cell,
# the returned string is shown by the notebook as the cell result.
await query_rag_with_authz("sales-agent", "What is the content of project doc-1?")
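# Output:
# 🎯 Allowed IDs: ['doc-1']
# 🔎 Checking doc_id: doc-3
# 🔎 Checking doc_id: doc-2
# 🔎 Checking doc_id: doc-1
# 'The content of project doc-1 is sales reports for Q4 2024, not for external distribution.'
# NOTE: this recorded output does not match the relationships and document text
# defined above (doc-2 is also granted to sales-agent, and the doc-1 text has no
# "not for external distribution") — presumably captured from an earlier run.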
# Generate a summary of all the documents the AI agent has access to
async def summarize_accessible_docs(agent_id: str):
    """Summarize every document ``agent_id`` has ``read`` permission on.

    Looks up the agent's readable document IDs in SpiceDB, fetches the stored
    text for those IDs from Pinecone, and asks the chat model for a summary
    grounded only in that text.

    Args:
        agent_id: SpiceDB object ID of the ``agent`` whose documents are summarized.

    Returns:
        str: the generated summary, or a "❌ ..." message when the agent has no
        accessible documents or none of them has retrievable text.
    """
    # Step 1: Get documents the agent has "read" permission on, using the same
    # lookup helper as the query path so both apply identical authorization.
    document_ids = await get_authorized_documents(agent_id)
    print(f"🔍 {agent_id} can access: {document_ids}")
    if not document_ids:
        return "❌ No accessible documents."

    # ✅ Step 2: Fetch content from Pinecone using fetch
    fetch_response = index.fetch(ids=document_ids)
    docs_to_summarize = []
    for doc_id in document_ids:
        # Best effort: a malformed record is reported and skipped rather than
        # aborting the whole summary.
        try:
            vector_data = fetch_response.vectors.get(doc_id)
            if not vector_data:
                print(f"⚠️ No vector found for {doc_id}")
                continue
            # Metadata may be absent on a stored vector; treat that as "no text".
            metadata = vector_data.metadata or {}
            content = metadata.get("text", "")
            if not content:
                print(f"⚠️ No text stored for {doc_id}")
                continue
            docs_to_summarize.append(content)
        except Exception as e:
            print(f"⚠️ Error fetching {doc_id}: {e}")

    if not docs_to_summarize:
        return "❌ No content found to summarize."

    combined_text = "\n\n".join(docs_to_summarize)

    # Step 3: Summarize with OpenAI, using grounded prompt
    summary_prompt = (
        "You are an AI assistant. Based ONLY on the following documents, "
        "generate a concise summary of their contents. Do not use any outside knowledge.\n\n"
        + combined_text
        + "\n\nSummary:"
    )
    chat_response = openai_client.chat.completions.create(
        messages=[{"role": "user", "content": summary_prompt}],
        model="gpt-4",
        temperature=0,  # for factual, deterministic summaries
    )
    return chat_response.choices[0].message.content
# NOTE(review): asyncio is imported here but not referenced in the visible code;
# the coroutine below is driven by the notebook's top-level await instead.
import asyncio
# Summarize everything sales-agent can read and keep the text in `summary`.
summary = await summarize_accessible_docs("sales-agent")
print(summary)
# Output:
# 🔍 sales-agent can access: ['doc-1', 'doc-2']
# The documents contain sales reports for Q4 2024, indicating a 33% increase in revenue for that quarter. They also include customer use cases for the sales team.
# ❌ Remove permission for sales-agent to view doc-2
try:
resp = await spicedb_client.WriteRelationships(
WriteRelationshipsRequest(
updates=[
RelationshipUpdate(
operation=RelationshipUpdate.Operation.OPERATION_DELETE,
relationship=Relationship(
resource=ObjectReference(object_type="document", object_id="doc-2"),
relation="reader",
subject=SubjectReference(
object=ObjectReference(
object_type="agent",
object_id="sales-agent",
)
),
),
),
]
)
)
except Exception as e:
print(f"Write relationships error: {type(e).__name__}: {e}")
# ✅ Demo test
# After the reader relation on doc-2 was deleted above, sales-agent asks about
# doc-2 by name; the returned string is shown as the cell result.
await query_rag_with_authz("sales-agent", "What is the content of project doc-2?")
# Output:
# 🎯 Allowed IDs: ['doc-1']
# 🔎 Checking doc_id: doc-3
# 🔎 Checking doc_id: doc-2
# 🔎 Checking doc_id: doc-1
# "You are not authorized to view the contents of document 'doc-2'."