#!/usr/bin/env python
# coding: utf-8
# # Querying
#
# This notebook demonstrates Nexus Forge data [querying features](https://nexus-forge.readthedocs.io/en/latest/interaction.html#querying).
# In[1]:
from kgforge.core import KnowledgeGraphForge
# A configuration file is needed in order to create a KnowledgeGraphForge session. A configuration can be generated using the notebook [00-Initialization.ipynb](00%20-%20Initialization.ipynb).
# In[2]:
forge = KnowledgeGraphForge("../../configurations/forge.yml")
# ## Imports
# In[3]:
from kgforge.core import Resource
from kgforge.specializations.resources import Dataset
from kgforge.core.wrappings.paths import Filter, FilterOperator
# ## Retrieval
# ### latest version
# In[4]:
jane = Resource(type="Person", name="Jane Doe", award=["Nobel"])
# In[5]:
forge.register(jane)
# In[6]:
resource = forge.retrieve(jane.id)
# In[7]:
resource == jane
# ### specific version
# In[8]:
jane = Resource(type="Person", name="Jane Doe", award=["Nobel"])
# In[9]:
forge.register(jane)
# In[10]:
forge.tag(jane, "v1")
# In[11]:
jane.email = ["jane.doe@epfl.ch", "jane.doe@example.org"]
# In[12]:
forge.update(jane)
# In[13]:
try:
    # DemoStore
    print(jane._store_metadata.version)
except:
    # BlueBrainNexus
    print(jane._store_metadata._rev)
# In[14]:
jane_v1 = forge.retrieve(jane.id, version=1)
# In[15]:
jane_v1_tag = forge.retrieve(jane.id, version="v1")
# In[16]:
jane_v1_rev = forge.retrieve(jane.id+"?rev=1")
# In[17]:
jane_v1 == jane_v1_tag
# In[18]:
jane_v1 == jane_v1_rev
# In[19]:
jane_v1 != jane
# In[20]:
try:
    # DemoStore
    print(jane_v1._store_metadata.version)
except:
    # BlueBrainNexus
    print(jane_v1._store_metadata._rev)
# ### crossbucket retrieval
# It is possible to retrieve resources stored in buckets different from the configured one. The configured store should of course support it.
# In[21]:
resource = forge.retrieve(jane.id, cross_bucket=True) # cross_bucket defaults to False
# In[22]:
resource._store_metadata
# In[23]:
resource._last_action
# In[24]:
resource._synchronized
# ### Retrieving using the resource url
# One can also use the value of `_self` from `._store_metadata` to retrieve a resource.
# In[25]:
import copy
import string
import random
# In[26]:
other_resource = copy.deepcopy(resource)
# In[27]:
other_resource.id = f"https://my-incredible-id-{''.join(random.choices(string.digits, k=5))}"
# In[28]:
forge.register(other_resource)
# In[29]:
url = other_resource._store_metadata['_self']
# In[30]:
same_resource_url = forge.retrieve(id=url)
same_resource_id = forge.retrieve(id=other_resource.id)
# Confirm they are the same
# In[31]:
same_resource_id == same_resource_url
# ### Original source retrieval
# When using BlueBrainNexusStore, it is possible to retrieve a resource's payload as it was registered (retrieve_source=True), without any changes related to store-added metadata or JSON-LD framing.
# In[32]:
resource = forge.retrieve(jane.id, retrieve_source=False) # retrieve_source defaults to True
# In[33]:
forge.as_json(resource)
# In[34]:
resource._store_metadata
# In[35]:
resource._last_action
# In[36]:
resource._synchronized
# ### error handling
# In[37]:
resource = forge.retrieve("123")
# In[38]:
resource is None
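# Since `forge.retrieve` returns `None` when the identifier cannot be resolved, a defensive pattern is to test the result before using it. A minimal sketch (the identifier below is intentionally unknown):
maybe_resource = forge.retrieve("123")
if maybe_resource is None:
    print("Resource not found")
else:
    print(maybe_resource.id)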
# ## Searching
# Note: DemoModel and RdfModel schemas have not been synchronized yet. This section is to be run with RdfModel. Commented lines are for DemoModel.
# In[39]:
jane = Resource(type="Person", name="Jane Doe")
contribution_jane = Resource(type="Contribution", agent=jane)
# In[40]:
john = Resource(type="Person", name="John Smith")
contribution_john = Resource(type="Contribution", agent=john)
# In[41]:
dataset = Dataset(forge, type="Dataset", contribution=[contribution_jane, contribution_john])
dataset.add_distribution("../../data/associations.tsv")
# In[42]:
forge.register(dataset)
# In[43]:
forge.as_json(dataset)
# ### Using resource paths as filters
# The `paths` method loads the template or property paths (i.e. expected properties) for a given type.
#
# Please refer to the [Modeling.ipynb](11%20-%20Modeling.ipynb) notebook to learn about templates and types.
# In[44]:
p = forge.paths("Dataset")
# Autocompletion is enabled on `p` and this can be used to create search filters.
# Note: There is a known issue for RdfModel which requires using `p.type.id` instead of `p.type`.
# All [python comparison operators](https://www.w3schools.com/python/gloss_python_comparison_operators.asp) are supported.
# In[45]:
resources = forge.search(p.type.id=="Person", limit=3)
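# Operators other than `==` work the same way. A minimal sketch using `!=` to exclude a type (the results depend on the data available in the store):
other_resources = forge.search(p.type.id != "Person", limit=3)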
# In[46]:
type(resources)
# In[47]:
len(resources)
# In[48]:
forge.as_dataframe(resources)
# In[49]:
forge.as_dataframe(resources, store_metadata=True)
# In[50]:
# Search results are not synchronized
resources[0]._synchronized
# #### Using nested resource property
# Property autocompletion is available on a path `p` even for nested properties like `p.contribution`.
# In[51]:
# Search for resources of type Dataset and with text/tab-separated-values as distribution.encodingFormat
resources = forge.search(p.type.id == "Dataset", p.distribution.encodingFormat == "text/tab-separated-values", limit=3)
# In[52]:
len(resources)
# In[53]:
forge.as_dataframe(resources)
# ### Using dictionaries as filters
# A dictionary can be provided for filters:
# * {'type': {'id':'Dataset'}} is equivalent to p.type.id=="Dataset"
# * only the '==' operator is supported
# * nested dict are supported
# * it is not mandatory for the provided properties and values to be defined in the forge model. Results will be retrieved if there are corresponding data in the store.
#
# This feature is not supported when using the DemoStore
#
# In[54]:
# Search for resources of type Dataset and with text/tab-separated-values as distribution.encodingFormat
# and created at a given dateTime (dateTime values should be signaled by the suffix "^^xsd:dateTime")
filters = {
    "type": "Dataset",
    "distribution": {"encodingFormat": "text/tab-separated-values"},
    "_createdAt": dataset._store_metadata._createdAt + "^^xsd:dateTime"
}
resources = forge.search(filters, limit=3)
# In[55]:
type(resources)
# In[56]:
len(resources)
# In[57]:
forge.as_dataframe(resources, store_metadata=True)
# ### Using built-in Filter objects
# #### Supported filter operators
# In[58]:
[f"{op.value} ({op.name})" for op in FilterOperator] # These are equivalent to the Python comparison operators
# In[59]:
# Search for resources of type Dataset and with text/tab-separated-values as distribution.encodingFormat
# and created at or before a given dateTime (dateTime values should be signaled by the suffix "^^xsd:dateTime")
filter_1 = Filter(operator=FilterOperator.EQUAL, path=["type"], value="Dataset")
filter_2 = Filter(operator=FilterOperator.EQUAL, path=["distribution","encodingFormat"], value="text/tab-separated-values")
filter_3 = Filter(operator=FilterOperator.LOWER_OR_Equal_Than, path=["_createdAt"], value=dataset._store_metadata._createdAt+"^^xsd:dateTime")
resources = forge.search(filter_1, filter_2, filter_3, limit=3)
# In[60]:
type(resources)
# In[61]:
len(resources)
# In[62]:
forge.as_dataframe(resources, store_metadata=True)
# ### Using search endpoints
#
# Two types of search endpoints are supported: 'sparql' (default) for graph queries and 'elastic' for document-oriented queries. The available search endpoints can be configured (see [00-Initialization.ipynb](00%20-%20Initialization.ipynb) for an example of a search endpoints config) or set when creating a KnowledgeGraphForge session using the 'searchendpoints' argument.
#
# The search endpoint to hit when calling forge.search(...) is 'sparql' by default but can be specified using the 'search_endpoint' argument.
#
# The data available through these search endpoints is limited to data indexed in specific indices that are configured through views. The project is set up with a default ElasticSearch view and a default SPARQL view, each indexing all data.
# You may define a view that targets a subset of the data based on some filters.
# #### SPARQL Search Endpoint
# In[63]:
# Search for resources of type Person
filters = {"type": "Person"}
resources = forge.search(filters, limit=3, search_endpoint='sparql')
# In[64]:
type(resources)
# In[65]:
len(resources)
# In[66]:
forge.as_dataframe(resources, store_metadata=True)
# #### ElasticSearch Endpoint
# In[67]:
# Search for resources of type Person and retrieve only their ids and types.
filters = {"@type": "http://schema.org/Person"}
resources = forge.search(
    filters, limit=3, search_endpoint='elastic', includes=["@id", "@type"]
)  # fields can also be excluded with 'excludes'
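# As noted above, fields can be excluded instead of included. A minimal sketch dropping the "@type" field from the returned documents:
resources_without_type = forge.search(
    filters, limit=3, search_endpoint='elastic', excludes=["@type"]
)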
# In[68]:
type(resources)
# In[69]:
len(resources)
# In[70]:
forge.as_dataframe(resources, store_metadata=True)
# In[71]:
# Search results are not synchronized
resources[0]._synchronized
# In[72]:
resources[0].id
# In[73]:
resources[0].type
# #### ElasticSearch Endpoint - Alternative view
# - Create a view that only indexes resources of type http://schema.org/Person
# In[74]:
import requests
payload = {
    "@type": ["View", "ElasticSearchView"],
    "pipeline": [{
        "name": "filterByType",
        "config": {"types": ["http://schema.org/Person"]}
    }],
    "mapping": {
        "dynamic": True,
        "properties": {
            "@id": {
                "type": "keyword"
            },
            "@type": {
                "type": "keyword"
            }
        }
    }
}
url = f"{forge._store.endpoint}/views/{forge._store.bucket}"
headers = {
    "mode": "cors",
    "Content-Type": "application/json",
    "Accept": "application/ld+json, application/json",
    "Authorization": "Bearer " + forge._store.token
}
response = requests.post(url=url, headers=headers, json=payload)
# In[75]:
view_id = response.json()["@id"]
view_id
# - Query the view (may take a few seconds to get a response due to indexing time)
# In[76]:
result_1 = forge.search({"@type": "Dataset"}, search_endpoint='elastic', view=view_id, cross_bucket=True, debug=True)
result_2 = forge.search({"@type": "Person"}, search_endpoint='elastic', view=view_id, cross_bucket=True, debug=True)
len(result_1), len(result_2)
# Providing a view is a feature that is also available through the forge.elastic and forge.sparql calls.
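# For example, the view created above could be queried directly with a plain ElasticSearch DSL query through forge.elastic. A minimal sketch (indexing may still be in progress, so the result can be empty at first):
person_query = """{"query": {"term": {"@type": "http://schema.org/Person"}}}"""
persons_from_view = forge.elastic(person_query, view=view_id, limit=3)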
# ### Crossbucket search
# It is possible to search for resources stored in buckets different from the configured one. The configured store should of course support it.
# In[77]:
resources = forge.search(p.type.id == "Association", limit=3, cross_bucket=True) # cross_bucket defaults to False
# In[78]:
type(resources)
# In[79]:
len(resources)
# In[80]:
forge.as_dataframe(resources)
# In[81]:
# Furthermore, it is possible to filter by bucket when cross_bucket is set to True. Setting a bucket value when cross_bucket is False will trigger a not_supported exception (a sketch of the failing case is shown below).
resources = forge.search(p.type.id == "Person", limit=3, cross_bucket=True, bucket="dke/kgforge") # add a bucket
# In[82]:
type(resources)
# In[83]:
len(resources)
# In[84]:
forge.as_dataframe(resources)
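# As mentioned above, providing a bucket while cross_bucket is False is not supported. A minimal sketch of the failing case (the exact exception type depends on the configured store):
try:
    forge.search(p.type.id == "Person", limit=3, cross_bucket=False, bucket="dke/kgforge")
except Exception as error:
    print(error)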
# ### Searching original source
# When using BlueBrainNexusStore, it is possible to retrieve resources' payloads as they were registered (retrieve_source=True), without any changes related to store-added metadata or JSON-LD framing.
# In[85]:
resources = forge.search(p.type.id == "Association", limit=3, retrieve_source=False) # retrieve_source defaults to True
# In[86]:
type(resources)
# In[87]:
len(resources)
# In[88]:
forge.as_dataframe(resources)
# ## Graph traversing
#
# SPARQL is used as a query language to perform graph traversing.
#
# Nexus Forge implements a SPARQL query rewriting strategy leveraging a configured RDFModel that lets users write SPARQL queries without adding prefix declarations, prefix names or long IRIs. With this strategy, only type and property names can be provided.
#
# Please refer to the [Modeling.ipynb](11%20-%20Modeling.ipynb) notebook to learn about templates.
# Note: DemoStore doesn't implement SPARQL operations yet. Please use another store for this section.
# Note: DemoModel and RdfModel schemas have not been synchronized yet. This section is to be run with RdfModel.
# In[89]:
jane = Resource(type="Person", name="Jane Doe")
contribution_jane = Resource(type="Contribution", agent=jane)
# In[90]:
john = Resource(type="Person", name="John Smith")
contribution_john = Resource(type="Contribution", agent=john)
# In[91]:
association = Resource(type="Dataset", contribution=[contribution_jane, contribution_john])
# In[92]:
forge.register(association)
# In[93]:
forge.template("Dataset") # Templates help know which property to use when writing a query to serach for a given type
# ### Prefix and namespace free SPARQL query
#
# When a forge RDFModel is configured, then there is no need to provide prefixes and namespaces when writing a SPARQL query. Prefixes and namespaces will be automatically inferred from the provided schemas and/or JSON-LD context and the query rewritten accordingly.
# In[94]:
query = """
SELECT ?id ?name ?contributor
WHERE {
?id a Dataset ;
contribution/agent ?contributor.
?contributor name ?name.
}
"""
# In[95]:
resources = forge.sparql(query, limit=3)
# In[96]:
type(resources)
# In[97]:
len(resources)
# In[98]:
print(resources[0])
# In[99]:
forge.as_dataframe(resources)
# ### display rewritten SPARQL query
# In[100]:
resources = forge.sparql(query, limit=3, debug=True)
# ### Full SPARQL query
#
# A regular SPARQL query can also be provided. When provided, the limit and offset arguments supersede any in-query LIMIT or OFFSET values.
# In[101]:
query = """
PREFIX dc: <http://purl.org/dc/elements/1.1/>
PREFIX dcat: <http://www.w3.org/ns/dcat#>
PREFIX dcterms: <http://purl.org/dc/terms/>
PREFIX mba: <http://api.brain-map.org/api/v2/data/Structure/>
PREFIX nsg: <https://neuroshapes.org/>
PREFIX owl: <http://www.w3.org/2002/07/owl#>
PREFIX prov: <http://www.w3.org/ns/prov#>
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX schema: <http://schema.org/>
PREFIX sh: <http://www.w3.org/ns/shacl#>
PREFIX shsh: <http://www.w3.org/ns/shacl-shacl#>
PREFIX skos: <http://www.w3.org/2004/02/skos/core#>
PREFIX vann: <http://purl.org/vocab/vann/>
PREFIX void: <http://rdfs.org/ns/void#>
PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
PREFIX : <https://neuroshapes.org/>
SELECT ?id ?name
WHERE {
?id a schema:Dataset ;
nsg:contribution/prov:agent ?contributor.
?contributor schema:name ?name.
}
ORDER BY ?id
LIMIT 1
OFFSET 0
"""
# In[102]:
# it is recommended to set 'rewrite' to 'False' to prevent the sparql query rewriting when a syntactically correct SPARQL query is provided.
resources = forge.sparql(query, rewrite=False, limit=3, offset=1, debug=True)
# In[103]:
type(resources)
# In[104]:
len(resources)
# In[105]:
type(resources[0])
# In[106]:
forge.as_dataframe(resources)
# ### Avoid rewriting the query
# To avoid assigning any limit or offset, one can pass `None` for those parameters.
# In[107]:
query_without_limit = """
PREFIX dc: <http://purl.org/dc/elements/1.1/>
PREFIX dcat: <http://www.w3.org/ns/dcat#>
PREFIX dcterms: <http://purl.org/dc/terms/>
PREFIX mba: <http://api.brain-map.org/api/v2/data/Structure/>
PREFIX nsg: <https://neuroshapes.org/>
PREFIX owl: <http://www.w3.org/2002/07/owl#>
PREFIX prov: <http://www.w3.org/ns/prov#>
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX schema: <http://schema.org/>
PREFIX sh: <http://www.w3.org/ns/shacl#>
PREFIX shsh: <http://www.w3.org/ns/shacl-shacl#>
PREFIX skos: <http://www.w3.org/2004/02/skos/core#>
PREFIX vann: <http://purl.org/vocab/vann/>
PREFIX void: <http://rdfs.org/ns/void#>
PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
PREFIX : <https://neuroshapes.org/>
SELECT ?id ?name
WHERE {
?id a schema:Dataset ;
nsg:contribution/prov:agent ?contributor.
?contributor schema:name ?name.
}
ORDER BY ?id
"""
# In[108]:
resources = forge.sparql(query_without_limit, rewrite=False, limit=None, offset=None, debug=True)
# In[109]:
len(resources)
# If you only want to add the context, but keep the content of the query the same, you need to set the `rewrite` parameter to `False`
# In[110]:
query_without_context = """
SELECT ?id ?name ?contributor
WHERE {
?id a Dataset ;
contribution/agent ?contributor.
?contributor name ?name.
}
"""
# In[111]:
resources = forge.sparql(query_without_context, limit=None, debug=True)
# In[112]:
len(resources)
# ## ElasticSearch DSL Query
#
# ElasticSearch DSL can be used as a query language to search for resources, provided that the configured store supports it. The 'BlueBrainNexusStore' supports ElasticSearch.
# Note: DemoStore doesn't implement ElasticSearch DSL operations.
# In[113]:
jane = Resource(type="Person", name="Jane Doe")
contribution_jane = Resource(type="Contribution", agent=jane)
# In[114]:
john = Resource(type="Person", name="John Smith")
contribution_john = Resource(type="Contribution", agent=john)
# In[115]:
association = Resource(type="Dataset", contribution=[contribution_jane, contribution_john])
# In[116]:
forge.register(association)
# ### Plain ElasticSearch DSL
# In[117]:
query = """
{
"_source": {
"includes": [
"@id",
"name"
]
},
"query": {
"term": {
"@type": "http://schema.org/Dataset"
}
}
}
"""
# In[118]:
# limit and offset (when provided in this method call) supersede the 'size' and 'from' values provided in the query
resources = forge.elastic(query, limit=3)
# In[119]:
type(resources)
# In[120]:
len(resources)
# In[121]:
type(resources[0])
# In[122]:
forge.as_dataframe(resources)
# ### ElasticSearch results as dictionaries
# In[136]:
resources_2 = forge.elastic(query, limit=3, as_resource=False)
# In[137]:
type(resources_2[0])
# In[138]:
resources_2[0]
# ## Downloading
# Note: DemoStore doesn't implement file operations yet. Please use another store for this section.
# In[123]:
jane = Resource(type="Person", name="Jane Doe")
# In[124]:
get_ipython().system(' ls -p ../../data | egrep -v /$')
# In[125]:
distribution = forge.attach("../../data")
# In[126]:
association = Resource(type="Association", agent=jane, distribution=distribution)
# In[127]:
forge.register(association)
# In[128]:
# By default, the downloadable file urls are collected from the json path "distribution.contentUrl" (follow="distribution.contentUrl") and
# the files are downloaded in the current path (path=".").
# The argument overwrite: bool can be provided to decide whether to overwrite (True) existing files with the same name or
# to create new ones (False) with their names suffixed with a timestamp (a sketch using overwrite is shown further below).
# A cross_bucket argument can be provided to download data from the configured bucket (cross_bucket=False - the default value)
# or from a bucket different from the configured one (cross_bucket=True). The configured store should support crossing buckets for this to work.
forge.download(association)
# In[129]:
# Specific content type can be downloaded.
forge.download(association, content_type="text/tab-separated-values")
# In[130]:
# The urls or the files to download can be collected from a different json path (by setting a value for "follow") and
# the files downloaded to a different path (by setting a value for "path")
forge.download(association, follow="distribution.contentUrl", path="./downloaded/")
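# The overwrite behaviour mentioned earlier can be illustrated by downloading the same files again into the same folder. A minimal sketch (with overwrite=True, existing files are replaced instead of new timestamped copies being created):
forge.download(association, follow="distribution.contentUrl", path="./downloaded/", overwrite=True)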
# In[131]:
get_ipython().system(' ls -l ./downloaded/')
# In[132]:
#! rm -R ./downloaded/