This notebook demonstrates Nexus Forge data querying features.
from kgforge.core import KnowledgeGraphForge
A configuration file is needed in order to create a KnowledgeGraphForge session. A configuration can be generated using the notebook 00-Initialization.ipynb.
# Create a forge session from the YAML configuration produced by 00-Initialization.ipynb.
forge = KnowledgeGraphForge("../../configurations/forge.yml")
--------------------------------------------------------------------------- Exception Traceback (most recent call last) Cell In[2], line 1 ----> 1 forge = KnowledgeGraphForge("../../configurations/forge.yml") File ~/work_dir/nexus-forge/venv/lib/python3.8/site-packages/kgforge/core/forge.py:249, in KnowledgeGraphForge.__init__(self, configuration, **kwargs) 246 resolvers_config = config.pop("Resolvers", None) 247 # Format: Optional[Dict[scope_name, Dict[resolver_name, Resolver]]]. 248 self._resolvers: Optional[Dict[str, Dict[str, Resolver]]] = ( --> 249 prepare_resolvers(resolvers_config, store_config) 250 if resolvers_config 251 else None 252 ) 254 # Formatters. 255 self._formatters: Optional[Dict[str, str]] = config.pop("Formatters", None) File ~/work_dir/nexus-forge/venv/lib/python3.8/site-packages/kgforge/core/forge.py:958, in prepare_resolvers(config, store_config) 955 def prepare_resolvers( 956 config: Dict, store_config: Dict 957 ) -> Dict[str, Dict[str, Resolver]]: --> 958 return { 959 scope: dict(prepare_resolver(x, store_config) for x in configs) 960 for scope, configs in config.items() 961 } File ~/work_dir/nexus-forge/venv/lib/python3.8/site-packages/kgforge/core/forge.py:959, in <dictcomp>(.0) 955 def prepare_resolvers( 956 config: Dict, store_config: Dict 957 ) -> Dict[str, Dict[str, Resolver]]: 958 return { --> 959 scope: dict(prepare_resolver(x, store_config) for x in configs) 960 for scope, configs in config.items() 961 } File ~/work_dir/nexus-forge/venv/lib/python3.8/site-packages/kgforge/core/forge.py:959, in <genexpr>(.0) 955 def prepare_resolvers( 956 config: Dict, store_config: Dict 957 ) -> Dict[str, Dict[str, Resolver]]: 958 return { --> 959 scope: dict(prepare_resolver(x, store_config) for x in configs) 960 for scope, configs in config.items() 961 } File ~/work_dir/nexus-forge/venv/lib/python3.8/site-packages/kgforge/core/forge.py:982, in prepare_resolver(config, store_config) 980 resolver_name = config.pop("resolver") 981 resolver = 
import_class(resolver_name, "resolvers") --> 982 return resolver.__name__, resolver(**config) File ~/work_dir/nexus-forge/venv/lib/python3.8/site-packages/kgforge/specializations/resolvers/demo_resolver.py:33, in DemoResolver.__init__(self, source, targets, result_resource_mapping, **source_config) 31 def __init__(self, source: str, targets: List[Dict[str, Any]], result_resource_mapping: str, 32 **source_config) -> None: ---> 33 super().__init__(source, targets, result_resource_mapping, **source_config) File ~/work_dir/nexus-forge/venv/lib/python3.8/site-packages/kgforge/core/archetypes/resolver.py:53, in Resolver.__init__(self, source, targets, result_resource_mapping, **source_config) 51 filters = None 52 self.targets[target["identifier"]] = {"bucket": target["bucket"], "filters": filters} ---> 53 self.result_mapping: Any = self.mapping.load(result_resource_mapping) 54 self.service: Any = self._initialize_service(self.source, self.targets, **source_config) File ~/work_dir/nexus-forge/venv/lib/python3.8/site-packages/kgforge/core/archetypes/mapping.py:69, in Mapping.load(cls, source, mapping_type) 67 if e is not None: 68 return e ---> 69 raise Exception("Mapping loading failed") 71 if mapping_type == MappingType.FILE: 72 return cls.load_file(source) Exception: Mapping loading failed
from kgforge.core import Resource
from kgforge.specializations.resources import Dataset
from kgforge.core.wrappings.paths import Filter, FilterOperator
# Register a resource, retrieve it back by id, and check round-trip equality.
jane = Resource(type="Person", name="Jane Doe", award=["Nobel"])
forge.register(jane)
resource = forge.retrieve(jane.id)
resource == jane
# Register a resource, tag the registered state as "v1", then modify and update it
# (the update produces a new revision in the store).
jane = Resource(type="Person", name="Jane Doe", award=["Nobel"])
forge.register(jane)
forge.tag(jane, "v1")
jane.email = ["jane.doe@epfl.ch", "jane.doe@example.org"]
forge.update(jane)
# Print the current revision from the resource's store metadata. The attribute
# name depends on the configured store: DemoStore exposes `version`, while
# BlueBrainNexus exposes `_rev`. Catch only AttributeError (the missing-attribute
# case) instead of a bare `except:`, which would also hide unrelated errors
# such as KeyboardInterrupt.
try:
    # DemoStore
    print(jane._store_metadata.version)
except AttributeError:
    # BlueBrainNexus
    print(jane._store_metadata._rev)
# Retrieve specific versions of a resource: by revision number, by tag,
# and via a "?rev=" query string appended to the id.
jane_v1 = forge.retrieve(jane.id, version=1)
jane_v1_tag = forge.retrieve(jane.id, version="v1")
jane_v1_rev = forge.retrieve(jane.id+"?rev=1")
# All three forms resolve to the same first revision, which differs from the
# current (updated) resource.
jane_v1 == jane_v1_tag
jane_v1 == jane_v1_rev
jane_v1 != jane
# Print the revision of the retrieved first version. As above, the metadata
# attribute name is store-specific (DemoStore: `version`, BlueBrainNexus: `_rev`),
# so catch only the AttributeError raised when the attribute is absent rather
# than using a bare `except:`.
try:
    # DemoStore
    print(jane_v1._store_metadata.version)
except AttributeError:
    # BlueBrainNexus
    print(jane_v1._store_metadata._rev)
It is possible to retrieve resources stored in buckets different than the configured one. The configured store should of course support it.
# Retrieve across buckets, then inspect the store metadata, the last action
# performed on the resource, and its synchronization state.
resource = forge.retrieve(jane.id, cross_bucket=True) # cross_bucket defaults to False
resource._store_metadata
resource._last_action
resource._synchronized
One can also use the value of _self
from ._store_metadata to retrieve a resource
import copy
# Duplicate the resource under a new id and register it, then retrieve it twice:
# once via the "_self" URL found in its store metadata, once via its id.
other_resource = copy.deepcopy(resource)
other_resource.id = "https://myincreadibleid-987654321"
forge.register(other_resource)
url = other_resource._store_metadata['_self']
same_resource_url = forge.retrieve(id=url)
same_resource_id = forge.retrieve(id=other_resource.id)
Confirm they are the same
# Both retrieval routes (by _self URL and by id) yield an equal resource.
same_resource_id == same_resource_url
When using BlueBrainNexusStore, it is possible to retrieve resources' payload as they were registered (retrieve_source=True) without any changes related to store added metadata or JSONLD framing.
# Retrieve the store-framed payload instead of the original source payload.
resource = forge.retrieve(jane.id, retrieve_source=False) # retrieve_source defaults to True
forge.as_json(resource)
resource._store_metadata
resource._last_action
resource._synchronized
# Retrieving an unknown id returns None rather than raising.
resource = forge.retrieve("123")
resource is None
Note: DemoModel and RdfModel schemas have not been synchronized yet. This section is to be run with RdfModel. Commented lines are for DemoModel.
# Build a Dataset with two Contribution resources and a file distribution,
# register it, and view its JSON representation.
jane = Resource(type="Person", name="Jane Doe")
contribution_jane = Resource(type="Contribution", agent=jane)
john = Resource(type="Person", name="John Smith")
contribution_john = Resource(type="Contribution", agent=john)
dataset = Dataset(forge, type="Dataset", contribution=[contribution_jane, contribution_john])
dataset.add_distribution("../../data/associations.tsv")
forge.register(dataset)
forge.as_json(dataset)
The paths
method loads the template or property paths (i.e. expected properties) for a given type.
Please refer to the Modeling.ipynb notebook to learn about templates and types.
# Load the property paths for the "Dataset" type; `p` supports attribute
# autocompletion and is used below to build search filters.
p = forge.paths("Dataset")
Autocompletion is enabled on p
and this can be used to create search filters.
Note: There is a known issue for RdfModel which requires using p.type.id
instead of p.type
.
All python comparison operators are supported.
# Search with a path-based filter; comparison operators on `p` build the filter.
resources = forge.search(p.type.id=="Person", limit=3)
type(resources)
len(resources)
forge.as_dataframe(resources)
# store_metadata=True adds the store metadata columns to the dataframe.
forge.as_dataframe(resources, store_metadata=True)
# Search results are not synchronized
resources[0]._synchronized
Property autocompletion is available on a path p
even for nested properties like p.contribution
.
# Search for resources of type Dataset and with text/tab-separated-values as distribution.encodingFormat
# Multiple filters passed as positional arguments are combined (all must hold).
resources = forge.search(p.type.id == "Dataset", p.distribution.encodingFormat == "text/tab-separated-values", limit=3)
len(resources)
forge.as_dataframe(resources)
A dictionary can be provided for filters:
This feature is not supported when using the DemoStore
# Search for resources of type Dataset and with text/tab-separated-values as distribution.encodingFormat
# and created a given dateTime (by default, dateTime values should be signaled by the suffix "^^xsd:dateTime")
filters = {
"type": "Dataset",
"distribution":{"encodingFormat":"text/tab-separated-values"},
"_createdAt":dataset._store_metadata._createdAt+"^^xsd:dateTime"
}
resources = forge.search(filters, limit=3)
type(resources)
len(resources)
forge.as_dataframe(resources, store_metadata=True)
# These are equivalent to the Python comparison operators
["{} ({})".format(op.value, op.name) for op in FilterOperator]
# Search for resources of type Dataset and with text/tab-separated-values as distribution.encodingFormat
# and created a given dateTime (dateTime values should be signaled by the suffix "^^xsd:dateTime")
filter_1 = Filter(operator=FilterOperator.EQUAL, path=["type"], value="Dataset")
filter_2 = Filter(operator=FilterOperator.EQUAL, path=["distribution","encodingFormat"], value="text/tab-separated-values")
# NOTE(review): the mixed-case member name LOWER_OR_Equal_Than appears to match the
# FilterOperator enum as declared in kgforge — confirm against the installed version.
filter_3 = Filter(operator=FilterOperator.LOWER_OR_Equal_Than, path=["_createdAt"], value=dataset._store_metadata._createdAt+"^^xsd:dateTime")
resources = forge.search(filter_1, filter_2, filter_3, limit=3)
type(resources)
len(resources)
forge.as_dataframe(resources, store_metadata=True)
Two types of search endpoints are supported: 'sparql' (default) for graph queries and 'elastic' for document oriented queries. The types of available search endpoint can be configured (see 00-Initialization.ipynb for an example of search endpoints config) or set when creating a KnowledgeGraphForge session using the 'searchendpoints' arguments.
The search endpoint to hit when calling forge.search(...) is 'sparql' by default but can be specified using the 'search_endpoint' argument.
# Search for resources of type Person
# Explicitly target the 'sparql' search endpoint (also the default).
filters = {"type": "Person"}
resources = forge.search(filters, limit=3, search_endpoint='sparql')
type(resources)
len(resources)
forge.as_dataframe(resources, store_metadata=True)
# Search for resources of type Person and retrieve their ids and names.
# The 'elastic' endpoint takes expanded JSON-LD keys ("@type") and full IRIs.
filters = {"@type": "http://schema.org/Person"}
resources = forge.search(filters, limit=3,
search_endpoint='elastic',
includes=["@id", "@type"]) # fields can also be excluded with 'excludes'
type(resources)
len(resources)
forge.as_dataframe(resources, store_metadata=True)
# Search results are not synchronized
resources[0]._synchronized
resources[0].id
resources[0].type
It is possible to search for resources stored in buckets different than the configured one. The configured store should of course support it.
# Search across buckets; the configured store must support it.
resources = forge.search(p.type.id == "Association", limit=3, cross_bucket=True) # cross_bucket defaults to False
type(resources)
len(resources)
forge.as_dataframe(resources)
#Furthermore it is possible to filter by bucket when cross_bucket is set to True. Setting a bucket value when cross_bucket is False will trigger a not_supported exception.
resources = forge.search(p.type.id == "Person", limit=3, cross_bucket=True, bucket="dke/kgforge") # add a bucket
type(resources)
len(resources)
forge.as_dataframe(resources)
When using BlueBrainNexusStore, it is possible to retrieve resources' payload as they were registered (retrieve_source=True) without any changes related to store added metadata or JSONLD framing.
# Search returning store-framed payloads instead of the original source payloads.
resources = forge.search(p.type.id == "Association", limit=3, retrieve_source=False) # retrieve_source defaults to True
type(resources)
len(resources)
forge.as_dataframe(resources)
SPARQL is used as a query language to perform graph traversing.
Nexus Forge implements a SPARQL query rewriting strategy leveraging a configured RDFModel that lets users write SPARQL queries without adding prefix declarations, prefix names or long IRIs. With this strategy, only type and property names can be provided.
Please refer to the Modeling.ipynb notebook to learn about templates.
Note: DemoStore doesn't implement SPARQL operations yet. Please use another store for this section.
Note: DemoModel and RdfModel schemas have not been synchronized yet. This section is to be run with RdfModel.
# Register a Dataset with two contributions to have data to query with SPARQL.
jane = Resource(type="Person", name="Jane Doe")
contribution_jane = Resource(type="Contribution", agent=jane)
john = Resource(type="Person", name="John Smith")
contribution_john = Resource(type="Contribution", agent=john)
association = Resource(type="Dataset", contribution=[contribution_jane, contribution_john])
forge.register(association)
forge.template("Dataset") # Templates help to know which property to use when writing a query to search for a given type
When a forge RDFModel is configured, then there is no need to provide prefixes and namespaces when writing a SPARQL query. Prefixes and namespaces will be automatically inferred from the provided schemas and/or JSON-LD context and the query rewritten accordingly.
# Simplified SPARQL: no PREFIX declarations or IRIs — the configured RDFModel
# rewrites the type and property names before the query is sent to the store.
query = """
SELECT ?id ?name ?contributor
WHERE {
?id a Dataset ;
contribution/agent ?contributor.
?contributor name ?name.
}
"""
resources = forge.sparql(query, limit=3)
type(resources)
len(resources)
print(resources[0])
forge.as_dataframe(resources)
# debug=True additionally displays the query that is actually executed.
resources = forge.sparql(query, limit=3, debug=True)
Regular SPARQL queries can also be provided. When provided, the limit and offset arguments supersede any in-query limit or offset values.
# A fully-prefixed, regular SPARQL query — no rewriting needed.
query = """
PREFIX dc: <http://purl.org/dc/elements/1.1/>
PREFIX dcat: <http://www.w3.org/ns/dcat#>
PREFIX dcterms: <http://purl.org/dc/terms/>
PREFIX mba: <http://api.brain-map.org/api/v2/data/Structure/>
PREFIX nsg: <https://neuroshapes.org/>
PREFIX owl: <http://www.w3.org/2002/07/owl#>
PREFIX prov: <http://www.w3.org/ns/prov#>
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX schema: <http://schema.org/>
PREFIX sh: <http://www.w3.org/ns/shacl#>
PREFIX shsh: <http://www.w3.org/ns/shacl-shacl#>
PREFIX skos: <http://www.w3.org/2004/02/skos/core#>
PREFIX vann: <http://purl.org/vocab/vann/>
PREFIX void: <http://rdfs.org/ns/void#>
PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
PREFIX : <https://neuroshapes.org/>
SELECT ?id ?name
WHERE {
?id a schema:Dataset ;
nsg:contribution/prov:agent ?contributor.
?contributor schema:name ?name.
}
ORDER BY ?id
LIMIT 1
OFFSET 0
"""
# it is recommended to set 'rewrite' to 'False' to prevent the sparql query rewriting when a syntactically correct SPARQL query is provided.
# The limit/offset arguments here override the LIMIT 1 / OFFSET 0 in the query.
resources = forge.sparql(query, rewrite=False, limit=3, offset=1, debug=True)
type(resources)
len(resources)
type(resources[0])
forge.as_dataframe(resources)
To not assign any limit or offset, one can pass None
to those parameters
# Passing limit=None and offset=None leaves the query unconstrained.
query_without_limit = """
PREFIX dc: <http://purl.org/dc/elements/1.1/>
PREFIX dcat: <http://www.w3.org/ns/dcat#>
PREFIX dcterms: <http://purl.org/dc/terms/>
PREFIX mba: <http://api.brain-map.org/api/v2/data/Structure/>
PREFIX nsg: <https://neuroshapes.org/>
PREFIX owl: <http://www.w3.org/2002/07/owl#>
PREFIX prov: <http://www.w3.org/ns/prov#>
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX schema: <http://schema.org/>
PREFIX sh: <http://www.w3.org/ns/shacl#>
PREFIX shsh: <http://www.w3.org/ns/shacl-shacl#>
PREFIX skos: <http://www.w3.org/2004/02/skos/core#>
PREFIX vann: <http://purl.org/vocab/vann/>
PREFIX void: <http://rdfs.org/ns/void#>
PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
PREFIX : <https://neuroshapes.org/>
SELECT ?id ?name
WHERE {
?id a schema:Dataset ;
nsg:contribution/prov:agent ?contributor.
?contributor schema:name ?name.
}
ORDER BY ?id
"""
resources = forge.sparql(query_without_limit, rewrite=False, limit=None, offset=None, debug=True)
len(resources)
If you only want to add the context, but keep the content of the query the same, you need to set the rewrite
parameter to False
# Same simplified query form as above; run with the default rewriting enabled.
query_without_context = """
SELECT ?id ?name ?contributor
WHERE {
?id a Dataset ;
contribution/agent ?contributor.
?contributor name ?name.
}
"""
resources = forge.sparql(query_without_context, limit=None, debug=True)
len(resources)
ElasticSearch DSL can be used as a query language search for resources provided that the configured store supports it. The 'BlueBrainNexusStore' supports ElasticSearch.
Note: DemoStore doesn't implement ElasticSearch DSL operations.
# Register a Dataset with two contributions to have data for the ElasticSearch query.
jane = Resource(type="Person", name="Jane Doe")
contribution_jane = Resource(type="Contribution", agent=jane)
john = Resource(type="Person", name="John Smith")
contribution_john = Resource(type="Contribution", agent=john)
association = Resource(type="Dataset", contribution=[contribution_jane, contribution_john])
forge.register(association)
# ElasticSearch DSL query: match documents by expanded @type and project only @id and name.
query = """
{
"_source": {
"includes": [
"@id",
"name"
]
},
"query": {
"term": {
"@type": "http://schema.org/Dataset"
}
}
}
"""
# limit and offset (when provided in this method call) supersede 'size' and 'from' values provided in the query
resources = forge.elastic(query, limit=3)
type(resources)
len(resources)
type(resources[0])
forge.as_dataframe(resources)
Note: DemoStore doesn't implement file operations yet. Please use another store for this section.
# Attach a directory of files to a resource, register it, and download the files back.
jane = Resource(type="Person", name="Jane Doe")
# NOTE: lines starting with "!" are IPython shell magics; they only run in a notebook.
! ls -p ../../data | egrep -v /$
distribution = forge.attach("../../data")
association = Resource(type="Association", agent=jane, distribution=distribution)
forge.register(association)
# By default, the downloadable file urls are collected from the json path "distribution.contentUrl" (follow="distribution.contentUrl") and
# the files are downloaded in the current path (path=".").
# The argument overwrite: bool can be provided to decide whether to overwrite (True) existing files with the same name or
# to create new ones (False) with their names suffixed with a timestamp.
# A cross_bucket argument can be provided to download data from the configured bucket (cross_bucket=False - the default value)
# or from a bucket different than the configured one (cross_bucket=True). The configured store should support crossing buckets for this to work.
forge.download(association)
# Specific content type can be downloaded.
forge.download(association, content_type="text/tab-separated-values")
# The urls or the files to download can be collected from a different json path (by setting a value for "follow") and
# the files downloaded to a different path (by setting a value for "path")
forge.download(association, follow="distribution.contentUrl", path="./downloaded/")
! ls -l ./downloaded/
#! rm -R ./downloaded/