Ghani, Rayid, Frauke Kreuter, Julia Lane, Adrianne Bradford, Alex Engler, Nicolas Guetta Jeanrenaud, Graham Henke, Daniela Hochfellner, Clayton Hunter, Brian Kim, Avishek Kumar, and Jonathan Morgan.
Before we begin, run the code cell below to initialize the libraries we'll be using in this assignment. We're already familiar with `numpy`, `pandas`, and `psycopg2` from previous tutorials. Here we'll also be using `scikit-learn` to fit models.
%pylab inline
import time  # used below to report how long feature generation takes
import pandas as pd
import psycopg2
from sqlalchemy import create_engine
db_name = "appliedda"
hostname = "10.10.2.10"
# Insert team schema name below:
myschema = 'ada_tanf'
Our features are our independent variables, or predictors. Good features make machine learning systems effective: the better the features, the easier it is to capture the structure of the data. You generate features using domain knowledge. In general, it is better to have more complex features and a simpler model rather than vice versa. Keeping the model simple makes it faster to train and easier to understand, rather than extensively searching for the "right" model and "right" set of parameters.
Machine learning algorithms learn a solution to a problem from sample data. The set of features is the representation of that sample data from which the algorithm learns its solution.
Examples of feature engineering are:
- Converting categorical variables (such as city), which do not have a numerical value, into binary (dummy) variables and adding them to models.
- Discretizing continuous variables by binning, which you can do by various approaches like equal width, deciles, Fisher-Jenks, etc.
- Creating aggregate features with different aggregation functions (count, min, max, average, standard deviation, etc.) which summarize several values into one feature, aggregating over varying windows of time and space. For example, for policing or criminal justice problems, we may want to calculate the number (and min, max, mean, variance, etc.) of crimes within an m-mile radius of an address in the past t months for varying values of m and t, and then use all of them as features.
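As a concrete illustration of these techniques, here is a minimal pandas sketch on toy data (the column names and values are made up for illustration, not from the IDHS tables):
# toy data to demonstrate dummy variables, binning, and aggregation
toy = pd.DataFrame({'city': ['Chicago', 'Springfield', 'Chicago'],
                    'age': [23, 41, 35],
                    'wage': [3200., 5100., 4400.]})
# 1) categorical -> binary (dummy) variables
dummies = pd.get_dummies(toy['city'], prefix='city')
# 2) discretize a continuous variable into equal-width bins
toy['age_bin'] = pd.cut(toy['age'], bins=3)
# 3) aggregate several values into summary features per group
agg = toy.groupby('city')['wage'].agg(['count', 'min', 'max', 'mean', 'std'])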
This notebook walks through creating the following features:
- `recp_age_end` (Continuous): age of the TANF recipient at "end_date" of the spell
- `recp_age_beg` (Continuous): age of the TANF recipient at "start_date" of the spell
- `job_during` (Binary): recipient has a job during the benefit spell
- `job_before` (Binary): recipient has a job before the benefit spell
- `num_cases` (Aggregation): the number of cases this spell represents
- `avg_case_dur` (Aggregation): average case duration
conn = psycopg2.connect(database=db_name, host = hostname)
cursor = conn.cursor()
The `member` table has `birth_date`, so it is quite easy to calculate the `recp_age_*` features for the `label_*` tables (as created in the creating_labels notebook) once we get the birth date. To do so, we first need to get the `ch_dpa_caseid` identifier from the `indcase_spells` table based on when our selected `ind_spells` ended.
sql = """
CREATE TEMP TABLE cohort_caseid AS
SELECT a.*, b.ch_dpa_caseid
FROM {schema}.labels_20080101 a
JOIN il_dhs.indcase_spells b
ON a.recptno = b.recptno
AND a.end_date = b.end_date
WHERE b.benefit_type = 'tanf46';
""".format(schema=myschema)
cursor.execute(sql)
df = pd.read_sql('select * from cohort_caseid', conn)
# check that our identifier ('recptno') is unique
df.recptno.nunique(), df.shape[0]
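If you'd rather have an explicit test than eyeballing the two numbers, an assert will fail loudly if duplicates appear:
# stop immediately if any recptno appears more than once in the cohort
assert df.recptno.nunique() == df.shape[0], 'recptno is not unique'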
# SQL to calculate recipient age at the beginning and end of a spell
sql = '''
CREATE TEMP TABLE features_age AS
SELECT a.recptno,
extract(epoch from age(a.start_date, b.birth_date))/(3600.*24*365) AS recp_age_beg,
extract(epoch from age(a.end_date, b.birth_date))/(3600.*24*365) AS recp_age_end
FROM cohort_caseid a
LEFT JOIN il_dhs.member b
ON a.recptno = b.recptno AND a.ch_dpa_caseid = b.ch_dpa_caseid;
'''
cursor.execute(sql)
df = pd.read_sql('select * from features_age', conn)
# check that our identifier ('recptno') is unique
df.recptno.nunique(), df.shape[0]
We then merge this list to our labels and view the distribution of ages at the beginning and end of TANF spells for our cohort.
sql = '''
SELECT a.recptno, a.label, b.recp_age_beg, b.recp_age_end
FROM {schema}.labels_20080101 a
LEFT JOIN features_age AS b
ON a.recptno = b.recptno;
'''.format(schema=myschema)
df = pd.read_sql(sql, conn)
df.recptno.nunique(), df.shape[0]
df.head()
df.groupby('label')[['recp_age_beg', 'recp_age_end']].describe().T
# inspect spells where the recipient appears to be under 1 year old -- possible data issues
df[(df['recp_age_end']<1) | (df['recp_age_beg']<1)].head(10)
df[(df['recp_age_end']<1) | (df['recp_age_beg']<1)].shape
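These under-1 ages are almost certainly data issues (e.g. a miskeyed birth_date). Rather than dropping the rows, one option (a sketch, assuming we would impute later as discussed at the end of this notebook) is to set the implausible values to missing:
import numpy as np
# treat implausible ages as missing rather than dropping the observations
bad_age = (df['recp_age_end'] < 1) | (df['recp_age_beg'] < 1)
df.loc[bad_age, ['recp_age_beg', 'recp_age_end']] = np.nan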
In order to facilitate creating this feature for several years of data, we combined all the above steps into a Python function, and added a final step that writes the feature table to the database.
In the step-by-step approach above, all SQL queries were entirely hard coded. As in the labels notebook, we instead use a Python function with parameters so you can easily reuse the code for other values (e.g. a different prediction date). The function's parameters are:
- `preddate`: The date at which we are doing the prediction.
- `schema`: Your team schema, where the feature table will be written. The default value is set to `myschema`, defined in the Python Setup section of this notebook.
- `db_name`: Database name. This is the name of the SQL database we are using. The default value is set to `db_name`, defined in the Python Setup section of this notebook.
- `hostname`: Host name. This is the host name for the SQL database we are using. The default value is set to `hostname`, defined in the Python Setup section of this notebook.
- `overwrite`: Whether you want the function to overwrite tables that already exist. Before writing a table, the function will check whether the table exists, and by default will not overwrite it.
Note that we assume the corresponding `labels_<date>` table has already been created.
def spell_age_features(preddate,
schema=myschema,
db_name=db_name,
hostname=hostname,
overwrite=False):
# set the database connection
conn = psycopg2.connect(database=db_name, host = hostname)
cursor = conn.cursor()
# set variables based on prediction date
tbl_suffix = preddate.replace('-', '') #remove dashes
# Check if the table already exists:
cursor.execute('''
SELECT * FROM information_schema.tables
WHERE table_name = 'features_age_{tbl_suffix}'
AND table_schema = '{schema}';
'''.format(tbl_suffix=tbl_suffix, schema=schema))
# Let's write table if it does not exist (or if overwrite = True)
if not(cursor.rowcount) or overwrite:
print("Creating table")
sql = '''
-- get caseid's to find birth_date
CREATE TEMP TABLE cohort_caseid AS
SELECT a.*, b.ch_dpa_caseid
FROM {schema}.labels_{tbl_suffix} a
JOIN il_dhs.indcase_spells b
ON a.recptno = b.recptno AND a.end_date = b.end_date
WHERE b.benefit_type = 'tanf46';
DROP TABLE IF EXISTS {schema}.features_age_{tbl_suffix};
CREATE TABLE {schema}.features_age_{tbl_suffix} AS
SELECT a.recptno,
extract(epoch from age(a.start_date, b.birth_date))/(3600.*24*365) AS recp_age_beg,
extract(epoch from age(a.end_date, b.birth_date))/(3600.*24*365) AS recp_age_end
FROM cohort_caseid a
LEFT JOIN il_dhs.member b
ON a.recptno = b.recptno AND a.ch_dpa_caseid = b.ch_dpa_caseid;
COMMIT;
ALTER TABLE {schema}.features_age_{tbl_suffix} OWNER TO {schema}_admin;
COMMIT;
'''.format(tbl_suffix=tbl_suffix, schema=schema)
# print(sql) # to debug
cursor.execute(sql)
else:
print("Table already exists")
cursor.close()
sql = '''
SELECT * FROM {schema}.features_age_{tbl_suffix};
'''.format(tbl_suffix=tbl_suffix, schema=schema)
df = pd.read_sql(sql, conn)
return df
start_time = time.time()
df_test1 = spell_age_features('2008-01-01')
print('ages generated in {:.2f} seconds'.format(time.time()-start_time))
df_test1[['recp_age_beg', 'recp_age_end']].describe(percentiles=[0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.99])
start_time = time.time()
df_test2 = spell_age_features('2009-01-01')
print('ages generated in {:.2f} seconds'.format(time.time()-start_time))
start_time = time.time()
df_test3 = spell_age_features('2010-01-01')
print('ages generated in {:.2f} seconds'.format(time.time()-start_time))
We can use the wage record data to define what it means to "have a job" before and during the benefit spell.
For this exercise, we'll say an individual has a job if they have at least 1 quarter in the time period (before/during/after the spell) with total earnings over a calculated minimum full-time employment wage, using IL's minimum hourly wage in the `il_minimum_wage_by_year` table and assuming 35 hours per week and 13 weeks per quarter.
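To make the threshold concrete, assume an hourly minimum wage of $7.75 (an illustrative value; the queries below pull the actual figure from `il_minimum_wage_by_year`):
# illustrative only -- the real queries read minimum_wage from il_minimum_wage_by_year
min_wage = 7.75             # assumed hourly minimum wage, in dollars
hours_per_week = 35
weeks_per_quarter = 13
quarterly_threshold = min_wage * hours_per_week * weeks_per_quarter
print(quarterly_threshold)  # 3526.25 -- quarterly earnings above this count as full-time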
We will again start with our study cohort to subset the UI wage record data to just our population of interest.
Because the wage data does not have a date index, we will define a `quarters_back()` function to build the WHERE clause that selects the quarters we need.
Note: refer back to the Data Exploration notebook for an example of using the `EXPLAIN` keyword in PostgreSQL.
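As a quick refresher, you can prepend EXPLAIN to a query to inspect its plan before running something expensive against the wage table, for example:
# check the query plan (cheap) before running a large join against il_wage
pd.read_sql('EXPLAIN SELECT * FROM il_des_kcmo.il_wage LIMIT 10', conn)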
def quarters_back(preddate, num_quarters=12):
# use Pandas datetime functions to easily access quarter info
predDate = pd.to_datetime(preddate)
# starting parameters
i = 0 # counter for loop
I = num_quarters # number of sequential quarters to select
Yr = predDate.year
Qt = predDate.quarter
# start with previous quarter:
    # if it's the first quarter of the year
if Qt==1:
Yr -= 1 # decrement to previous year
Qt = 4 # reset to 4th quarter
else:
Qt -= 1 # decrement to previous quarter
# list to collect (year=Y AND quarter=Q) combinations
where_list = []
while i<I:
# set year and quarter selection for this quarter
where_clause = '(year = ' + str(Yr) + ' AND quarter = ' + str(Qt) +')'
# add this clause to the list
where_list.append(where_clause)
        # if it's the first quarter of the year
if Qt==1:
Yr -= 1 # decrement to previous year
Qt = 4 # reset to 4th quarter
else:
Qt -= 1 # decrement to previous quarter
i += 1 # increment counter
# join all of the (year = Y AND quarter = Q) separated by OR
return(' OR '.join(where_list))
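For comparison, the same WHERE clause can be built more compactly with pandas period arithmetic; this sketch should yield the same (year, quarter) pairs, just in ascending order:
def quarters_back_alt(preddate, num_quarters=12):
    # the last complete quarter before the prediction date
    last_q = pd.Period(preddate, freq='Q') - 1
    quarters = pd.period_range(end=last_q, periods=num_quarters, freq='Q')
    return ' OR '.join('(year = {} AND quarter = {})'.format(q.year, q.quarter)
                       for q in quarters)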
quarters_back('2008-01-01')
# first we need to find the SSNs of our cohort
# similarly to how we found the caseid
sql = """
CREATE TEMP TABLE cohort_ssn AS
SELECT a.*, b.ssn_hash AS ssn
FROM cohort_caseid a
JOIN il_dhs.member b
ON a.recptno = b.recptno AND a.ch_dpa_caseid = b.ch_dpa_caseid;
""".format(schema=myschema)
cursor.execute(sql)
We will also use the `start_date` and `end_date` columns to define whether a given job occurred before, during, or after each spell.
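The CASE expression in the next query implements this classification. In plain Python terms, it is equivalent to the following toy function (assuming pandas Timestamps; empl_quarter is the first day of the employment quarter):
def classify_quarter(empl_quarter, start_date, end_date):
    # 'before': the whole employment quarter falls before the spell starts
    if empl_quarter <= start_date - pd.DateOffset(months=3):
        return 'before'
    # 'during': the quarter overlaps the spell
    elif empl_quarter < end_date:
        return 'during'
    # 'after': everything else
    else:
        return 'after'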
start_time = time.time()
# create temp table with job history for this cohort
sql = '''
CREATE TEMP TABLE job_history_cohort_20080101 AS
SELECT *, CASE WHEN empl_quarter <= (start_date - '3 months'::interval) THEN 'before'
WHEN empl_quarter < end_date
AND empl_quarter > (start_date - '3 months'::interval) THEN 'during'
ELSE 'after' END job_spell_time
FROM (
SELECT a.recptno, a.start_date, a.end_date, b.wage,
year, quarter, b.ein, b.seinunit, b.empr_no,
to_date(b.year::text||right('0'||((b.quarter-1)*3+1)::text,2)||'01', 'YYYYMMDD')
AS empl_quarter
FROM cohort_ssn a
JOIN il_des_kcmo.il_wage b
ON a.ssn = b.ssn
WHERE ({where_list})
) q;
COMMIT;
'''.format(where_list=quarters_back('2008-01-01'))
# print(sql)
cursor.execute(sql)
print('job history generated in {:.2f} seconds'.format(time.time()-start_time))
sql = 'SELECT * FROM job_history_cohort_20080101'
df = pd.read_sql(sql, conn, parse_dates=['start_date', 'end_date', 'empl_quarter'])
df.info()
df.head()
df.groupby('job_spell_time')['wage'].describe(percentiles=[0.01,0.05,0.25,0.50,0.75,0.95,0.99])
df.groupby(['empl_quarter', 'job_spell_time'])['wage']\
.describe(percentiles=[0.01,0.05,0.25,0.50,0.75,0.95,0.99])
# number of individuals present in this selection of wage record data
df['recptno'].nunique()
There are many different ways we could summarize the job history data to include in our analysis; here we will create total and average earnings for each period (before/during/after the spell), plus a count of "fully employed" quarters in each period.
To define "fully employed" for a given quarter, we'll calculate a threshold as the minimum hourly wage in IL * 35 (work hours per week) * 13 (weeks per quarter). We can use the `il_minimum_wage_by_year` table in the `public` schema for this calculation.
start_time = time.time()
sql = '''
-- summarize earnings for each individual by quarter
-- note we can keep year for our next query, too
CREATE TEMP TABLE job_earnings_summary_20080101 AS
SELECT recptno, job_spell_time, empl_quarter, year, sum(wage) total_earnings
FROM job_history_cohort_20080101
GROUP BY recptno, job_spell_time, year, empl_quarter;
COMMIT;
-- create earnings features for each individual
-- that we found in the wage record data
CREATE TEMP TABLE employ_summary_20080101 AS
SELECT recptno,
sum(CASE WHEN job_spell_time = 'before' THEN total_earnings ELSE 0 END) tot_earn_before,
sum(CASE WHEN job_spell_time = 'during' THEN total_earnings ELSE 0 END) tot_earn_during,
sum(CASE WHEN job_spell_time = 'after' THEN total_earnings ELSE 0 END) tot_earn_after,
avg(CASE WHEN job_spell_time = 'before' THEN total_earnings ELSE 0 END) avg_earn_before,
avg(CASE WHEN job_spell_time = 'during' THEN total_earnings ELSE 0 END) avg_earn_during,
avg(CASE WHEN job_spell_time = 'after' THEN total_earnings ELSE 0 END) avg_earn_after,
sum(CASE WHEN job_spell_time = 'before'
AND total_earnings > (b.minimum_wage*35*13) THEN 1
ELSE 0 END) qtr_full_empl_before,
sum(CASE WHEN job_spell_time = 'during'
AND total_earnings > (b.minimum_wage*35*13) THEN 1
ELSE 0 END) qtr_full_empl_during,
sum(CASE WHEN job_spell_time = 'after'
AND total_earnings > (b.minimum_wage*35*13) THEN 1
ELSE 0 END) qtr_full_empl_after
FROM job_earnings_summary_20080101 a
JOIN il_minimum_wage_by_year b
ON a.year = b.year
GROUP BY recptno;
COMMIT;
-- create the employment feature table
CREATE TEMP TABLE features_employment_20080101 AS
SELECT a.recptno,
CASE WHEN b.recptno IS NOT NULL THEN tot_earn_before ELSE 0 END AS tot_earn_before,
CASE WHEN b.recptno IS NOT NULL THEN tot_earn_during ELSE 0 END AS tot_earn_during,
CASE WHEN b.recptno IS NOT NULL THEN tot_earn_after ELSE 0 END AS tot_earn_after,
CASE WHEN b.recptno IS NOT NULL THEN avg_earn_before ELSE 0 END AS avg_earn_before,
CASE WHEN b.recptno IS NOT NULL THEN avg_earn_during ELSE 0 END AS avg_earn_during,
CASE WHEN b.recptno IS NOT NULL THEN avg_earn_after ELSE 0 END AS avg_earn_after,
CASE WHEN b.recptno IS NOT NULL THEN qtr_full_empl_before ELSE 0
END AS qtr_full_empl_before,
CASE WHEN b.recptno IS NOT NULL THEN qtr_full_empl_during ELSE 0
END AS qtr_full_empl_during,
CASE WHEN b.recptno IS NOT NULL THEN qtr_full_empl_after ELSE 0
END AS qtr_full_empl_after
FROM {schema}.labels_20080101 a
LEFT JOIN employ_summary_20080101 b
ON a.recptno = b.recptno;
COMMIT;
'''.format(schema=myschema)
cursor.execute(sql)
print('features created in {:.2f} seconds'.format(time.time()-start_time))
df = pd.read_sql('SELECT * FROM features_employment_20080101', conn)
df.head()
df.shape
Now we'll create a function to do all the above employment steps for a given input (i.e. "prediction") date.
def employment_features(preddate,
qtrs_back = 12,
quarter_wage_hours=35*13,
schema=myschema,
db_name=db_name,
hostname=hostname,
overwrite=False):
#database connection
conn = psycopg2.connect(database=db_name, host = hostname)
cursor = conn.cursor()
# set table suffix based on prediction date
tbl_suffix = preddate.replace('-', '') #remove dashes
# check if the table already exists:
cursor.execute('''
SELECT * FROM information_schema.tables
WHERE table_name = 'features_employment_{tbl_suffix}'
AND table_schema = '{schema}';
'''.format(tbl_suffix=tbl_suffix, schema=schema))
# Let's write table if it does not exist (or if overwrite = True)
if not(cursor.rowcount) or overwrite:
print("Creating table")
# create "where_list" for quarters to pull from wage data
where_quarters = quarters_back(preddate, qtrs_back)
sql = '''
-- handle overwrite case
DROP TABLE IF EXISTS {schema}.features_employment_{tbl_suffix};
-- get caseid for cohort
CREATE TEMP TABLE cohort_caseid AS
SELECT a.*, b.ch_dpa_caseid
FROM {schema}.labels_{tbl_suffix} a
JOIN il_dhs.indcase_spells b
ON a.recptno = b.recptno
AND a.end_date = b.end_date
WHERE b.benefit_type = 'tanf46';
-- get cohort SSNs
CREATE TEMP TABLE cohort_ssn AS
SELECT a.*, b.ssn_hash AS ssn
FROM cohort_caseid a
JOIN il_dhs.member b
ON a.recptno = b.recptno
AND a.ch_dpa_caseid = b.ch_dpa_caseid;
commit;
-- get cohort's job history
CREATE TEMP TABLE job_history_cohort AS
SELECT *,
CASE WHEN empl_quarter <= (start_date - '3 months'::interval)
THEN 'before'
WHEN empl_quarter < end_date
AND empl_quarter > (start_date - '3 months'::interval)
THEN 'during'
ELSE 'after' END job_spell_time
FROM (
SELECT a.recptno, a.start_date, a.end_date, b.wage,
year, quarter, b.ein, b.seinunit, b.empr_no,
to_date(b.year::text||right('0'||((b.quarter-1)*3+1)::text,2)||'01', 'YYYYMMDD')
AS empl_quarter
FROM cohort_ssn a
JOIN il_des_kcmo.il_wage b
ON a.ssn = b.ssn
WHERE ({where_list})
) q;
commit;
-- summarize earnings for each individual by quarter
-- note we can keep year for our next query, too
CREATE TEMP TABLE job_earnings_summary AS
SELECT recptno, job_spell_time, empl_quarter, year, sum(wage) total_earnings
FROM job_history_cohort
GROUP BY recptno, job_spell_time, year, empl_quarter;
COMMIT;
-- create earnings features for each individual
-- that we found in the wage record data
CREATE TEMP TABLE employ_summary AS
SELECT recptno,
sum(CASE WHEN job_spell_time = 'before' THEN total_earnings ELSE 0 END) tot_earn_before,
sum(CASE WHEN job_spell_time = 'during' THEN total_earnings ELSE 0 END) tot_earn_during,
sum(CASE WHEN job_spell_time = 'after' THEN total_earnings ELSE 0 END) tot_earn_after,
avg(CASE WHEN job_spell_time = 'before' THEN total_earnings ELSE 0 END) avg_earn_before,
avg(CASE WHEN job_spell_time = 'during' THEN total_earnings ELSE 0 END) avg_earn_during,
avg(CASE WHEN job_spell_time = 'after' THEN total_earnings ELSE 0 END) avg_earn_after,
sum(CASE WHEN job_spell_time = 'before'
AND total_earnings > (b.minimum_wage*{qtr_hours}) THEN 1
ELSE 0 END) qtr_full_empl_before,
sum(CASE WHEN job_spell_time = 'during'
AND total_earnings > (b.minimum_wage*{qtr_hours}) THEN 1
ELSE 0 END) qtr_full_empl_during,
sum(CASE WHEN job_spell_time = 'after'
AND total_earnings > (b.minimum_wage*{qtr_hours}) THEN 1
ELSE 0 END) qtr_full_empl_after
FROM job_earnings_summary a
JOIN il_minimum_wage_by_year b
ON a.year = b.year
GROUP BY recptno;
COMMIT;
-- create the employment feature table
CREATE TABLE {schema}.features_employment_{tbl_suffix} AS
SELECT a.recptno,
CASE WHEN b.recptno IS NOT NULL THEN tot_earn_before ELSE 0 END AS tot_earn_before,
CASE WHEN b.recptno IS NOT NULL THEN tot_earn_during ELSE 0 END AS tot_earn_during,
CASE WHEN b.recptno IS NOT NULL THEN tot_earn_after ELSE 0 END AS tot_earn_after,
CASE WHEN b.recptno IS NOT NULL THEN avg_earn_before ELSE 0 END AS avg_earn_before,
CASE WHEN b.recptno IS NOT NULL THEN avg_earn_during ELSE 0 END AS avg_earn_during,
CASE WHEN b.recptno IS NOT NULL THEN avg_earn_after ELSE 0 END AS avg_earn_after,
CASE WHEN b.recptno IS NOT NULL THEN qtr_full_empl_before ELSE 0
END AS qtr_full_empl_before,
CASE WHEN b.recptno IS NOT NULL THEN qtr_full_empl_during ELSE 0
END AS qtr_full_empl_during,
CASE WHEN b.recptno IS NOT NULL THEN qtr_full_empl_after ELSE 0
END AS qtr_full_empl_after
FROM {schema}.labels_{tbl_suffix} a
LEFT JOIN employ_summary b
ON a.recptno = b.recptno;
COMMIT;
-- set owner to schema's admin group:
ALTER TABLE {schema}.features_employment_{tbl_suffix} OWNER TO {schema}_admin;
COMMIT;
'''.format(tbl_suffix=tbl_suffix, where_list=where_quarters,
qtr_hours=quarter_wage_hours, schema=schema)
# print(sql) # to debug
cursor.execute(sql)
else:
print("Table already exists")
cursor.close()
sql = '''
SELECT * FROM {schema}.features_employment_{tbl_suffix};
'''.format(tbl_suffix=tbl_suffix, schema=schema)
df = pd.read_sql(sql, conn)
return df
# time our function
start_time = time.time()
# calculate for 2008-01-01 prediction date with default values
df = employment_features('2008-01-01')
print('features created in {:.2f} seconds'.format(time.time()-start_time))
# how many of our cohort had full employment for how many quarters before their TANF spell?
df['qtr_full_empl_before'].value_counts()
df.head()
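Note that a repeat call simply returns the existing table rather than rebuilding it. To force a rebuild, for example after changing the lookback window, pass overwrite=True (the parameter values below are illustrative):
# rebuild the 2008 features with an (illustrative) 8-quarter lookback window
# df = employment_features('2008-01-01', qtrs_back=8, overwrite=True)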
# time our function
start_time = time.time()
# calculate for 2009-01-01 prediction date with default values
df = employment_features('2009-01-01')
print('features created in {:.2f} seconds'.format(time.time()-start_time))
df['qtr_full_empl_before'].value_counts()
# time our function
start_time = time.time()
# calculate for 2010-01-01 prediction date
df = employment_features('2010-01-01')
print('features created in {:.2f} seconds'.format(time.time()-start_time))
# how many of our cohort had full employment for how many quarters during their TANF spell?
df['qtr_full_empl_during'].value_counts()
The IDHS data from Chapin Hall has a number of "time-invariant" tables which include information about the individuals on various social benefit programs (see the IDHS data documentation for details).
Here we will collect and clean some information about the individuals in our cohort(s).
conn = psycopg2.connect(database=db_name, host = hostname)
cursor = conn.cursor()
# from the documentation:
# education levels
member_info_edlevel = {'A':'None', 'B': 'Some Elementary',
'C': 'Completed 7th grade', 'D':'Completed 8th grade',
'E':'Completed 9th or 10th grade', 'F':'Completed 11th grade',
'G':'Completed GED', 'H':'High School Diploma',
'V':'Post Secondary vocational training', 'W':'One year of college',
'X':'Two Years of College', 'Y':'Three years of college',
'Z':"College graduate (bachelor's degree)",
'P':'Post graduate college degree', '1':'None',
'2':'Some Elementary Grades', '3':'All Elementary Grades',
'4':'Some High School Grades', '5':'All High School Grades',
'6':'Some College', '7':'All College'}
# marital status
member_info_martlst = {1: 'Never married', 2: 'Married - living with spouse',
3: 'Married - spouse in nursing home, etc.',
4: 'Married - spouse deserted',
5: 'Married - legally separated',
6: 'Married - other, including incarceration',
7: 'Divorced', 8: 'Widowed'}
# get caseid
sql = """
CREATE TEMP TABLE cohort_caseid AS
SELECT a.*, b.ch_dpa_caseid
FROM {schema}.labels_20080101 a
JOIN il_dhs.indcase_spells b
ON a.recptno = b.recptno
AND a.end_date = b.end_date
WHERE b.benefit_type = 'tanf46';
""".format(schema=myschema)
cursor.execute(sql)
# get member_info characteristics
sql = """
CREATE TEMP TABLE cohort_member_info AS
SELECT b.*, edlevel, martlst
FROM il_dhs.member_info a
JOIN cohort_caseid b
ON a.recptno = b.recptno
AND a.ch_dpa_caseid = b.ch_dpa_caseid
"""
cursor.execute(sql)
df = pd.read_sql("SELECT * FROM cohort_member_info", conn)
df.head()
df['edlevel'].value_counts()
# but the codes aren't that great, let's use the descriptions instead
df['edlevel'].value_counts(normalize=True).rename(index=member_info_edlevel)
df['martlst'].value_counts(normalize=True).rename(index=member_info_martlst)
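When we get to modeling, categorical codes like these will need to be numeric to use with scikit-learn. A minimal sketch of one-hot encoding them with pd.get_dummies:
# one-hot encode the education codes into binary indicator columns
edlevel_dummies = pd.get_dummies(df['edlevel'], prefix='edlevel')
edlevel_dummies.head()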
def member_info_features(preddate,
schema=myschema,
db_name=db_name,
hostname=hostname,
overwrite=False):
#database connection
conn = psycopg2.connect(database=db_name, host = hostname)
cursor = conn.cursor()
# set table suffix based on prediction date
tbl_suffix = preddate.replace('-', '') #remove dashes
# check if the table already exists:
cursor.execute('''
SELECT * FROM information_schema.tables
WHERE table_name = 'features_member_info_{tbl_suffix}'
AND table_schema = '{schema}';
'''.format(tbl_suffix=tbl_suffix, schema=schema))
# Let's write table if it does not exist (or if overwrite = True)
if not(cursor.rowcount) or overwrite:
print("Creating table")
sql = '''
-- handle overwrite case
DROP TABLE IF EXISTS {schema}.features_member_info_{tbl_suffix};
-- get caseid for cohort
CREATE TEMP TABLE cohort_caseid AS
SELECT a.*, b.ch_dpa_caseid
FROM {schema}.labels_{tbl_suffix} a
JOIN il_dhs.indcase_spells b
ON a.recptno = b.recptno
AND a.end_date = b.end_date
WHERE b.benefit_type = 'tanf46';
commit;
-- get member_info variables of interest
CREATE TEMP TABLE cohort_member_info AS
SELECT b.*, edlevel, martlst
FROM il_dhs.member_info a
JOIN cohort_caseid b
ON a.recptno = b.recptno
AND a.ch_dpa_caseid = b.ch_dpa_caseid;
commit;
-- classify values and attach to our cohort
CREATE TABLE {schema}.features_member_info_{tbl_suffix} AS
SELECT a.recptno,
CASE WHEN b.edlevel IN ('1', 'A') THEN 'ed_none'
WHEN b.edlevel IN ('2', '3', 'B', 'C', 'D') THEN 'ed_no_hs'
WHEN b.edlevel IN ('4', 'E', 'F') THEN 'ed_some_hs'
WHEN b.edlevel IN ('5', 'G', 'H') THEN 'ed_hs_ged'
WHEN b.edlevel IN ('6', 'V', 'W', 'X', 'Y') THEN 'ed_some_coll'
WHEN b.edlevel IN ('7', 'P', 'Z') THEN 'ed_comp_coll'
ELSE 'ed_unknown'
END AS ed_level,
CASE WHEN b.martlst = 1 THEN 'martlst_never'
WHEN b.martlst IN (2, 3) THEN 'martlst_married'
WHEN b.martlst IN (3, 4, 5, 6, 7, 8) THEN 'martlst_separated'
ELSE 'martlst_unknown'
END AS martl_status
FROM {schema}.labels_{tbl_suffix} a
LEFT JOIN cohort_member_info b
ON a.recptno = b.recptno;
commit;
-- set owner to schema's admin group:
ALTER TABLE {schema}.features_member_info_{tbl_suffix} OWNER TO {schema}_admin;
COMMIT;
'''.format(tbl_suffix=tbl_suffix, schema=schema)
# print(sql) # to debug
cursor.execute(sql)
else:
print("Table already exists")
cursor.close()
sql = '''
SELECT * FROM {schema}.features_member_info_{tbl_suffix};
'''.format(tbl_suffix=tbl_suffix, schema=schema)
df = pd.read_sql(sql, conn)
return df
start_time = time.time()
df = member_info_features('2008-01-01')
print('features created in {:.2f} seconds'.format(time.time()-start_time))
df.info()
df[['ed_level', 'martl_status']].describe()
start_time = time.time()
df = member_info_features('2009-01-01')
print('features created in {:.2f} seconds'.format(time.time()-start_time))
df['ed_level'].value_counts()
start_time = time.time()
df = member_info_features('2010-01-01')
print('features created in {:.2f} seconds'.format(time.time()-start_time))
df['martl_status'].value_counts()
Social benefit programs are often administered on a case-by-case basis, and there is associated information with each case. The TANF data we are using in this program has gone through some processing (by Chapin Hall at UChicago), and as you have seen we are creating our observations (rows) from the `ind_spells` table. These individual spells are often groupings of multiple cases.
In this section we will create features based on the underlying cases which make up each spell.
conn = psycopg2.connect(database=db_name, host = hostname)
cursor = conn.cursor()
# the number of cases this individual spell represents is fairly easy
# using just the two `*_spells` tables:
sql = """
CREATE TEMP TABLE cohort_case_summary AS
SELECT a.recptno, count(b.*) case_count,
avg(extract(epoch from age(b.end_date, b.start_date))/(3600.*24)) avg_len_days
FROM {schema}.labels_20080101 a
JOIN il_dhs.indcase_spells b
ON a.recptno = b.recptno
AND a.end_date <= b.end_date
AND a.start_date >= b.start_date
WHERE b.benefit_type = 'tanf46'
GROUP BY a.recptno
""".format(schema=myschema)
cursor.execute(sql)
df = pd.read_sql('SELECT * FROM cohort_case_summary', conn)
df.describe()
# get caseid
sql = """
CREATE TEMP TABLE cohort_caseid AS
SELECT a.*, b.ch_dpa_caseid
FROM {schema}.labels_20080101 a
JOIN il_dhs.indcase_spells b
ON a.recptno = b.recptno
AND a.end_date = b.end_date
WHERE b.benefit_type = 'tanf46';
""".format(schema=myschema)
cursor.execute(sql)
# for the most recent case, let's get
# 1) the county or district code of the Public Assistance Office
# 2) the homelessness flag
sql = """
CREATE TEMP TABLE cohort_case_assist AS
SELECT a.recptno, b.district, b.homeless
FROM cohort_caseid a
JOIN il_dhs.assistance_case b
ON a.ch_dpa_caseid = b.ch_dpa_caseid;
"""
cursor.execute(sql)
df = pd.read_sql('SELECT * FROM cohort_case_assist', conn)
df['district'].value_counts()
From the documentation: "district" is defined as "County or district code of recipient's Public Aid office. 10-115=Downstate County Codes, 200-294=Cook County district codes"
# so if we were to group our cohort by county, how many counties are represented?
# codes 10-115 are downstate counties; the +1 counts Cook County (codes 200-294)
df[(10 <= df['district']) & (df['district'] <= 115)]['district'].nunique() + 1
def case_info_features(preddate,
schema=myschema,
db_name=db_name,
hostname=hostname,
overwrite=False):
#database connection
conn = psycopg2.connect(database=db_name, host = hostname)
cursor = conn.cursor()
# set table suffix based on prediction date
tbl_suffix = preddate.replace('-', '') #remove dashes
# check if the table already exists:
cursor.execute('''
SELECT * FROM information_schema.tables
WHERE table_name = 'features_case_info_{tbl_suffix}'
AND table_schema = '{schema}';
'''.format(tbl_suffix=tbl_suffix, schema=schema))
# Let's write table if it does not exist (or if overwrite = True)
if not(cursor.rowcount) or overwrite:
print("Creating table")
sql = '''
-- handle the overwrite case
DROP TABLE IF EXISTS {schema}.features_case_info_{tbl_suffix};
commit;
-- produce the case summary temp table
CREATE TEMP TABLE cohort_case_summary AS
SELECT a.recptno, count(b.*) case_count,
avg(extract(epoch from age(b.end_date, b.start_date))/(3600.*24)) avg_len_days
FROM {schema}.labels_{tbl_suffix} a
JOIN il_dhs.indcase_spells b
ON a.recptno = b.recptno
AND a.end_date <= b.end_date
AND a.start_date >= b.start_date
WHERE b.benefit_type = 'tanf46'
GROUP BY a.recptno;
commit;
-- get the cohort's caseid
CREATE TEMP TABLE cohort_caseid AS
SELECT a.*, b.ch_dpa_caseid
FROM {schema}.labels_{tbl_suffix} a
JOIN il_dhs.indcase_spells b
ON a.recptno = b.recptno
AND a.end_date = b.end_date
WHERE b.benefit_type = 'tanf46';
-- get the district and homeless codes
CREATE TEMP TABLE cohort_case_assist AS
SELECT a.recptno, b.district, b.homeless
FROM cohort_caseid a
JOIN il_dhs.assistance_case b
ON a.ch_dpa_caseid = b.ch_dpa_caseid;
commit;
-- create our cohort's feature table
CREATE TABLE {schema}.features_case_info_{tbl_suffix} AS
SELECT a.recptno,
CASE WHEN b.case_count IS NULL THEN 0 ELSE b.case_count END AS case_count,
CASE WHEN b.avg_len_days IS NULL THEN 0 ELSE b.avg_len_days END AS avg_len_days,
c.district, c.homeless
FROM {schema}.labels_{tbl_suffix} a
LEFT JOIN cohort_case_summary b
ON a.recptno = b.recptno
LEFT JOIN cohort_case_assist c
ON a.recptno = c.recptno;
commit;
-- set owner to schema's admin group:
ALTER TABLE {schema}.features_case_info_{tbl_suffix} OWNER TO {schema}_admin;
COMMIT;
'''.format(tbl_suffix=tbl_suffix, schema=schema)
# print(sql) # to debug
cursor.execute(sql)
else:
print("Table already exists")
cursor.close()
sql = '''
SELECT * FROM {schema}.features_case_info_{tbl_suffix};
'''.format(tbl_suffix=tbl_suffix, schema=schema)
df = pd.read_sql(sql, conn)
return df
start_time = time.time()
df = case_info_features('2008-01-01')
print('features created in {:.2f} seconds'.format(time.time()-start_time))
start_time = time.time()
df = case_info_features('2009-01-01')
print('features created in {:.2f} seconds'.format(time.time()-start_time))
start_time = time.time()
df = case_info_features('2010-01-01')
print('features created in {:.2f} seconds'.format(time.time()-start_time))
It is never a good idea to drop observations without prior investigation AND a good reason to believe the data is wrong!
There are many ways of imputing missing values based on the rest of the data. Missing values can be imputed to the median of the rest of the data, or you can impute using other characteristics (e.g. industry, geography, etc.).
For our data, we have made an assumption about what "missing" means for each of our data's components (e.g. if the individual does not show up in the IDES wage data, we say they do not have a job in that time period).
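As a sketch of the median approach (assuming df holds a feature table, such as the age features, with some missing values):
# record which values were missing (missingness itself can be predictive),
# then impute each missing value to the median of the observed values
for col in ['recp_age_beg', 'recp_age_end']:
    df[col + '_missing'] = df[col].isna().astype(int)
    df[col] = df[col].fillna(df[col].median())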