This notebook harvests data from all of the NSW State Archives' online indexes, saving the data as a collection of easily downloadable CSV files.
Before you harvest the data, you need to get the details of all the indexes.
If you just want the data, my latest harvest of the indexes is available from this repository.
If you'd like to explore the harvested data, try the Index Explorer!
import os
import shutil
import time
from pathlib import Path
import pandas as pd
import requests_cache
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from tqdm.auto import tqdm
def filter_errors(response):
"""
Errors are returned with a status code of 200, so we have to stop them from being cached.
"""
return "errors" not in response.json()
s = requests_cache.CachedSession(
allowable_methods=("GET", "POST"), filter_fn=filter_errors
)
retries = Retry(total=5, backoff_factor=1, status_forcelist=[502, 503, 504])
s.mount("http://", HTTPAdapter(max_retries=retries))
s.mount("https://", HTTPAdapter(max_retries=retries))
%%capture
# Load environment variables if available
%load_ext dotenv
%dotenv
The index tables are populated using a GraphQL API query. Here we create a basic GraphQL query to request a page of table data for a specific index.
# Define the GraphQL query
q_str = """
query SaraIndex($slug: String, $table: PrimoTableInput) {
stateArchives {
saraIndex(slug: $slug) {
path
title
percentDigitised
table(input: $table) {
pagination {
page
perPage
results
total
totalPages
}
headers {
title
}
rows {
columns {
content
link
}
}
}
}
}
}
"""
The complete API query includes variables that supply a slug identifying the index, and pagination details specifying a slice of the dataset. We can change these values as we work through the indexes to retrieve the full datasets.
# Construct the basic data payload, the table variables will be updated by the harvest
query = {
"query": q_str,
"operationName": "SaraIndex",
"variables": {
"slug": "colonial-architect-index",
"table": {"query": "", "pagination": {"page": 1, "perPage": 100}},
},
}
def get_total(query):
"""
Get the total number of pages in the result set returned by the supplied API query.
    Returns the number of pages in the result set as an integer.
"""
response = s.post("https://api.mhnsw.au/graphql", json=query)
data = response.json()
# Sometimes the requests fail and return JSON with an "errors" key
# These seem to be random and re-requesting after a short pause seems to work ok
if "errors" not in data:
pagination = data["data"]["stateArchives"]["saraIndex"]["table"]["pagination"]
return pagination["totalPages"]
    else:
        # Wait for 60 secs, then try again with the same query
        time.sleep(60)
        return get_total(query)
def get_headers(table):
"""
Get a list of column names from the table.
"""
headers = []
for h in table["headers"]:
if h["title"]:
headers.append(h["title"])
else:
headers.append("Details")
return headers
def get_rows(table):
"""
Get data from all the rows/columns on the current table page.
Returns a list of lists representing rows/columns.
"""
rows = []
for row in table["rows"]:
cols = []
for col in row["columns"]:
if col["content"] == "Details" and col["link"]:
cols.append(col["link"])
else:
cols.append(col["content"])
rows.append(cols)
return rows
def harvest_index(query, output_dir="indexes"):
"""
    Harvest all the data returned by the supplied query.
The data is saved as a CSV file in the specified output directory.
"""
dfs = []
current_page = 0
# Get the total number of pages
total_pages = get_total(query)
with tqdm(total=total_pages, desc=query["variables"]["slug"]) as pbar:
# Continue until the current page equals the total number of pages
while current_page < total_pages:
current_page += 1
# Set the page required in the table pagination
query["variables"]["table"]["pagination"]["page"] = current_page
response = s.post("https://api.mhnsw.au/graphql", json=query)
data = response.json()
# Sometimes the requests fail and return JSON with an "errors" key
# These seem to be random and re-requesting after a short pause seems to work ok
if "errors" not in data:
# Get the table data from the response
table = data["data"]["stateArchives"]["saraIndex"]["table"]
# Get column headers
headers = get_headers(table)
# Get rows
rows = get_rows(table)
# Create a dataframe from the rows and headers and append to a list
dfs.append(pd.DataFrame(rows, columns=headers))
pbar.update(1)
if not response.from_cache:
time.sleep(0.5)
# If there's been an error, wait for 60 secs then try again
else:
current_page = current_page - 1
time.sleep(60)
# Combine all the dataframes
df = pd.concat(dfs)
# I don't think this is necessary, but just in case
df.drop_duplicates(inplace=True)
# Set up output directory and save df as a CSV file
output_dir = Path(output_dir)
output_dir.mkdir(exist_ok=True)
df.to_csv(Path(output_dir, f"{query['variables']['slug']}.csv"), index=False)
del df
def harvest_indexes(query, indexes, output_dir="indexes"):
"""
Harvest data from all of the indexes.
"""
for index in tqdm(indexes):
slug = index["url"].strip("/").split("/")[-1]
query["variables"]["slug"] = slug
harvest_index(query, output_dir=output_dir)
Note that this can take quite a while and sometimes there are errors that interrupt the harvest. I've noted in the code that sometimes the JSON response includes an 'errors' key. These problems seem temporary and re-requesting after a short pause seems to work ok. Other errors sometimes result in a response that's not JSON, generating a JSONDecodeError. I haven't tried to handle these in the code, as they don't seem as common and I'm not quite sure what the problem is. But because the requests/responses are all cached, you can simply re-run harvest_indexes() to pick up where you left off.
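If you'd rather not restart the harvest by hand, you could wrap it in a loop something like this. This is just a sketch, not part of the harvesting code above; since JSONDecodeError is a subclass of ValueError, catching ValueError covers the non-JSON responses.
# A rough sketch of an automated restart: re-run the harvest until it completes
# without a JSONDecodeError. Cached pages are skipped on each retry, so every
# attempt picks up where the last one failed.
while True:
    try:
        harvest_indexes(query, indexes)
        break
    except ValueError:
        # JSONDecodeError is a subclass of ValueError
        time.sleep(60)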
# Load the pre-harvested list of indexes
indexes = pd.read_csv("indexes.csv").to_dict("records")
# Harvest all the indexes to the default directory ("indexes").
harvest_indexes(query, indexes)
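Once the harvest has finished you can load any of the saved CSV files to check the results. For example (assuming the default output directory, and that the 'colonial-architect-index' was among those harvested):
# Peek at one of the harvested indexes
df = pd.read_csv(Path("indexes", "colonial-architect-index.csv"))
df.head()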
To harvest a single index you just need to update the slug parameter in query. The index's slug is the last part of its URL. For example: 'unemployed-in-sydney'.
query["variables"]["slug"] = "unemployed-in-sydney"
harvest_index(query)
# IGNORE -- THIS CELL IS FOR AUTOMATED TESTING ONLY
if os.getenv("GW_STATUS") == "dev":
indexes = pd.read_csv("indexes.csv").to_dict("records")[:1]
harvest_indexes(query, indexes, output_dir="test")
index = indexes[0]
slug = index["url"].strip("/").split("/")[-1]
assert Path("test", f"{slug}.csv").exists()
assert not pd.read_csv(Path("test", f"{slug}.csv")).empty
shutil.rmtree("test")
Created by Tim Sherratt for the GLAM Workbench.