import json
import os
import subprocess
import requests
# API token is always available in the jupyterhub environment
token = os.environ["JUPYTERHUB_API_TOKEN"]
# create a requests Session object with our auth token
s = requests.Session()
s.headers = {"Authorization": f"token {token}"}
# get the running server(s) from `jupyter notebook list`
p = subprocess.run(["jupyter", "notebook", "list", "--json"], capture_output=True, text=True)
servers = []
for line in p.stdout.splitlines():
servers.append(json.loads(line))
servers
[{'base_url': '/user/test-1/', 'hostname': '127.0.0.1', 'notebook_dir': '/private/tmp/test-1', 'password': False, 'pid': 94573, 'port': 50632, 'secure': False, 'sock': '', 'token': '', 'url': 'http://127.0.0.1:50632/user/test-1/'}]
# assume one server for now
# can select by comparing base_url to $JUPYTERHUB_SERVICE_PREFIX
server = servers[0]
jupyter_api = f"{server['url']}api"
# List sessions using jupyter REST API
sessions = s.get(f"{jupyter_api}/sessions").json()
# sort by activity
# last_activity is ISO8601 strings, sortable without parsing
sessions = sorted(sessions, key=lambda s: s["kernel"]["last_activity"], reverse=True)
sessions
[{'id': '42481fdd-c6a8-4e69-ae9f-c2c2b6b2c233', 'path': 'cull-kernels.ipynb', 'name': 'cull-kernels.ipynb', 'type': 'notebook', 'kernel': {'id': '1e884864-3de1-4cad-a1f1-8f4a055888e4', 'name': 'python3', 'last_activity': '2021-04-27T09:37:55.397236Z', 'execution_state': 'busy', 'connections': 2}, 'notebook': {'path': 'cull-kernels.ipynb', 'name': 'cull-kernels.ipynb'}}, {'id': 'f58a8d78-3a72-4cc1-bc58-2ef167120011', 'path': 'Untitled1.ipynb', 'name': 'Untitled1.ipynb', 'type': 'notebook', 'kernel': {'id': '83bd39eb-2e9a-4dee-83b3-63102f071193', 'name': 'python3', 'last_activity': '2021-04-27T09:37:05.716763Z', 'execution_state': 'idle', 'connections': 0}, 'notebook': {'path': 'Untitled1.ipynb', 'name': 'Untitled1.ipynb'}}]
# don't shutdown the most recently active kernel, but shut down all others
if sessions:
print(f"Not shutting down most recently active session: {sessions[0]['name']}")
for session in sessions[1:]:
kernel = session['kernel']
name = session['name']
if kernel['execution_state'] == 'busy':
print(f"Not shutting down busy kernel for {name}")
continue
if kernel['connections']:
print(f"Not shutting down kernel with {kernel['connections']} connections: {name}")
continue
print(f"Shutting down session {name} idle since {kernel['last_activity']}")
r = s.delete(f"{jupyter_api}/sessions/{session['id']}")
r.raise_for_status()
Not shutting down most recently active session: cull-kernels.ipynb Shutting down session Untitled1.ipynb idle since 2021-04-27T09:37:05.716763Z