def calculate_atoms(
    df,
    weight=None,
    input_var=None,
    weight_var=None,
    weight_prefix=None,
    source_id=None,
    groupby_cols=None,
    overwrite_attrs=None,
):
    """Calculate the atoms (intersecting parts) of census geographies
    and interpolate a proportional weight of the source attribute that
    lies within the target geography.

    Parameters
    ----------
    df : pandas.DataFrame
        The input data. See ``GeoCrossWalk.base``. Note that this dataframe
        is modified in place: one numerator column per weight column name
        is added to (or overwritten in) it.
    weight : str
        The weight column name(s).
    input_var : str or iterable
        The input variable column name(s).
    weight_var : str or iterable
        The groupby and summed variable column name(s).
    weight_prefix : str
        Prepend this prefix to the ``weight_var`` column name.
        A falsy value (e.g. ``None``) is treated as an empty prefix.
    source_id : str
        The source ID column name.
    groupby_cols : list
        The dataframe columns on which to perform groupby.
    overwrite_attrs : None or GeoCrossWalk
        Setting this parameter to a ``GeoCrossWalk`` object overwrites its
        ``input_var`` and ``weight_col`` attributes. Default is ``None``.

    Returns
    -------
    atoms : pandas.DataFrame
        All intersections between ``source`` and ``target`` geographies, and
        the interpolated weight calculations for the proportion of
        source area attributes that are in the target area.

    Raises
    ------
    RuntimeError
        If ``input_var`` and ``weight_var`` do not have the same length.

    Notes
    -----
    See example 1 in the ``GeoCrossWalk`` Examples section.

    """

    # confirm variable data types
    input_var, weight_var = _check_vars(input_var), _check_vars(weight_var)

    # determine length of variable lists
    n_input_var, n_weight_var = len(input_var), len(weight_var)

    # check variable lists are equal length
    if n_input_var != n_weight_var:
        msg = "The 'input_var' and 'weight_var' should be the same length. "
        msg += "%s != %s" % (n_input_var, n_weight_var)
        raise RuntimeError(msg)

    # add prefix (if desired)
    weight_col = _weight_columns(weight_prefix if weight_prefix else "", weight_var)

    # test identity against ``None`` directly rather than relying on the
    # object's string representation
    if overwrite_attrs is not None:
        overwrite_attrs.input_var = input_var
        overwrite_attrs.weight_col = weight_col

    # iterate over each pair of input/interpolation variables
    for ix, (ivar, wvar) in enumerate(zip(input_var, weight_col)):

        # calculate numerators (stored as a new column on the input dataframe)
        df[wvar] = df[weight] * df[ivar]

        # sum the numerators within each group
        group_sums = df.groupby(groupby_cols)[wvar].sum()

        if ix == 0:
            # on the first iteration create an atom dataframe
            atoms = group_sums.to_frame().reset_index()
        else:
            # on subsequent iterations add weights as a column; the same
            # groupby keys are used each time, so the summed values are
            # assigned positionally
            atoms[wvar] = group_sums.values

        # calculate denominators
        denominators = atoms.groupby(source_id)[wvar].sum()

        # interpolate weights
        atoms[wvar] = atoms[wvar] / atoms[source_id].map(denominators)

        # if any weights are NaN, replace with 0. -- assign the result back
        # instead of calling ``fillna(..., inplace=True)`` on the selected
        # column, which operates on a temporary object and does not update
        # ``atoms`` when pandas Copy-on-Write is active
        atoms[wvar] = atoms[wvar].fillna(0.0)

    return atoms