Get a list of agencies associated with a function

RecordSearch describes the business of government in terms of 'functions'. A function is an area of responsibility assigned to a particular government agency. Over time, functions change and move between agencies.

If you want to track particular areas of government activity, such as 'migration' or 'meteorology', it can be useful to start with functions, then follow the trail through agencies, series created by those agencies, and finally items contained within those series.

Functions are also organised into a hierarchy, so moving up or down the hierarchy can help you refine or broaden your search.

This notebook helps you create a list of all agencies associated with a particular function. Click on the 'Appmode' button to hide all the code.

Some limitations...

The function selector in the form below uses the hierarchy of functions that are currently built into the RecordSearch interface. However, there are numerous inconsistencies in RecordSearch, and a majority of the terms are not assigned to any agencies.

In [ ]:
import json
import os
import time

import arrow
import ipywidgets as widgets
import pandas as pd
from IPython.display import FileLink, display
from recordsearch_data_scraper.scrapers import RSAgencySearch
from slugify import slugify
from tinydb import Query, TinyDB
from tqdm.auto import tqdm
In [ ]:
def harvest_agencies(function):
    """
    Harvests every agency that RecordSearch associates with a function term.

    Runs an agency search for the term (requesting full record details) and
    keeps asking for results until a request comes back empty, pausing for
    half a second after each non-empty batch. A progress bar sized to the
    search's total number of results is advanced as batches arrive.

    Parameters:
        function: the function term to search for.

    Returns:
        A list of the agency records returned by the search.
    """
    search = RSAgencySearch(function=function, record_detail="full")
    harvested = []
    with tqdm(total=search.total_results) as progress:
        while True:
            batch = search.get_results()["results"]
            if not batch:
                # An empty batch means there is nothing left to harvest
                break
            harvested.extend(batch)
            progress.update(len(batch))
            # Pause between requests
            time.sleep(0.5)
    return harvested
In [ ]:
def get_terms(function):
    """
    Collects the terms of every function below the given one.

    Walks the 'narrower' lists recursively, so each child's term is followed
    immediately by the terms of its own descendants. The term of the supplied
    function itself is not included.

    Parameters:
        function: a function record; children, if any, are under 'narrower'.

    Returns:
        A flat list of descendant terms (empty if there is no 'narrower' key).
    """
    if "narrower" not in function:
        return []
    descendants = []
    for child in function["narrower"]:
        descendants.append(child["term"])
        descendants.extend(get_terms(child))
    return descendants


def get_db():
    """
    Opens the TinyDB database for the current form selection.

    The database file lives in the 'data' directory and is named after the
    slugified term of the selected function, with a '_with_children' suffix
    when the 'include children' checkbox is ticked.

    Returns:
        A TinyDB instance for the matching JSON file.
    """
    selected = rsfunction.value
    slug = slugify(selected["term"])
    if children.value:
        path = "data/db_agencies_{}_with_children.json".format(slug)
    else:
        path = "data/db_agencies_{}.json".format(slug)
    return TinyDB(path)


def get_agencies(b):
    """
    Sends function terms off to the harvester to get related agencies.
    If you've selected the 'include children' option, it includes all
    the function terms below the selected one in the hierarchy.

    This is the click handler for the 'Get agencies' button. Progress
    messages are written to the harvest_status output widget, and every
    harvested agency is upserted into the database returned by get_db().

    Parameters:
        b: the object passed in by the button's click event (unused).
    """
    harvest_status.clear_output()
    Record = Query()
    function = rsfunction.value
    terms = [function["term"]]
    if children.value:
        terms += get_terms(function)
    # get_db() opens a new database handle on every call, so use it as a
    # context manager to make sure the file is closed once the harvest ends
    # (or fails), rather than leaking a handle per click.
    with get_db() as db, harvest_status:
        for term in terms:
            print('\nHarvesting "{}"'.format(term))
            agencies = harvest_agencies(term)
            for agency in agencies:
                # NOTE(review): the query matches on the stored 'agency_id'
                # field but the value is read from agency["identifier"];
                # confirm both keys exist in the harvested records.
                db.upsert(agency, Record.agency_id == agency["identifier"])

Select the function you want to harvest

  • Select a function from the dropdown list
  • Check 'include children' to add all the terms below the selected function in the hierarchy to the search
  • Click 'Get agencies' to start!
In [ ]:
# Set up the interface


def get_children(function, level):
    """
    Builds dropdown options for everything below the supplied function.

    Each option is a (label, function) tuple. The label is the child's term
    prefixed with two spaces and one dash per level of depth, so the
    hierarchy is visible in the dropdown. Children are listed depth-first:
    each child is followed immediately by its own descendants.

    Parameters:
        function: a function record; children, if any, are under 'narrower'.
        level: depth of the supplied function (its children are level + 1).

    Returns:
        A flat list of (label, function) tuples, empty if there is no
        'narrower' key.
    """
    if "narrower" not in function:
        return []
    level += 1
    indent = level * "  "
    dashes = level * "-"
    options = []
    for child in function["narrower"]:
        label = "{}{} {}".format(indent, dashes, child["term"])
        options.append((label, child))
        options.extend(get_children(child, level=level))
    return options


def get_functions():
    """
    Builds the list of options for the function dropdown.

    Reads the previously harvested hierarchy from 'data/functions.json'.
    Each top-level function is added under its plain term, followed by the
    indented options for its descendants produced by get_children().

    Returns:
        A list of (label, function) tuples.
    """
    # The hierarchy of functions was harvested earlier and saved as JSON
    with open("data/functions.json", "r") as json_file:
        hierarchy = json.load(json_file)

    options = []
    for top in hierarchy:
        options += [(top["term"], top), *get_children(top, level=0)]
    return options


# Dropdown listing every function; each option is a (label, function) pair
rsfunction = widgets.Dropdown(
    description="Function:",
    options=get_functions(),
    disabled=False,
)

# Checkbox to also harvest the terms below the selected function
children = widgets.Checkbox(description="include children", value=False, disabled=False)

# Button that kicks off the harvest
start = widgets.Button(
    description="Get agencies",
    tooltip="Start harvest",
    button_style="primary",  # 'success', 'info', 'warning', 'danger' or ''
    disabled=False,
)

# Run get_agencies whenever the button is clicked
start.on_click(get_agencies)

# Show the three controls side by side in a bordered box
display(
    widgets.HBox(
        [rsfunction, children, start],
        layout=widgets.Layout(
            border="1px solid #999999", margin="20px 0 0 0", padding="50px"
        ),
    )
)

# Output area for harvest progress; as the last expression in the cell it is
# displayed beneath the controls
harvest_status = widgets.Output()
harvest_status

Save and download the harvested data

RecordSearch data includes a rich set of relationships. In the case of agencies, there are links to functions, people, and to previous, subsequent, controlled, and controlling agencies. It's hard to present this complex, nested data in a flat format, such as a CSV file. For convenience, the CSV file created for download doesn't include related agencies, people, and functions. It does, however, include start_function and end_function fields that indicate when the agency had responsibility for the selected function. If you've included child functions, the start_function and end_function fields contain the earliest and latest dates from any of the harvested terms.

In [ ]:
def parse_date(date):
    """
    Converts a date string into an Arrow object.

    Strings containing a '-' are handed to arrow.get() as they are; anything
    else is parsed as a four-digit year.

    Parameters:
        date: the date string to parse (may be None).

    Returns:
        An Arrow object, or None if the value is missing, empty, not a
        string, or can't be parsed.
    """
    # Missing or empty values have nothing to parse
    if not date:
        return None
    try:
        if "-" in date:
            return arrow.get(date)
        return arrow.get(date, "YYYY")
    except (TypeError, ValueError):
        # TypeError: the value isn't a string. ValueError: arrow couldn't
        # parse it (arrow's parser errors derive from ValueError). Either
        # way, treat it as 'no date' rather than aborting the whole export.
        return None


def make_filename():
    """
    Builds the extension-less path used for the downloadable files.

    The name is based on the slugified term of the selected function, with a
    '_with_children' suffix when the 'include children' checkbox is ticked.

    Returns:
        A path in the 'data' directory, without a file extension.
    """
    selected = rsfunction.value
    slug = slugify(selected["term"])
    if children.value:
        return "data/agencies_{}_with_children".format(slug)
    return "data/agencies_{}".format(slug)


def _function_date_range(agency_functions, terms):
    """
    Finds the earliest start date and latest end date among the functions
    of an agency whose (lower-cased) identifier is one of the given terms.

    Dates that parse_date() can't parse are ignored. Returns the original
    date strings as an (earliest_start, latest_end) tuple; either value is
    None if no usable date was found.
    """
    earliest_parsed = earliest_raw = None
    latest_parsed = latest_raw = None
    for agency_function in agency_functions:
        if agency_function["identifier"].lower() not in terms:
            continue
        start_raw = agency_function["start_date"]
        end_raw = agency_function["end_date"]
        start_parsed = parse_date(start_raw)
        end_parsed = parse_date(end_raw)
        if start_parsed is not None and (
            earliest_parsed is None or start_parsed < earliest_parsed
        ):
            earliest_parsed, earliest_raw = start_parsed, start_raw
        if end_parsed is not None and (
            latest_parsed is None or end_parsed > latest_parsed
        ):
            latest_parsed, latest_raw = end_parsed, end_raw
    return earliest_raw, latest_raw


def save_csv():
    """
    Saves the harvested agencies for the current selection as a CSV file.

    Each row holds the agency's basic details plus 'start_function' and
    'end_function' columns: the earliest start date and latest end date of
    the selected function (and of its child terms, if 'include children' is
    checked) for that agency. Related agencies, people, and functions are
    left out. The file is written to '<make_filename()>.csv'.

    Raises:
        KeyError: if nothing has been harvested for the current selection
            (save_data() reports this to the user).
    """
    # Read the records, then close the database file straight away
    with get_db() as db:
        agencies = db.all()
    if not agencies:
        # Previously this surfaced as a KeyError from the missing 'dates'
        # column of an empty DataFrame; raise it explicitly instead.
        raise KeyError("No harvested agencies found for this selection")

    selected = rsfunction.value
    terms = {selected["term"]}
    if children.value:
        terms.update(get_terms(selected))

    rows = []
    for agency in agencies:
        row = {
            k: agency[k]
            for k in ["agency_id", "title", "dates", "agency_status", "location"]
        }
        row["start_function"], row["end_function"] = _function_date_range(
            agency["functions"], terms
        )
        rows.append(row)
    df = pd.DataFrame(rows)

    # The 'dates' column holds dictionaries; expand each one into its own
    # columns and drop the original column. (Series.iteritems() no longer
    # exists in pandas 2.x, so build the frame from a plain list.)
    date_parts = pd.DataFrame(df.pop("dates").tolist())
    df = pd.concat([df, date_parts], axis=1)
    # The human-readable date string takes over the 'dates' column name
    df = df.rename(columns={"date_str": "dates"})
    df = df[
        [
            "agency_id",
            "title",
            "agency_status",
            "dates",
            "start_date",
            "end_date",
            "location",
            "start_function",
            "end_function",
        ]
    ]
    filename = "{}.csv".format(make_filename())
    df.to_csv(filename, index=False)


def save_json():
    """
    Saves the harvested agencies for the current selection as a JSON file.

    All records from the database returned by get_db() are written, complete
    and unflattened, to '<make_filename()>.json'.
    """
    # Read the records, then close the database file straight away instead
    # of leaving the handle open
    with get_db() as db:
        agencies = db.all()
    filename = "{}.json".format(make_filename())
    with open(filename, "w") as json_file:
        json.dump(agencies, json_file, indent=4)


def save_data(b):
    """
    Saves the harvested data and displays links to the saved files.

    This is the click handler for the 'Save data' button. It writes the CSV
    and JSON files, then shows a link to each in the 'out' widget. If saving
    raises a KeyError, a prompt to harvest some data is shown instead.

    Parameters:
        b: the object passed in by the button's click event (unused).
    """
    out.clear_output()
    try:
        save_csv()
        save_json()
        filename = make_filename()
    except KeyError:
        with out:
            print("You need to harvest some data first!")
        return
    # Both files were saved, so offer them for download
    with out:
        display(FileLink("{}.json".format(filename)))
        display(FileLink("{}.csv".format(filename)))


# A button to save the harvested data
download = widgets.Button(
    description="Save data",
    tooltip="Save data for download",
    button_style="primary",  # 'success', 'info', 'warning', 'danger' or ''
    disabled=False,
)

# Run save_data whenever the button is clicked
download.on_click(save_data)

# Output area where save_data shows the download links (or a message)
out = widgets.Output()

# Show the button with the output area beneath it
display(download)
display(out)
In [ ]:
%%capture
# Load environment variables if available.
# The %%capture cell magic has to stay on the first line of the cell; it
# captures the cell's output so nothing is shown when these magics run.
%load_ext dotenv
%dotenv
In [ ]:
# TESTING
# When GW_STATUS is set to 'dev', select the 'arts' function and simulate a
# click on the 'Get agencies' button so that a harvest runs automatically.
if os.environ.get("GW_STATUS") == "dev":
    rsfunction.value = {"term": "arts", "narrower": []}
    start.click()

Created by Tim Sherratt as part of the GLAM Workbench.