#!/usr/bin/env python
# coding: utf-8

# # Create and Query a Nexus Elasticsearch view

# The goal of this notebook is to learn how to connect to an Elasticsearch view and run queries against it. It is not a tutorial on the Elasticsearch DSL language, for which many well-written [learning resources are available](https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html).

# ## Prerequisites
#
# This notebook assumes you've already created a project. If not, follow the Blue Brain Nexus [Quick Start tutorial](https://bluebrain.github.io/nexus/docs/tutorial/getting-started/quick-start/index.html).
#
# ## Overview
#
# You'll work through the following steps:
#
# 1. Create an Elasticsearch wrapper around your project's ElasticsearchView
# 2. Explore and search data using the wrapper as well as the Elasticsearch DSL language
#
#
# This tutorial makes use of the [elasticsearch](https://github.com/elastic/elasticsearch-py) and [elasticsearch-dsl](https://github.com/elastic/elasticsearch-dsl-py) python clients, which allow you to connect to an Elasticsearch search endpoint and perform various types of queries against it.

# ## Step 1: Create an Elasticsearch wrapper around your project's ElasticsearchView

# Every project in Blue Brain Nexus comes with an ElasticsearchView that lets you explore and search data using the Elasticsearch DSL language. The address of such an ElasticsearchView is
# `https://sandbox.bluebrainnexus.io/v1/views/tutorialnexus/$PROJECTLABEL/_search` for a project with label $PROJECTLABEL. The organization 'tutorialnexus' is the one used throughout the tutorial but it can be replaced by any other organization.
# ### Install [elasticsearch](https://github.com/elastic/elasticsearch-py) and [elasticsearch-dsl](https://github.com/elastic/elasticsearch-dsl-py)

# In[ ]:


# Install https://github.com/elastic/elasticsearch-py
get_ipython().system('pip install elasticsearch')


# In[ ]:


# Install https://github.com/elastic/elasticsearch-dsl-py
get_ipython().system('pip install elasticsearch-dsl')


# ### Set Nexus deployment configuration

# In[ ]:


import getpass


# In[ ]:


# Prompts for the Nexus access token without echoing it.
token = getpass.getpass()


# In[142]:


deployment = "https://nexus-sandbox.io/v1"
org_label = "tutorialnexus"
project_label = "myProject"
headers = {}


# ### Prepare a custom connection class for elasticsearch
#
# There is a need to extend the default Urllib3HttpConnection used by the elasticsearch client to:
# - include custom headers like an Authorization token
# - change the default search endpoint full_url construction method (perform_request):
#     - full_url = self.host + self.url_prefix + "/_all/_search" +"%s?%s" % (url, urlencode(params))
#     - self.url_prefix is the address of the provided ElasticsearchView
# - The new full_url construction method is:
#     - full_url = self.host + self.url_prefix as Nexus hides the target index and handles the search parameters (like from or size) through its API
#
# Solution partially taken from: https://github.com/elastic/elasticsearch-py/issues/407

# In[ ]:


from elasticsearch import Elasticsearch, Urllib3HttpConnection
# perform_request raises these; aliased so the Python builtin
# ConnectionError is not shadowed at module level.
from elasticsearch.exceptions import ConnectionError as ESConnectionError
from elasticsearch.exceptions import ConnectionTimeout, SSLError
from elasticsearch.serializer import JSONSerializer, DEFAULT_SERIALIZERS
from elasticsearch_dsl import Search
import time
import ssl
import urllib3
from urllib3.exceptions import ReadTimeoutError, SSLError as UrllibSSLError
from urllib3.util.retry import Retry
import warnings
import gzip
try:
    from base64 import decodestring
except ImportError:
    # base64.decodestring was removed in Python 3.9; decodebytes is its
    # replacement, so the name stays importable on newer interpreters.
    from base64 import decodebytes as decodestring


class MyConnection(Urllib3HttpConnection):
    """Urllib3 connection that targets a single Nexus ElasticsearchView URL.

    Differences from ``Urllib3HttpConnection`` implemented in this class:

    * an ``extra_headers`` keyword argument is merged into the headers sent
      with every request (used in this notebook for the Authorization token);
    * ``perform_request`` always sends the request to ``self.url_prefix`` and
      ignores the ``url`` and ``params`` it receives.
    """

    def __init__(self, *args, **kwargs):
        # ``extra_headers`` is popped before delegating so it is not passed
        # on to the parent constructor.
        extra_headers = kwargs.pop('extra_headers', {})
        super(MyConnection, self).__init__(*args, **kwargs)
        self.headers.update(extra_headers)

    def perform_request(
        self, method, url, params=None, body=None, timeout=None, ignore=(), headers=None
    ):
        """Send ``body`` to the view endpoint and return the raw response.

        :param method: HTTP method to use.
        :param url: ignored; the request always goes to ``self.url_prefix``.
        :param params: ignored; no query string is appended.
        :param body: request payload (gzip-compressed when
            ``self.http_compress`` is set).
        :param timeout: optional per-request timeout forwarded to urllib3.
        :param ignore: HTTP status codes that must not raise.
        :param headers: extra headers merged over the connection headers
            for this request only.
        :returns: tuple ``(status, response headers, decoded body text)``.
        :raises SSLError: when urllib3 reports an SSL error.
        :raises ConnectionTimeout: when urllib3 reports a read timeout.
        :raises elasticsearch.exceptions.ConnectionError: for any other
            failure while sending the request or decoding the response.
        """
        # The per-request path and query parameters are dropped on purpose:
        # the view address configured as url_prefix is the full endpoint.
        url = self.url_prefix
        full_url = self.host + self.url_prefix

        start = time.time()
        try:
            kw = {}
            if timeout:
                kw["timeout"] = timeout

            # in python2 we need to make sure the url and method are not
            # unicode. Otherwise the body will be decoded into unicode too and
            # that will fail (#133, #201).
            if not isinstance(url, str):
                url = url.encode("utf-8")
            if not isinstance(method, str):
                method = method.encode("utf-8")

            request_headers = self.headers
            if headers:
                # Copy first so per-request headers never leak into the
                # connection-wide header dict.
                request_headers = request_headers.copy()
                request_headers.update(headers)
            if self.http_compress and body:
                try:
                    body = gzip.compress(body)
                except AttributeError:
                    # oops, Python2.7 doesn't have `gzip.compress` let's try
                    # again
                    body = gzip.zlib.compress(body)

            response = self.pool.urlopen(
                method, url, body, retries=Retry(False), headers=request_headers, **kw
            )
            duration = time.time() - start
            raw_data = response.data.decode("utf-8")
        except Exception as e:
            self.log_request_fail(
                method, full_url, url, body, time.time() - start, exception=e
            )
            # Translate urllib3 failures into the elasticsearch exception
            # types imported above.
            if isinstance(e, UrllibSSLError):
                raise SSLError("N/A", str(e), e)
            if isinstance(e, ReadTimeoutError):
                raise ConnectionTimeout("TIMEOUT", str(e), e)
            raise ESConnectionError("N/A", str(e), e)

        # raise errors based on http status codes, let the client handle those if needed
        if not (200 <= response.status < 300) and response.status not in ignore:
            self.log_request_fail(
                method, full_url, url, body, duration, response.status, raw_data
            )
            self._raise_error(response.status, raw_data)

        self.log_request_success(
            method, full_url, url, body, response.status, raw_data, duration
        )

        return response.status, response.getheaders(), raw_data


# ### Create an elasticsearch wrapper

# In[ ]:


es_endpoint = deployment+"/views/"+org_label+"/"+project_label+"/nxv%3AdefaultElasticSearchIndex/_search"
print("Elasticsearch View address: " +es_endpoint)
# Register a JSON serializer for the "application/ld+json" content type.
DEFAULT_SERIALIZERS["application/ld+json"] = JSONSerializer()

# The token is handed to MyConnection through its extra_headers argument.
es_wrapper = Elasticsearch(
    es_endpoint,
    connection_class=MyConnection,
    send_get_body_as='POST',
    extra_headers={"Authorization": "Bearer {}".format(token)},
)


# ### Let's test the elasticsearch wrapper by running simple filter and aggregation queries

# In[ ]:


# Retrieve non-deprecated resources
s = Search(using=es_wrapper).filter("term", _deprecated="false")

# Aggregate them by type
s.aggs.bucket('per_type', 'terms', field='@type')

response = s.execute()

total = response.hits.total
print('total hits', total.relation, total.value)

# Don't forget that resources are paginated with 10 as default: use from and size (on Nexus API) to get all the hits
print("Displaying 10 first search results (score id type)")
for hit in response:
    # Not every hit carries an @type; print an empty string in that case.
    hit_type = hit['@type'] if "@type" in hit else ""
    print(hit.meta.score, hit['@id'], hit_type)

print("Displaying aggregation")
for _type in response.aggregations.per_type.buckets:
    count_text = str(_type.doc_count)
    print(count_text + " resources of type " + _type.key)


# ## Step 2: Create an ElasticSearchView
#
# The goal here is to illustrate how to use the Nexus SDK to create an Elasticsearch view. The full documentation can be found at: https://bluebrainnexus.io/docs/api/current/kg/kg-views-api.html#create-an-elasticsearchview-using-post.
#
# ```
# {
#   "@id": "{someid}",
#   "@type": [ "View", "ElasticSearchView"],
#   "resourceSchemas": [ "{resourceSchema}", ...],
#   "resourceTypes": [ "{resourceType}", ...],
#   "resourceTag": "{tag}",
#   "sourceAsText": {sourceAsText},
#   "includeMetadata": {includeMetadata},
#   "includeDeprecated": {includeDeprecated},
#   "mapping": _elasticsearch mapping_
# }
# ```
#
# An ElasticSearchView is a way to tell Nexus:
#
# 1.
#    Which resources to index in the view:
#
#     * resources that conform to a given schema: set resourceSchemas to the targeted schemas
#
#     * resources that are of a given type: set resourceTypes to the targeted types
#
#     * resources that are tagged: set resourceTag to the targeted tag value.
#
# 2. Which mapping to use when indexing the selected resources:
#
#     * set the mapping to be used: [More info about Elasticsearch mapping](https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping.html).
#
#

# In[ ]:


get_ipython().system('pip install nexus-sdk')


# In[ ]:


import nexussdk as nexus

# Point the SDK at the same deployment and token used earlier in the notebook.
nexus.config.set_environment(deployment)
nexus.config.set_token(token)


# In[ ]:


type_to_index = "https://bluebrain.github.io/nexus/vocabulary/File"

view_data = {
    "@id": "http://myView6.com",
    "@type": [
        "ElasticSearchView"
    ],
    "includeMetadata": True,
    "includeDeprecated": False,
    # The ElasticSearchView payload describes resourceTypes as an array of
    # type IRIs, so the single type is wrapped in a list.
    "resourceTypes": [type_to_index],
    "mapping": {
        "dynamic": "false",
        "properties": {
            "@id": {
                "type": "keyword"
            },
            "@type": {
                "type": "keyword"
            }
        }
    },
    "sourceAsText": False
}

try:
    response = nexus.views.create_es(org_label=org_label, project_label=project_label, view_data=view_data)
    print(dict(response))
except nexus.HTTPError as ne:
    # Show the error payload returned by Nexus instead of a bare traceback.
    print(ne.response.json())


# ### List views

# In[ ]:


response = nexus.views.list(org_label=org_label, project_label=project_label)
print(dict(response))


# ### Views statistics
#
# Please refer to https://bluebrainnexus.io/docs/api/current/kg/kg-views-api.html#fetch-view-statistics for more details about how to access the view indexing progress.