#!/usr/bin/env python # coding: utf-8 --- title: FHIR for Research Workshop - Bulk Data execute: cache: true --- # ## Introduction # ### Learning Objectives and Key Concepts # # **The goal of this workshop is to connect to the SMART Bulk Data Server and fetch a set of sample patient data.** # # In this exercise, you will: # # - Connect to an authorization server using a provided key, and retrieve an access token # - Make a Bulk Data Export Request with that access token # - Download the exported Bulk Data # - Convert the downloaded data into DataFrames # # While libraries like [FHIR-PYrate](https://github.com/UMEssen/FHIR-PYrate) allow you to fetch data from a server and parse it directly into a DataFrame, these libraries generally do not support FHIR Bulk Data. This workshop will step through the process of building up a tool to fetch Bulk Data and convert it into DataFrames. # # This notebook is best experienced interactively. If the notebook already has output in it, you may clear that prior to starting via the menu: Cell -> All Output -> Clear. # ### Setup # # If you are not using a JupyterHub instance with dependencies already installed, you will need to: # # 1. Clone this repository # 2. Install dependencies with `pip install -r requirements.txt` # 3. Run `jupyter notebook workshops/fhir-bulk-data` # # This should open the Jupyter environment in your browser window. You should see `notebook.ipynb` listed in the interface. Open this notebook in Jupyter, and you should be able to run the code. # ### Background # # The [Bulk Data Access standard](http://hl7.org/fhir/uv/bulkdata/) enables researchers to retrieve large volumes of data from a patient population in an EHR. The Bulk Data Access standard is part of the [SMART ecosystem](https://smarthealthit.org/), and SMART on FHIR can be used to authenticate and authorize applications that retrieve bulk data automatically # # Clients of FHIR Bulk Data servers use [SMART Backend Authorization](http://www.hl7.org/fhir/smart-app-launch/backend-services.html) to connect to the server. With SMART Backend Authorization, registered clients make a signed request to a token endpoint to receive a Bearer token, which they use for subsequent calls to the FHIR server. # # Client registration often happens manually as a separate one-time event. The SMART Backend Authorization specification does not define an API for registration. # # For this workshop, we connect to the [SMART Bulk Data Server](https://bulk-data.smarthealthit.org/) (). This is a developer tool provided by SMART Health IT to facilitate development with Bulk Data Access. This test server allows clients to "register" on the launch page by providing either a URL for a [JSON Web Key Set(JWKS)](https://auth0.com/docs/secure/tokens/json-web-tokens/json-web-key-sets) or a raw JWKS. In this case, "registration" is not stored on the server. Instead, the FHIR Server URL contains the "registration" information stored as state in the URL and clientID. Production servers will usually have a more standard registration process rather than taking this approach. # # For convenience, the SMART Bulk Data Server launch page allows users to generate a one-off JWKS to use for testing. For production usage, clients must generate their own certificates and JWKS and keep the private key private. # In this workshop, we will use a JWKS generated by the launch page. # # **IMPORTANT**: this workshop is not meant to be a formal documentation of the specification, and largely skips error handling and stays on the "happy path" for brevity and readability. We strongly recommend reviewing the specifications and adding error handling before using any of this code in a production environment. # # In[1]: # The default style for rendering JSON parsed as Python dicts isn't the best. # Use this import and call `print(json)` when we want a cleaner view. from rich import print # Status bars for long-running cels from tqdm.notebook import trange, tqdm # ## Getting our Access Token # # The first step in obtaining data from a FHIR server that supports Bulk Data Access is to obtain an access token. That access token identifies and authorizes the client on requests made to the FHIR resource server. # # Obtaining an access token is itself a two-step process: # 1. Make a discovery request to the FHIR resource server to get the address of the authorization server. # 2. Post a token request, signed by the client's private key, to the authorization server # # To keep the focus of this workshop on the Bulk Data process rather than the details of generating keys, we will use a JWKS pre-generated by the SMART Bulk Data server launch page. # # For reference, the steps followed to generate the keys used here were: # # - Visit the SMART Bulk Data Server launch page # - In the upper left, click the `JWKS` button for Authentication # - Click the `Generate` button and choose `Generate RS384` # - Choose `R4` for the FHIR Version # - The associated text box now contains a JWKS with both a public and private key, and the Launch Configuration contains a FHIR Server URL and Client ID # - Convert the private key from the JWKS to "PEM" format so it can be used by Python (this is not easy to do natively in Python, so we have done it with JavaScript out of band) # # Let's start by defining our credentials. **In practice, real credentials must always be stored and loaded securely**, but for simplicity in this workshop we will define them as local variables. # In[2]: client_id = 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCIsImtpZCI6InJlZ2lzdHJhdGlvbi10b2tlbiJ9.eyJqd2tzIjp7ImtleXMiOlt7Imt0eSI6IlJTQSIsImFsZyI6IlJTMzg0IiwibiI6IngzMDc2RTJNaUpMR3JPbXJXRjZXSWZ1RjFSZDBlTjBSdEhUSVRuMlNGVWhMYTFQWE5Ia0xBR2xSSmtJWk1QMUk5SEhxdTRERy02d2JraFMweU9GbEZhZE1iaGgzcHkySHoybDctRmg1M3Y3bmpwb3dxUGV2eEpqMlpEQU5BanFWeHRLOGdvMm1BZmZFSnJ2ZkVHbm5oUGkzdGE1U2U5UTBkS29la2hJRVRCaVJTa0ozN0pobEZGSDh3S2hFLXVwaXBQU3VycTBrQ0JkNlNaS3NOVHpHNzJmLVJoNENiREZWTVdfRm5zcTh5LWRJMTdMSDJZcHBBLWc0eGlUZnMwMGZOUG9FUEdoWFU2bHFKMHMwclp4Um9zYnVuV0NTYi1UaEtWV0RyeUFudE83S3dWN1BxVG1NMmVrVS1yenZFaWprVjZfUUlnVTJxRTd6X1k1N1l4aW8zUSIsImUiOiJBUUFCIiwia2V5X29wcyI6WyJ2ZXJpZnkiXSwiZXh0Ijp0cnVlLCJraWQiOiI0ZDc3OTJjZTQyMDU0ZDVkZjhkZDg1ZjhiNTI3ZGQ4OCJ9LHsia3R5IjoiUlNBIiwiYWxnIjoiUlMzODQiLCJuIjoieDMwNzZFMk1pSkxHck9tcldGNldJZnVGMVJkMGVOMFJ0SFRJVG4yU0ZVaExhMVBYTkhrTEFHbFJKa0laTVAxSTlISHF1NERHLTZ3YmtoUzB5T0ZsRmFkTWJoaDNweTJIejJsNy1GaDUzdjduanBvd3FQZXZ4SmoyWkRBTkFqcVZ4dEs4Z28ybUFmZkVKcnZmRUdubmhQaTN0YTVTZTlRMGRLb2VraElFVEJpUlNrSjM3SmhsRkZIOHdLaEUtdXBpcFBTdXJxMGtDQmQ2U1pLc05Uekc3MmYtUmg0Q2JERlZNV19GbnNxOHktZEkxN0xIMllwcEEtZzR4aVRmczAwZk5Qb0VQR2hYVTZscUowczByWnhSb3NidW5XQ1NiLVRoS1ZXRHJ5QW50TzdLd1Y3UHFUbU0yZWtVLXJ6dkVpamtWNl9RSWdVMnFFN3pfWTU3WXhpbzNRIiwiZSI6IkFRQUIiLCJkIjoiUnptQWRTMlMtb1FsS1VGNHF1R0Npdm1KekE1R3lJeHRzTmR0V1JEZVluamdiSjZQbksyRzd3dXJMSlMyOTlYSEFYZld6a0ZwU2h3bDc5OHl1UEk0ckNXQ1ZXQ29fLWh5ci14Q2xlWEpCWVJQV292VXljODlVMTBsdzVtZ1cyWmRhWkotT2NLblBkYWZreERLME1wdkhmdkxZN09zd1lkX2Z4UHFQRTd3ZDlaQU5XLUIyWmNURUVmd2taNWdlcmtDdnFHQ1lEUTdVcVJqR3k1dWRjTkRiQ01ITFdGaEZZMTVqMDVMMFpJV0RwUDY2cmN6UWZEdnduR0pIbWxJbnJMbTl5WkowUTNkVlpHSmo2Y2dMeWI4WHhkNHpWRjZGSy1NX2VKbnFzZFRveHRPMDNUOVotSWlrN1BfbFBheWRvMWRycXRZdUxmZXpvU1lnUGp0V0NnV0JRIiwicCI6IjZwNlV5aGZiQ0JjQlEzcGttMHZEb1lqSDZsc1FCeS1PTzlEYlpfZnFfSHpzZl96UWhENDdua0dZZngxbGVTUFlQU0ZSeDlRTUR3cTlvYWxjYmEwNmE3QTVmMUxQNVpaRnNvSDVCTElHTUcxNmhDbW1mTEdRMURkZ3pMb2s3Q3RldDRnNGhUTlpseFZOYV9uYVNmZGJSdmQycF8zNTM1RGpaOXoyMEpSNllDYyIsInEiOiIyYXNhQ0RCTmY3NTQ1ajdOcXI2TTZiUW8wVGZEWGNlb2FxcGVtNGhpNE1pYUtBOEcydVFvdXNTOGcyUTlZOFZiZmxjX3I2WmxPVjIxSmJhYW5WN253MDRxbVpqMG5Xdkk0a19yX2lKWTVuSDNUMHk0Y0lGV21tLUhPY1dzazJXWl9QQ1NSc1piOU1qOUs4UXh6b1h5WEo0ck9aLUw4OTNZbDZ5bVdKa2xqVnMiLCJkcCI6Ik9LeWI5b0Z5dUc2T01KV2xMZHBNWkgzZEJPQ0FhNnZ5S01MWDdUSjNBZ3pQT0UtQ3N4OHhXWll3MXl2cnNpcVZkcGJRNFh0NGVqMjI5eEVwTVpreHpvZWdMQUItRmRDSl80Zmo5bDFtbjFZaXpVQWVabXFpT0pFMEFlQkpRUDlzX3RxYUJKc1YzaWdZTHFnSk1lcmRrclAtWnJBMEp1d2g4cG51eVEzRXplcyIsImRxIjoib2I2R0FvMjZHUEcxcnduLUZDR3lYanMwbFhzRlhwdHRaNDJmN1owa05IcDhLc1kzeHRJQl9mOFJRZVZyeE1hem5TZENPTWpCc1NZVDVLbFRMUnVIeHRZX3k1RWdQQllLMlRpZ1dXQzJoTTh0QWEwMTVNd0hTWTBVZ19hQ3JhaXpDNFRNZlhFS2hkUVFaTVJPYW5PWVRBQndpRW9wV2hhQXl2eE5ROHJSWDc4IiwicWkiOiJLSjhJU0RKaHVyUmEyTVRHdG4zWjR3NU9ob3o2N29OcE10MG1TakxGUEt0QjFWbjRaZ3VkTUxfWTZ4V2lWTnBOR1hQa3hoMEJjRmNKakNKcC0yeUZLV0d4Si14M2JMWVllbkVUaGRFSGRRR0xuUUszMHlEdHFTY2NDUVY5U2xGc281NUdnUmxhODNaY2NBZTdBMXBWN2sxRGE4dFVFNkE4TXNlQ1ZXamRLbFUiLCJrZXlfb3BzIjpbInNpZ24iXSwiZXh0Ijp0cnVlLCJraWQiOiI0ZDc3OTJjZTQyMDU0ZDVkZjhkZDg1ZjhiNTI3ZGQ4OCJ9XX0sImFjY2Vzc1Rva2Vuc0V4cGlyZUluIjoxNSwiaWF0IjoxNjg2NjUyNzM4fQ.j1urst068-21CxiH0Nqml7XoE9v6hWJ_vfqAK4W22vg' # Don't worry! This is not anybody's real private key. It was generated specifically and only for this exercise. private_key = """-----BEGIN RSA PRIVATE KEY----- MIIEowIBAAKCAQEAx3076E2MiJLGrOmrWF6WIfuF1Rd0eN0RtHTITn2SFUhLa1PX NHkLAGlRJkIZMP1I9HHqu4DG+6wbkhS0yOFlFadMbhh3py2Hz2l7+Fh53v7njpow qPevxJj2ZDANAjqVxtK8go2mAffEJrvfEGnnhPi3ta5Se9Q0dKoekhIETBiRSkJ3 7JhlFFH8wKhE+upipPSurq0kCBd6SZKsNTzG72f+Rh4CbDFVMW/Fnsq8y+dI17LH 2YppA+g4xiTfs00fNPoEPGhXU6lqJ0s0rZxRosbunWCSb+ThKVWDryAntO7KwV7P qTmM2ekU+rzvEijkV6/QIgU2qE7z/Y57Yxio3QIDAQABAoIBAEc5gHUtkvqEJSlB eKrhgor5icwORsiMbbDXbVkQ3mJ44Gyej5ythu8LqyyUtvfVxwF31s5BaUocJe/f MrjyOKwlglVgqP/ocq/sQpXlyQWET1qL1MnPPVNdJcOZoFtmXWmSfjnCpz3Wn5MQ ytDKbx37y2OzrMGHf38T6jxO8HfWQDVvgdmXExBH8JGeYHq5Ar6hgmA0O1KkYxsu bnXDQ2wjBy1hYRWNeY9OS9GSFg6T+uq3M0Hw78JxiR5pSJ6y5vcmSdEN3VWRiY+n IC8m/F8XeM1RehSvjP3iZ6rHU6MbTtN0/WfiIpOz/5T2snaNXa6rWLi33s6EmID4 7VgoFgUCgYEA6p6UyhfbCBcBQ3pkm0vDoYjH6lsQBy+OO9DbZ/fq/Hzsf/zQhD47 nkGYfx1leSPYPSFRx9QMDwq9oalcba06a7A5f1LP5ZZFsoH5BLIGMG16hCmmfLGQ 1DdgzLok7Ctet4g4hTNZlxVNa/naSfdbRvd2p/3535DjZ9z20JR6YCcCgYEA2asa CDBNf7545j7Nqr6M6bQo0TfDXceoaqpem4hi4MiaKA8G2uQousS8g2Q9Y8Vbflc/ r6ZlOV21JbaanV7nw04qmZj0nWvI4k/r/iJY5nH3T0y4cIFWmm+HOcWsk2WZ/PCS RsZb9Mj9K8QxzoXyXJ4rOZ+L893Yl6ymWJkljVsCgYA4rJv2gXK4bo4wlaUt2kxk fd0E4IBrq/IowtftMncCDM84T4KzHzFZljDXK+uyKpV2ltDhe3h6Pbb3ESkxmTHO h6AsAH4V0In/h+P2XWafViLNQB5maqI4kTQB4ElA/2z+2poEmxXeKBguqAkx6t2S s/5msDQm7CHyme7JDcTN6wKBgQChvoYCjboY8bWvCf4UIbJeOzSVewVem21njZ/t nSQ0enwqxjfG0gH9/xFB5WvExrOdJ0I4yMGxJhPkqVMtG4fG1j/LkSA8FgrZOKBZ YLaEzy0BrTXkzAdJjRSD9oKtqLMLhMx9cQqF1BBkxE5qc5hMAHCISilaFoDK/E1D ytFfvwKBgCifCEgyYbq0WtjExrZ92eMOToaM+u6DaTLdJkoyxTyrQdVZ+GYLnTC/ 2OsVolTaTRlz5MYdAXBXCYwiaftshSlhsSfsd2y2GHpxE4XRB3UBi50Ct9Mg7akn HAkFfUpRbKOeRoEZWvN2XHAHuwNaVe5NQ2vLVBOgPDLHglVo3SpV -----END RSA PRIVATE KEY-----""" # note key id is the "kid" field from the JWKS -- it's same for both values of `keys` key_id = "4d7792ce42054d5df8dd85f8b527dd88" server_url = 'https://bulk-data.smarthealthit.org/eyJlcnIiOiIiLCJwYWdlIjoxMDAwMCwiZHVyIjoxMCwidGx0IjoxNSwibSI6MSwic3R1Ijo0LCJkZWwiOjB9/fhir' # We will use the [Requests](https://requests.readthedocs.io/en/latest/) library for making all HTTP requests, and use a `Session`, in case we need to persist common settings such as proxy or SSL configuration. # In[3]: import requests session = requests.Session() # Optional: Turn off SSL verification. Useful when dealing with a corporate proxy with self-signed certificates. from urllib3.exceptions import InsecureRequestWarning requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning) session.verify = False # Let's start by confirming we can hit the server via the `/metadata` endpoint. When connecting to a server for the first time it is generally a good idea to review the metadata to see what the server supports, and that it matches your expectations. In this case, expect to see the name "SMART Sample Bulk Data Server", and references to "export" operations. # In[4]: r = session.get(f'{server_url}/metadata') metadata = r.json() print(metadata) # The SMART Backend Authorization specification defines that the token endpoint will be published as part of the FHIR resource server's SMART metadata, at `.well-known/smart-configuration`. Let's fetch that endpoint and review the contents. # In[5]: r = session.get(f'{server_url}/.well-known/smart-configuration') smart_config = r.json() print(smart_config) # We care most about the `token_endpoint` field, which we need to request our JWT. For more information about the other fields, see [here](http://www.hl7.org/fhir/smart-app-launch/scopes-and-launch-context.html). # In[6]: token_endpoint = smart_config['token_endpoint'] # Now we have our token endpoint, so we can make a request to it to get a token. # The request follows the [OAuth 2.0 "Client Credentials" flow](https://datatracker.ietf.org/doc/html/rfc6749#section-4.4), using a [JSON Web Token (JWT) assertion](https://datatracker.ietf.org/doc/html/rfc7523) containing our client ID and signed with our private key. # # # ๐Ÿ“˜ [Read more about the access token request specification](http://www.hl7.org/fhir/smart-app-launch/backend-services.html#obtain-access-token) # In[7]: # Create a JWT client assertion as follows: import jwt import datetime assertion = jwt.encode({ 'iss': client_id, # "iss" == "issuer", the client that created this JWT 'sub': client_id, # "sub" == "subject", the client that will use the access token 'aud': token_endpoint, # "aud" == "audience", the receiver of this request 'exp': int((datetime.datetime.now() + datetime.timedelta(minutes=5)).timestamp()) }, private_key, # signed with the private key algorithm='RS384', # algorithm for the key headers={"kid": key_id}) # kid is required for smart bulk data server # And then POST it to the token endpont r = session.post(token_endpoint, data={ 'scope': 'system/*.read', 'grant_type': 'client_credentials', 'client_assertion_type': 'urn:ietf:params:oauth:client-assertion-type:jwt-bearer', 'client_assertion': assertion }) token_response = r.json() # And inspect the response: token_response # Two important fields we need to keep track of are the token itself, and the expire time. # Tokens are only valid for a certain amount of time, and once they expire we will need to fetch a new one via the same process as above. # `expires_in` is in seconds from the current time, so we'll add that to the current time to get a timestamp we can compare against. # # Note that for this example we requested and received `'scope': 'system/*.read'` which allows access to all resource types. In practice, requesting access to all resource types is generally not recommended, and servers do not always support asking for `*` scopes. Generally it is recommended to request only the minimal level of access necessary. # In[8]: token = token_response['access_token'] expire_time = datetime.datetime.now() + datetime.timedelta(seconds=token_response['expires_in']) # To make this easier for ourselves, let's package this up into a `get_token()` function that we can call anytime we need to use a token. If the current token is still valid, use that, or if it has expired, fetch a new one. The logic is exactly the same as the previous steps we just ran: # In[9]: def get_token(): global token, expire_time if datetime.datetime.now() < expire_time: # the existing token is still valid so return it return token assertion = jwt.encode({ 'iss': client_id, 'sub': client_id, 'aud': token_endpoint, 'exp': int((datetime.datetime.now() + datetime.timedelta(minutes=5)).timestamp()) }, private_key, algorithm='RS384', headers={"kid": key_id}) r = session.post(token_endpoint, data={ 'scope': 'system/*.read', 'grant_type': 'client_credentials', 'client_assertion_type': 'urn:ietf:params:oauth:client-assertion-type:jwt-bearer', 'client_assertion': assertion }) token_response = r.json() token = token_response['access_token'] expire_time = datetime.datetime.now() + datetime.timedelta(seconds=token_response['expires_in']) return token # ## Starting, Checking, and Downloading the Export # # Now that we have an access token, the next step in using Bulk Data is to request the export of data, via a "kick-off request". This is an asynchronous request -- once the request is accepted, instead of returning the results directly, the server response will point to a URL where the client can check the status. # # There are three levels of export: # - **Patient**, to obtain resources related to all Patients # - **Group**, to obtain resources associated with a particular [Group](https://www.hl7.org/fhir/group.html) # - **System**, to obtain all resources, whether or not they are associated with a patient # # For this exercise we will initially only request Patient-level data, but the general process for Groups and System-level data is exactly the same - there is just a different endpoint to hit, and a different set of data will be returned. # # There are also a number of parameters that may be set, but to keep things simple we will only use the `_type` parameter, to request only `Patient` and `Condition` resource types. # # ๐Ÿ“˜ [Read more about the Bulk Data Kick-off Request](http://hl7.org/fhir/uv/bulkdata/export.html#bulk-data-kick-off-request) # # Let's make the export request and inspect the response headers. For "Patient" level data, the URL we want to hit is `{server}/Patient/$export`. Our token is used in the "Authorization" header in the format `"Bearer {token}"`. # In[10]: r = session.get(f'{server_url}/Patient/$export?_type=Patient,Condition', headers={'Authorization': f'Bearer {get_token()}', 'Accept': 'application/fhir+json', 'Prefer': 'respond-async'}) print(r.headers) # We see the status URL in the `Content-Location` header, so let's save that into a variable. # In[11]: check_url = r.headers['Content-Location'] # We can now check the status by getting that URL, and the HTTP status code of the response will indicate the exort status. # - Code **200** means the export is complete, and the response body will indicate the location # - Code **202** means the export is still in progress # - Codes in the range **4xx-5xx** indicate an error has occurred. 4xx codes generally indicate an error in the request, and 5xx codes generally indicate a server error. # # Note that in production environments it is recommended to check the status as infrequently as possible, to minimize the load on the server. In this case we expect the export to complete in just a few seconds so the impact of checking every two seconds is minimal. The server will also include a "Retry-After" header which will give us a hint on how long to wait before trying again. # We'll check that status in a loop, and break out of the loop when we get a complete or error response. We'll print status each time through the loop, and the response body when the export is complete. # In[12]: # Now we check the status in a loop from time import sleep while True: r = session.get(check_url, headers={'Authorization': f'Bearer {get_token()}', 'Accept': 'application/fhir+json'}) if r.status_code == 200: # complete response = r.json() print(response) break elif r.status_code == 202: # in progress print(r.headers) delay = r.headers['Retry-After'] print(f"Sleeping {delay} seconds before retrying") sleep(int(delay)) else: # error print(r.text) break # We can see that the response points us to one or more NDJSON (Newline Delimited JSON) files per resource type, in the `output` field of the response. # # Note that in this case the volume of data is relatively small, and there is only one entry in the list per resource type, but for large datasets it is possible that there could be multiple files (and therefore multiple entries in this list) per resource type. # # Let's save that list to a variable. # In[13]: output_files = response['output'] output_files # Now we can loop through the list and download each one. Each file is an NDJSON, so that means we'll see one resource per line. # # To make each step clear and distinct, we'll keep a dict of `{ resourceType: [resources,...]}` which we can process later. # # Note: for this exercise we are only reading the NDJSON files into a dict in memory, but in practice you may want to save the file locally first in case there are errors in processing, especially if the files are large. # In[14]: import json resources_by_type = {} for output_file in tqdm(output_files): download_url = output_file['url'] resource_type = output_file['type'] r = session.get(download_url, headers={'Authorization': f'Bearer {get_token()}', 'Accept': 'application/fhir+json'}) ndjson = r.text.strip() # remove any whitespace, in particular trailing newlines if resource_type not in resources_by_type: resources_by_type[resource_type] = [] # NDJSON can't be parsed as a whole, we have to process it line-by-line for line in ndjson.split('\n'): resource = json.loads(line) resources_by_type[resource_type].append(resource) # This is a large amount of JSON data, only uncomment this line if you care to review # print(resources_by_type) # ## Converting to DataFrames # # Finally, let's convert these into DataFrames. # # The quick-and-dirty option is to use the Pandas `json_normalize()` function to parse a list of `dict`s into a DataFrame. # # ๐Ÿ“˜ [Read more about `pandas.json_normalize`](https://pandas.pydata.org/docs/reference/api/pandas.json_normalize.html) # # In[15]: import pandas as pd resource_dfs = {} for resource_type, resources in resources_by_type.items(): resource_dfs[resource_type] = pd.json_normalize(resources) # Now we can work with them by type: resource_dfs['Patient'] # This works, but it's clearly not ideal in how it handles nested fields, such as the nested lists of the `name` field. One way we can do a little better is with the flatten_json library: # # In[16]: from flatten_json import flatten for resource_type, resources in resources_by_type.items(): resource_dfs[resource_type] = pd.json_normalize(list(map(lambda r: flatten(r), resources))) # Now let's take another look resource_dfs['Patient'] # Let's look at just one row so it's easier to see all the columns and an example value: # In[17]: with pd.option_context('display.max_rows', 1000, 'display.max_columns', 10): print(resource_dfs['Patient'].loc[0].T) # Next, what if we know in advance we will only want certain fields? # # Let's follow the same pattern the FHIR-PYrate library uses, and use [FHIRPath](https://hl7.org/fhir/fhirpath.html) to define the fields we want to extract, along with a friendly name. # For this we'll use the [fhirpathpy](https://github.com/beda-software/fhirpath-py) library. # # [FHIRPath is](http://hl7.org/fhirpath/N1/): # # > a path based navigation and extraction language, somewhat like XPath. Operations are expressed in terms of the logical content of hierarchical data models, and support traversal, selection and filtering of data. # If you are not familiar with FHIRPath, [Section 3 of the FHIRPath spec](http://hl7.org/fhirpath/N1/#path-selection) describes some of the basics. # In[18]: import fhirpathpy fhir_paths = [ ["id", "identifier[0].value"], ["gender", "gender"], ["date_of_birth", "birthDate"], ["marital_status", "maritalStatus.coding.first().code"] ] # compile the fhirpath so they can be reused. this will result in better performance on large datasets for f in fhir_paths: f[1] = fhirpathpy.compile(f[1]) for resource_type, resources in resources_by_type.items(): filtered_resources = [] for resource in resources: filtered_resource = {} for f in fhir_paths: fieldname = f[0] func = f[1] filtered_resource[fieldname] = func(resource) # fhirpathpy always returns a list, which can make the DataFrame messy # if it's a list with only one item, extract the item from the list if isinstance(filtered_resource[fieldname], list) and len(filtered_resource[fieldname]) == 1: filtered_resource[fieldname] = filtered_resource[fieldname][0] filtered_resources.append(filtered_resource) resource_dfs[resource_type] = pd.json_normalize(list(map(lambda r: flatten(r), filtered_resources))) resource_dfs['Patient'] # ## Bringing it all together # # Now we have everything we need to connect to a FHIR server that supports Bulk Data, request and download exported data, and convert it into a DataFrame. Let's bring everything together from the previous steps into one class with a clear entrypoint. # In[19]: import requests import jwt import datetime import json import fhirpathpy from flatten_json import flatten from typing import Optional from collections import defaultdict class BulkDataFetcher: def __init__( self, base_url: str, client_id: str, private_key: str, key_id: str, endpoint: Optional[str] = None, session: Optional[str] = None ): self.base_url = base_url self.client_id = client_id self.private_key = private_key self.key_id = key_id self.token = None self.token_expire_time = None if endpoint is None: self.endpoint = "Patient" else: self.endpoint = endpoint if session is None: self.session = requests.Session() else: self.session = session r = self.session.get(f'{base_url}/.well-known/smart-configuration') smart_config = r.json() self.token_endpoint = smart_config['token_endpoint'] self.resource_types = [] self.fhir_paths = {} # Store raw FHIR resource instances; populated as part of get_dataframes() self.resources_by_type = {} def get_token(self): if self.token and datetime.datetime.now() < self.expire_time: # the existing token is still valid so use it return self.token assertion = jwt.encode({ 'iss': self.client_id, 'sub': self.client_id, 'aud': self.token_endpoint, 'exp': int((datetime.datetime.now() + datetime.timedelta(minutes=5)).timestamp()) }, self.private_key, algorithm='RS384', headers={"kid": key_id}) r = self.session.post(self.token_endpoint, data={ 'scope': 'system/*.read', 'grant_type': 'client_credentials', 'client_assertion_type': 'urn:ietf:params:oauth:client-assertion-type:jwt-bearer', 'client_assertion': assertion }) token_response = r.json() self.token = token_response['access_token'] self.expire_time = datetime.datetime.now() + datetime.timedelta(seconds=token_response['expires_in']) return self.token def add_resource_type(self, resource_type: str, fhir_paths = None): self.resource_types.append(resource_type) if fhir_paths: # fhir_paths=[ # ("id", "identifier[0].value"), # ("marital_status", "maritalStatus.coding[0].code") # ] compiled_fhir_paths = [(f[0], fhirpathpy.compile(f[1])) for f in fhir_paths] self.fhir_paths[resource_type] = compiled_fhir_paths def _invoke_request(self): types = ','.join(self.resource_types) url = f'{self.base_url}/{self.endpoint}/$export?_type={types}' print(f'Fetching from {url}') r = self.session.get(url, headers={'Authorization': f'Bearer {self.get_token()}', 'Accept': 'application/fhir+json', 'Prefer': 'respond-async'}) self.check_url = r.headers['Content-Location'] return self.check_url def _wait_until_ready(self): while True: r = self.session.get(self.check_url, headers={'Authorization': f'Bearer {self.get_token()}', 'Accept': 'application/fhir+json'}) # There are three possible options here: http://hl7.org/fhir/uv/bulkdata/export.html#bulk-data-status-request # Error = 4xx or 5xx status code # In-Progress = 202 # Complete = 200 if r.status_code == 200: # complete response = r.json() self.output_files = response['output'] return self.output_files elif r.status_code == 202: # in progress delay = r.headers['Retry-After'] sleep(int(delay)) else: raise RuntimeError(r.text) def get_dataframes(self): self._invoke_request() self._wait_until_ready() resources_by_type = {} self.resources_by_type = {} # Reset store of raw FHIR resources each time this is run for output_file in self.output_files: download_url = output_file['url'] resource_type = output_file['type'] r = self.session.get(download_url, headers={'Authorization': f'Bearer {get_token()}', 'Accept': 'application/fhir+json'}) ndjson = r.text.strip() if resource_type not in resources_by_type: resources_by_type[resource_type] = [] self.resources_by_type[resource_type] = [] for line in ndjson.split('\n'): resource = json.loads(line) # Make raw resource instances available for future use self.resources_by_type[resource_type].append(resource) if resource_type in self.fhir_paths: fhir_paths = self.fhir_paths[resource_type] filtered_resource = {} for f in fhir_paths: fieldname = f[0] func = f[1] filtered_resource[fieldname] = func(resource) if isinstance(filtered_resource[fieldname], list) and len(filtered_resource[fieldname]) == 1: filtered_resource[fieldname] = filtered_resource[fieldname][0] resource = filtered_resource resources_by_type[resource_type].append(resource) dfs = {} for resource_type, resources in resources_by_type.items(): dfs[resource_type] = pd.json_normalize(list(map(lambda r: flatten(r), resources))) return dfs def get_example_resource(self, resource_type: str, resource_id: Optional[str] = None): if self.resources_by_type is None: print("You need to run get_dataframes() first") return None if resource_type not in self.resources_by_type: print(f"{resource_type} not available. Try one of these: {', '.join(self.resources_by_type.keys())}") return None if resource_id is None: return self.resources_by_type[resource_type][0] resource = [r for r in self.resources_by_type[resource_type] if r['id'] == resource_id] if len(resource) > 0: return resource[0] print(f"No {resource_type} with id={resource_id} was found.") return None def reprocess_dataframes(self, fhir_paths): return BulkDataFetcher._reprocess_dataframes(self.resources_by_type, fhir_paths) @classmethod def _reprocess_dataframes(cls, obj_resources_by_type, user_fhir_paths): parsed_resources_by_type = defaultdict(list) for this_resource_type in obj_resources_by_type.keys(): if this_resource_type in user_fhir_paths: user_fhir_paths[this_resource_type] = [(f[0], fhirpathpy.compile(f[1])) for f in user_fhir_paths[this_resource_type]] for resource in obj_resources_by_type[this_resource_type]: if this_resource_type in user_fhir_paths: filtered_resource = {} for f in user_fhir_paths[this_resource_type]: fieldname = f[0] func = f[1] filtered_resource[fieldname] = func(resource) if isinstance(filtered_resource[fieldname], list) and len(filtered_resource[fieldname]) == 1: filtered_resource[fieldname] = filtered_resource[fieldname][0] parsed_resources_by_type[this_resource_type].append(filtered_resource) else: parsed_resources_by_type[this_resource_type].append(resource) dfs = {} for t, res in parsed_resources_by_type.items(): dfs[t] = pd.json_normalize(list(map(lambda r: flatten(r), res))) return dfs # In[20]: # And then to invoke it: # create a BulkDataFetcher with our credentials fetcher = BulkDataFetcher( base_url=server_url, client_id=client_id, private_key=private_key, key_id=key_id, session=session ) # add a resource type of interest, with some FHIRPath field mappings fetcher.add_resource_type('Patient', [ ("id", "identifier[0].value"), ("gender", "gender"), ("date_of_birth", "birthDate"), ("marital_status", "maritalStatus.coding.first().code") ]) # add another resource type, with no FHIRPath mappings (load the entire resource) fetcher.add_resource_type('Condition') dfs = fetcher.get_dataframes() dfs['Patient'] # In[21]: dfs['Condition'] # ## Group export # # [ยง170.315(g)(10) Standardized API for patient and population services](https://www.healthit.gov/test-method/standardized-api-patient-and-population-services) requires [`group-export`](https://hl7.org/fhir/uv/bulkdata/OperationDefinition-group-export.html) as of December 2022. # # This is therefore the FHIR Bulk Data endpoint you are likely to find in EHRs. # # To use this endpoint, you will need the ID of the group of patients you want to export. In a production setting, this would typically be provided by the administrators of the EHR. # # For the `bulk-data.smarthealthit.org` testing server, we can ask it for a list of groups via the FHIR API: # In[22]: r = session.get(f'{server_url}/Group', headers={'Authorization': f'Bearer {get_token()}', 'Accept': 'application/fhir+json'}) r.json() # Let's quickly pull this into a Pandas DataFrame to make it easier to read: # In[23]: groups = pd.json_normalize(r.json()['entry'])[['resource.id', 'resource.name', 'resource.quantity']] groups # Now we can request the patients and associated data for a specific group: # In[24]: group_id = groups.loc[0, 'resource.id'] fetcher = BulkDataFetcher( base_url=server_url, client_id=client_id, private_key=private_key, key_id=key_id, session=session, # Tell the BulkDataFetcher to request data from the specified group rather than all patients endpoint=f'Group/{group_id}' ) # add a resource type of interest, with some FHIRPath field mappings fetcher.add_resource_type('Patient', [ ("id", "identifier[0].value"), ("gender", "gender"), ("date_of_birth", "birthDate"), ("marital_status", "maritalStatus.coding.first().code") ]) # add another resource type, with no FHIRPath mappings (load the entire resource) fetcher.add_resource_type('Condition') dfs = fetcher.get_dataframes() dfs['Patient'] # A number of different FHIR resources are available from the test server: # # - [AllergyIntolerance](https://hl7.org/fhir/R4/allergyintolerance.html) # - [CarePlan](https://hl7.org/fhir/R4/careplan.html) # - [CareTeam](https://hl7.org/fhir/R4/careteam.html) # - [Claim](https://hl7.org/fhir/R4/claim.html) # - [Condition](https://hl7.org/fhir/R4/condition.html) # - [Device](https://hl7.org/fhir/R4/device.html) # - [DiagnosticReport](https://hl7.org/fhir/R4/diagnosticreport.html) # - [DocumentReference](https://hl7.org/fhir/R4/documentreference.html) # - [Encounter](https://hl7.org/fhir/R4/encounter.html) # - [ExplanationOfBenefit](https://hl7.org/fhir/R4/explanationofbenefit.html) # - [ImagingStudy](https://hl7.org/fhir/R4/imagingstudy.html) # - [Immunization](https://hl7.org/fhir/R4/immunization.html) # - [MedicationRequest](https://hl7.org/fhir/R4/medicationrequest.html) # - [Observation](https://hl7.org/fhir/R4/observation.html) # - [Patient](https://hl7.org/fhir/R4/patient.html) # - [Procedure](https://hl7.org/fhir/R4/procedure.html) # # Try modifying the request above to pull in resource types other than `Patient` and `Condition`. The links above go to the FHIR documentation for each resource type, which can help with constructing FHIRPaths. # In[25]: # Try adding an additional resources fetcher.add_resource_type('Observation') dfs = fetcher.get_dataframes() dfs['Observation'] # In[26]: # Try filtering to just Observations of smoking status # `fetcher.reprocess_dataframes()` does the same thing as `get_dataframes()`, # but with FHIRPaths and without re-downloading everything dfs = fetcher.reprocess_dataframes({ 'Patient': [ ("id", "identifier[0].value"), ("gender", "gender"), ("date_of_birth", "birthDate"), ("marital_status", "maritalStatus.coding.first().code") ], 'Observation': [ ("id", "id"), ("patient", "subject.reference"), ("type", "code.coding.first().code"), ("type_display", "code.coding.first().display"), ("code", "valueCodeableConcept.coding.first().code"), ("code_display", "valueCodeableConcept.coding.first().display"), ] }) with pd.option_context('display.max_rows', 100, 'display.min_rows', 100): display(dfs['Observation'][dfs['Observation']['type'] == '72166-2']) # ## Creating FHIRPaths # # It may be helpful to use an online tool like to assist with creating FHIRPaths for filtering the FHIR resources down for creating DataFrames. (Note that you should not use tools like this with identified patient data.) # # We have a convenience method to get an example resource in JSON format from the `fetcher` object: # In[27]: print(json.dumps(fetcher.get_example_resource('Observation'), indent=4)) # This can be copied and pasted into to experiment with FHIRPaths. Note that the JavaScript library used on this testing website is not the same as the Python library used in this notebook, so there may be some implementation differences. # ## Testing with Synthea data # # Having test data is very helpful when developing code that uses FHIR Bulk Data. The test data from may not have all the data elements you need for a specific research use case. [Synthea](https://mitre.github.io/fhir-for-research/modules/synthea-overview) can be used for generating customized synthetic data in FHIR format. Below we'll look at how to load `.ndjson` from Synthea into this notebook and use `reprocess_dataframes()` with FHIRPaths to convert into Pandas DataFrames. # # First, we'll create a short class to mimic the functionality of `BulkDataFetcher` but with loading the `.ndjson` directly from disk rather than via a bulk data export. # In[28]: class SyntheaDataFetcher: def __init__(self, ndjson_file_path): self.resources_by_type = {} num_lines = sum(1 for line in open(ndjson_file_path,'r')) with open(ndjson_file_path, 'r') as file: for line in tqdm(file, total=num_lines): json_obj = json.loads(line) this_resource_type = json_obj['resourceType'] if this_resource_type not in self.resources_by_type: self.resources_by_type[this_resource_type] = [] self.resources_by_type[this_resource_type].append(json_obj) print("Resources available: ") print('\n'.join(['- '+ x for x in self.resources_by_type.keys()])) def get_example_resource(self, resource_type: str, resource_id: Optional[str] = None): if self.resources_by_type is None: print("You need to run get_dataframes() first") return None if resource_type not in self.resources_by_type: print(f"{resource_type} not available. Try one of these: {', '.join(self.resources_by_type.keys())}") return None if resource_id is None: return self.resources_by_type[resource_type][0] resource = [r for r in self.resources_by_type[resource_type] if r['id'] == resource_id] if len(resource) > 0: return resource[0] print(f"No {resource_type} with id={resource_id} was found.") return None def reprocess_dataframes(self, user_fhir_paths): return BulkDataFetcher._reprocess_dataframes(self.resources_by_type, user_fhir_paths) # Load in 40 patients of Synthea data. # The original data come from > 1K Sample Synthetic Patient Records, FHIR R4 synthea_fetcher = SyntheaDataFetcher('synthea_100.ndjson') # Here is how to apply FHIRPaths to filter the Synthea data: # In[29]: dfs = synthea_fetcher.reprocess_dataframes({'Patient': [('id', 'id')]}) dfs['Patient'] # You can also get a sample resource to look at the raw JSON: # In[30]: print(synthea_fetcher.get_example_resource('Patient')) # ### Try it yourself # # Using FHIRPath, create the necessary dataframes to answer the following questions: # # 1. How many patients in the dataset have ever received a flu vaccine? # 2. What are the five most common conditions that patients have been diagnosed with? (Use only the first diagnosis of a given condition for each patient.) # 3. What is the most common medication (in MedicationRequest), and what are the top 5 encounter types associated with these medications? # # Remember that you can look at the [FHIR resource documentation](https://hl7.org/fhir/R4/resourcelist.html) to see what data elements are in each resource. You can also use `synthea_fetcher.get_example_resource('ResourceTypeHere')` with for testing out FHIRPaths if needed. # ## Summary # # Through this exercise we built a reusable tool to connect to a FHIR server with Bulk Data capabilities, export a set of resource types, and convert that data into DataFrames for analysis. # #