#!/usr/bin/env python
# coding: utf-8

# # Querying
#
# This notebook demonstrates Nexus Forge data [querying features](https://nexus-forge.readthedocs.io/en/latest/interaction.html#querying).

# In[1]:

from kgforge.core import KnowledgeGraphForge

# A configuration file is needed in order to create a KnowledgeGraphForge session. A configuration can be generated using the notebook [00-Initialization.ipynb](00%20-%20Initialization.ipynb).

# In[2]:

forge = KnowledgeGraphForge("../../configurations/forge.yml")

# ## Imports

# In[3]:

from kgforge.core import Resource
from kgforge.specializations.resources import Dataset
from kgforge.core.wrappings.paths import Filter, FilterOperator

# ## Retrieval

# ### latest version

# In[4]:

jane = Resource(type="Person", name="Jane Doe", award=["Nobel"])

# In[5]:

forge.register(jane)

# In[6]:

resource = forge.retrieve(jane.id)

# In[7]:

resource == jane

# ### specific version

# In[8]:

jane = Resource(type="Person", name="Jane Doe", award=["Nobel"])

# In[9]:

forge.register(jane)

# In[10]:

forge.tag(jane, "v1")

# In[11]:

jane.email = ["jane.doe@epfl.ch", "jane.doe@example.org"]

# In[12]:

forge.update(jane)

# In[13]:

try:
    # DemoStore
    print(jane._store_metadata.version)
except:
    # BlueBrainNexus
    print(jane._store_metadata._rev)

# In[14]:

jane_v1 = forge.retrieve(jane.id, version=1)

# In[15]:

jane_v1_tag = forge.retrieve(jane.id, version="v1")

# In[16]:

jane_v1_rev = forge.retrieve(jane.id + "?rev=1")

# In[17]:

jane_v1 == jane_v1_tag

# In[18]:

jane_v1 == jane_v1_rev

# In[19]:

jane_v1 != jane

# In[20]:

try:
    # DemoStore
    print(jane_v1._store_metadata.version)
except:
    # BlueBrainNexus
    print(jane_v1._store_metadata._rev)

# ### crossbucket retrieval

# It is possible to retrieve resources stored in buckets different from the configured one. The configured store should of course support it.

# In[21]:

resource = forge.retrieve(jane.id, cross_bucket=True)  # cross_bucket defaults to False

# In[22]:

resource._store_metadata

# In[23]:

resource._last_action

# In[24]:

resource._synchronized

# ### Retrieving using the resource url

# One can also use the value of `_self` from `._store_metadata` to retrieve a resource.

# In[25]:

import copy
import string
import random

# In[26]:

other_resource = copy.deepcopy(resource)

# In[27]:

other_resource.id = f"https://my-incredible-id-{''.join(random.choices(string.digits, k=5))}"

# In[28]:

forge.register(other_resource)

# In[29]:

url = other_resource._store_metadata['_self']

# In[30]:

same_resource_url = forge.retrieve(id=url)
same_resource_id = forge.retrieve(id=other_resource.id)

# Confirm they are the same

# In[31]:

same_resource_id == same_resource_url

# ### Original source retrieval

# When using the BlueBrainNexusStore, it is possible to retrieve resources' payloads as they were registered (retrieve_source=True), without any changes related to store-added metadata or JSON-LD framing.

# In[32]:

resource = forge.retrieve(jane.id, retrieve_source=False)  # retrieve_source defaults to True

# In[33]:

forge.as_json(resource)

# In[34]:

resource._store_metadata

# In[35]:

resource._last_action

# In[36]:

resource._synchronized

# ### error handling

# In[37]:

resource = forge.retrieve("123")

# In[38]:

resource is None
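# Since forge.retrieve returns None when an identifier cannot be resolved (as shown above), a guard is useful before using the result. A minimal, hedged sketch; the identifier below is hypothetical.

# In[ ]:

# Hedged sketch: guard against a missing resource before using it; the identifier is hypothetical.
maybe_resource = forge.retrieve("https://example.org/does-not-exist")
if maybe_resource is None:
    print("Resource not found")
else:
    print(maybe_resource.type)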
# ## Searching

# Note: DemoModel and RdfModel schemas have not been synchronized yet. This section is to be run with RdfModel. Commented lines are for DemoModel.

# In[39]:

jane = Resource(type="Person", name="Jane Doe")
contribution_jane = Resource(type="Contribution", agent=jane)

# In[40]:

john = Resource(type="Person", name="John Smith")
contribution_john = Resource(type="Contribution", agent=john)

# In[41]:

dataset = Dataset(forge, type="Dataset", contribution=[contribution_jane, contribution_john])
dataset.add_distribution("../../data/associations.tsv")

# In[42]:

forge.register(dataset)

# In[43]:

forge.as_json(dataset)

# ### Using resource paths as filters

# The `paths` method loads the template or property paths (i.e. expected properties) for a given type.
#
# Please refer to the [Modeling.ipynb](11%20-%20Modeling.ipynb) notebook to learn about templates and types.

# In[44]:

p = forge.paths("Dataset")

# Autocompletion is enabled on `p` and this can be used to create search filters.

# Note: There is a known issue for RdfModel which requires using `p.type.id` instead of `p.type`.

# All [python comparison operators](https://www.w3schools.com/python/gloss_python_comparison_operators.asp) are supported.

# In[45]:

resources = forge.search(p.type.id == "Person", limit=3)

# In[46]:

type(resources)

# In[47]:

len(resources)

# In[48]:

forge.as_dataframe(resources)

# In[49]:

forge.as_dataframe(resources, store_metadata=True)

# In[50]:

# Search results are not synchronized
resources[0]._synchronized

# #### Using nested resource property

# Property autocompletion is available on a path `p`, even for nested properties like `p.contribution`.

# In[51]:

# Search for resources of type Dataset and with text/tab-separated-values as distribution.encodingFormat
resources = forge.search(p.type.id == "Dataset", p.distribution.encodingFormat == "text/tab-separated-values", limit=3)

# In[52]:

len(resources)

# In[53]:

forge.as_dataframe(resources)
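# As noted above, operators other than '==' can also be used on the path object. A minimal, hedged sketch combining '==' and '!='; it assumes the Dataset template exposes a 'name' path and the name value is illustrative.

# In[ ]:

# Hedged sketch: search for persons whose name differs from a given value.
resources = forge.search(p.type.id == "Person", p.name != "John Smith", limit=3)
forge.as_dataframe(resources)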
# ### Using dictionaries as filters

# A dictionary can be provided for filters:
# * {'type': {'id':'Dataset'}} is equivalent to p.type.id=="Dataset"
# * only the '==' operator is supported
# * nested dicts are supported
# * it is not mandatory for the provided properties and values to be defined in the forge model. Results will be retrieved if there are corresponding data in the store.
#
# This feature is not supported when using the DemoStore.

# In[54]:

# Search for resources of type Dataset, with text/tab-separated-values as distribution.encodingFormat
# and created at a given dateTime (by default, dateTime values should be signaled by the suffix "^^xsd:dateTime")
filters = {
    "type": "Dataset",
    "distribution": {"encodingFormat": "text/tab-separated-values"},
    "_createdAt": dataset._store_metadata._createdAt + "^^xsd:dateTime"
}
resources = forge.search(filters, limit=3)

# In[55]:

type(resources)

# In[56]:

len(resources)

# In[57]:

forge.as_dataframe(resources, store_metadata=True)

# ### Using built-in Filter objects

# #### Supported filter operators

# In[58]:

[f"{op.value} ({op.name})" for op in FilterOperator]

# These are equivalent to the Python comparison operators.

# In[59]:

# Search for resources of type Dataset, with text/tab-separated-values as distribution.encodingFormat
# and created before or at a given dateTime (dateTime values should be signaled by the suffix "^^xsd:dateTime")
filter_1 = Filter(operator=FilterOperator.EQUAL, path=["type"], value="Dataset")
filter_2 = Filter(operator=FilterOperator.EQUAL, path=["distribution", "encodingFormat"], value="text/tab-separated-values")
filter_3 = Filter(operator=FilterOperator.LOWER_OR_Equal_Than, path=["_createdAt"], value=dataset._store_metadata._createdAt + "^^xsd:dateTime")
resources = forge.search(filter_1, filter_2, filter_3, limit=3)

# In[60]:

type(resources)

# In[61]:

len(resources)

# In[62]:

forge.as_dataframe(resources, store_metadata=True)

# ### Using search endpoints
#
# Two types of search endpoints are supported: 'sparql' (the default) for graph queries and 'elastic' for document-oriented queries. The available search endpoints can be configured (see [00-Initialization.ipynb](00%20-%20Initialization.ipynb) for an example of search endpoints config) or set when creating a KnowledgeGraphForge session using the 'searchendpoints' argument.
#
# The search endpoint to hit when calling forge.search(...) is 'sparql' by default but can be specified using the 'search_endpoint' argument.
#
# The data available through these search endpoints is limited to data indexed in specific indices that are configured through views. The project is set up with a default ElasticSearch view that indexes all data and a default SPARQL view that indexes all data.
# You may define a view that targets a subset of the data based on some filters.

# #### SPARQL Search Endpoint

# In[63]:

# Search for resources of type Person
filters = {"type": "Person"}
resources = forge.search(filters, limit=3, search_endpoint='sparql')

# In[64]:

type(resources)

# In[65]:

len(resources)

# In[66]:

forge.as_dataframe(resources, store_metadata=True)

# #### ElasticSearch Endpoint

# In[67]:

# Search for resources of type Person and retrieve their ids and types.
filters = {"@type": "http://schema.org/Person"} resources = forge.search( filters, limit=3, search_endpoint='elastic', includes=["@id", "@type"] ) # fields can also be excluded with 'excludes' # In[68]: type(resources) # In[69]: len(resources) # In[70]: forge.as_dataframe(resources, store_metadata=True) # In[71]: # Search results are not synchronized resources[0]._synchronized # In[72]: resources[0].id # In[73]: resources[0].type # #### ElasticSearch Endpoint - Alternative view # - Create a view that only indexes resources of type http://schema.org/Person # In[74]: import requests payload = { "@type": ["View", "ElasticSearchView"], "pipeline": [{ "name": "filterByType", "config": {"types": ["http://schema.org/Person"]} }], "mapping": { "dynamic": True, "properties": { "@id": { "type": "keyword" }, "@type": { "type": "keyword" } } } } url = f"{forge._store.endpoint}/views/{forge._store.bucket}" headers = { "mode": "cors", "Content-Type": "application/json", "Accept": "application/ld+json, application/json", "Authorization": "Bearer " + forge._store.token } response = requests.post(url=url, headers=headers, json=payload) # In[75]: view_id = response.json()["@id"] view_id # - Query the view (may take a few seconds to get a response due to indexing time) # In[76]: result_1 = forge.search({"@type": "Dataset"}, search_endpoint='elastic', view=view_id, cross_bucket=True, debug=True) result_2 = forge.search({"@type": "Person"}, search_endpoint='elastic', view=view_id, cross_bucket=True, debug=True) len(result_1), len(result_2) # Providing a view is feature that is also available through the forge.elastic, and forge.sparql calls # ### Crossbucket search # It is possible to search for resources stored in buckets different than the configured one. The configured store should of course support it. # In[77]: resources = forge.search(p.type.id == "Association", limit=3, cross_bucket=True) # cross_bucket defaults to False # In[78]: type(resources) # In[79]: len(resources) # In[80]: forge.as_dataframe(resources) # In[81]: #Furthermore it is possible to filter by bucket when cross_bucket is set to True. Setting a bucket value when cross_bucket is False will trigger a not_supported exception. resources = forge.search(p.type.id == "Person", limit=3, cross_bucket=True, bucket="dke/kgforge") # add a bucket # In[82]: type(resources) # In[83]: len(resources) # In[84]: forge.as_dataframe(resources) # ### Searching original source # When using BlueBrainNexusStore, it is possible to retrieve resources' payload as they were registered (retrieve_source=True) without any changes related to store added metadata or JSONLD framing. # In[85]: resources = forge.search(p.type.id == "Association", limit=3, retrieve_source=False) # retrieve_source defaults to True # In[86]: type(resources) # In[87]: len(resources) # In[88]: forge.as_dataframe(resources) # ## Graph traversing # # SPARQL is used as a query language to perform graph traversing. # # Nexus Forge implements a SPARQL query rewriting strategy leveraging a configured RDFModel that lets users write SPARQL queries without adding prefix declarations, prefix names or long IRIs. With this strategy, only type and property names can be provided. # # Please refer to the [Modeling.ipynb](11%20-%20Modeling.ipynb) notebook to learn about templates. # Note: DemoStore doesn't implement SPARQL operations yet. Please use another store for this section. # Note: DemoModel and RdfModel schemas have not been synchronized yet. This section is to be run with RdfModel. 
# ## Graph traversing
#
# SPARQL is used as a query language to perform graph traversing.
#
# Nexus Forge implements a SPARQL query rewriting strategy leveraging a configured RDFModel that lets users write SPARQL queries without adding prefix declarations, prefix names or long IRIs. With this strategy, only type and property names can be provided.
#
# Please refer to the [Modeling.ipynb](11%20-%20Modeling.ipynb) notebook to learn about templates.

# Note: DemoStore doesn't implement SPARQL operations yet. Please use another store for this section.

# Note: DemoModel and RdfModel schemas have not been synchronized yet. This section is to be run with RdfModel.

# In[89]:

jane = Resource(type="Person", name="Jane Doe")
contribution_jane = Resource(type="Contribution", agent=jane)

# In[90]:

john = Resource(type="Person", name="John Smith")
contribution_john = Resource(type="Contribution", agent=john)

# In[91]:

association = Resource(type="Dataset", contribution=[contribution_jane, contribution_john])

# In[92]:

forge.register(association)

# In[93]:

forge.template("Dataset")  # Templates help to know which properties to use when writing a query to search for a given type

# ### Prefix and namespace free SPARQL query
#
# When a forge RDFModel is configured, there is no need to provide prefixes and namespaces when writing a SPARQL query. Prefixes and namespaces will be automatically inferred from the provided schemas and/or JSON-LD context, and the query rewritten accordingly.

# In[94]:

query = """
    SELECT ?id ?name ?contributor
    WHERE {
        ?id a Dataset ;
            contribution/agent ?contributor.
        ?contributor name ?name.
    }
"""

# In[95]:

resources = forge.sparql(query, limit=3)

# In[96]:

type(resources)

# In[97]:

len(resources)

# In[98]:

print(resources[0])

# In[99]:

forge.as_dataframe(resources)

# ### display rewritten SPARQL query

# In[100]:

resources = forge.sparql(query, limit=3, debug=True)

# ### Full SPARQL query
#
# A regular SPARQL query can also be provided. When provided, the limit and offset arguments supersede any in-query LIMIT or OFFSET values.

# In[101]:

query = """
    PREFIX dc: <http://purl.org/dc/elements/1.1/>
    PREFIX dcat: <http://www.w3.org/ns/dcat#>
    PREFIX dcterms: <http://purl.org/dc/terms/>
    PREFIX mba: <http://api.brain-map.org/api/v2/data/Structure/>
    PREFIX nsg: <https://neuroshapes.org/>
    PREFIX owl: <http://www.w3.org/2002/07/owl#>
    PREFIX prov: <http://www.w3.org/ns/prov#>
    PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
    PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
    PREFIX schema: <http://schema.org/>
    PREFIX sh: <http://www.w3.org/ns/shacl#>
    PREFIX shsh: <http://www.w3.org/ns/shacl-shacl#>
    PREFIX skos: <http://www.w3.org/2004/02/skos/core#>
    PREFIX vann: <http://purl.org/vocab/vann/>
    PREFIX void: <http://rdfs.org/ns/void#>
    PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
    PREFIX : <https://neuroshapes.org/>

    SELECT ?id ?name
    WHERE {
        ?id a schema:Dataset ;
            nsg:contribution/prov:agent ?contributor.
        ?contributor schema:name ?name.
    }
    ORDER BY ?id
    LIMIT 1
    OFFSET 0
"""

# In[102]:

# It is recommended to set 'rewrite' to False to prevent the SPARQL query rewriting when a syntactically correct SPARQL query is provided.
resources = forge.sparql(query, rewrite=False, limit=3, offset=1, debug=True)

# In[103]:

type(resources)

# In[104]:

len(resources)

# In[105]:

type(resources[0])

# In[106]:

forge.as_dataframe(resources)

# ### Avoid rewriting the query

# To not assign any limit or offset, one can pass `None` to those parameters.

# In[107]:

query_without_limit = """
    PREFIX dc: <http://purl.org/dc/elements/1.1/>
    PREFIX dcat: <http://www.w3.org/ns/dcat#>
    PREFIX dcterms: <http://purl.org/dc/terms/>
    PREFIX mba: <http://api.brain-map.org/api/v2/data/Structure/>
    PREFIX nsg: <https://neuroshapes.org/>
    PREFIX owl: <http://www.w3.org/2002/07/owl#>
    PREFIX prov: <http://www.w3.org/ns/prov#>
    PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
    PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
    PREFIX schema: <http://schema.org/>
    PREFIX sh: <http://www.w3.org/ns/shacl#>
    PREFIX shsh: <http://www.w3.org/ns/shacl-shacl#>
    PREFIX skos: <http://www.w3.org/2004/02/skos/core#>
    PREFIX vann: <http://purl.org/vocab/vann/>
    PREFIX void: <http://rdfs.org/ns/void#>
    PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
    PREFIX : <https://neuroshapes.org/>

    SELECT ?id ?name
    WHERE {
        ?id a schema:Dataset ;
            nsg:contribution/prov:agent ?contributor.
        ?contributor schema:name ?name.
    }
    ORDER BY ?id
"""

# In[108]:

resources = forge.sparql(query_without_limit, rewrite=False, limit=None, offset=None, debug=True)

# In[109]:

len(resources)

# If you only want to add the context but keep the content of the query the same, you need to set the `rewrite` parameter to `False`.

# In[110]:

query_without_context = """
    SELECT ?id ?name ?contributor
    WHERE {
        ?id a Dataset ;
            contribution/agent ?contributor.
        ?contributor name ?name.
    }
"""

# In[111]:

resources = forge.sparql(query_without_context, limit=None, debug=True)

# In[112]:

len(resources)
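# Standard SPARQL constructs can be combined with the prefix-free style. A minimal, hedged sketch assuming the rewriting only expands type and property names and leaves a FILTER expression untouched; the name value is illustrative.

# In[ ]:

# Hedged sketch: a prefix-free query with a FILTER clause.
query_with_filter = """
    SELECT ?id ?name
    WHERE {
        ?id a Dataset ;
            contribution/agent ?contributor.
        ?contributor name ?name.
        FILTER(?name != "John Smith")
    }
"""
resources = forge.sparql(query_with_filter, limit=3, debug=True)
forge.as_dataframe(resources)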
# ## ElasticSearch DSL Query
#
# The ElasticSearch DSL can be used as a query language to search for resources, provided that the configured store supports it. The 'BlueBrainNexusStore' supports ElasticSearch.

# Note: DemoStore doesn't implement ElasticSearch DSL operations.

# In[113]:

jane = Resource(type="Person", name="Jane Doe")
contribution_jane = Resource(type="Contribution", agent=jane)

# In[114]:

john = Resource(type="Person", name="John Smith")
contribution_john = Resource(type="Contribution", agent=john)

# In[115]:

association = Resource(type="Dataset", contribution=[contribution_jane, contribution_john])

# In[116]:

forge.register(association)

# ### Plain ElasticSearch DSL

# In[117]:

query = """
{
  "_source": {
    "includes": [
      "@id",
      "name"
    ]
  },
  "query": {
    "term": {
      "@type": "http://schema.org/Dataset"
    }
  }
}
"""

# In[118]:

# limit and offset (when provided in this method call) supersede 'size' and 'from' values provided in the query
resources = forge.elastic(query, limit=3)

# In[119]:

type(resources)

# In[120]:

len(resources)

# In[121]:

type(resources[0])

# In[122]:

forge.as_dataframe(resources)

# ### ElasticSearch results as dictionaries

# In[136]:

resources_2 = forge.elastic(query, limit=3, as_resource=False)

# In[137]:

type(resources_2[0])

# In[138]:

resources_2[0]

# ## Downloading

# Note: DemoStore doesn't implement file operations yet. Please use another store for this section.

# In[123]:

jane = Resource(type="Person", name="Jane Doe")

# In[124]:

get_ipython().system(' ls -p ../../data | egrep -v /$')

# In[125]:

distribution = forge.attach("../../data")

# In[126]:

association = Resource(type="Association", agent=jane, distribution=distribution)

# In[127]:

forge.register(association)

# In[128]:

# By default, the downloadable file urls are collected from the json path "distribution.contentUrl" (follow="distribution.contentUrl") and
# the files are downloaded in the current path (path=".").
# The argument overwrite: bool can be provided to decide whether to overwrite (True) existing files with the same name or
# to create new ones (False) with their names suffixed with a timestamp.
# A cross_bucket argument can be provided to download data from the configured bucket (cross_bucket=False - the default value)
# or from a bucket different from the configured one (cross_bucket=True). The configured store should support crossing buckets for this to work.
forge.download(association)

# In[129]:

# A specific content type can be downloaded.
forge.download(association, content_type="text/tab-separated-values")

# In[130]:

# The urls of the files to download can be collected from a different json path (by setting a value for "follow") and
# the files downloaded to a different path (by setting a value for "path")
forge.download(association, follow="distribution.contentUrl", path="./downloaded/")

# In[131]:

get_ipython().system(' ls -l ./downloaded/')

# In[132]:

#! rm -R ./downloaded/
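# To illustrate the overwrite and cross_bucket arguments described above, a minimal, hedged sketch combining them with the other download options; the argument values are illustrative and assume they can be combined in a single call.

# In[ ]:

# Hedged sketch: download only tab-separated files to ./downloaded/, overwrite existing local
# copies, and stay within the configured bucket (cross_bucket=False is the default value).
forge.download(
    association,
    follow="distribution.contentUrl",
    path="./downloaded/",
    content_type="text/tab-separated-values",
    overwrite=True,
    cross_bucket=False
)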