import numpy as np
import torch
def _log_poisson_impl(n, lam):
    # Gaussian approximation to the Poisson log-pdf: log N(n | mu=lam, sigma=sqrt(lam)).
    # Differentiable in lam and accurate for large rates.
    normal = torch.distributions.Normal(lam, torch.sqrt(lam))
    return normal.log_prob(n)
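# A quick sanity check with hypothetical numbers: for a large rate the Gaussian
# approximation tracks the exact Poisson log-pmf closely, e.g.
#
#   rate = torch.tensor(100.0)
#   _log_poisson_impl(torch.tensor(105.0), rate)                      # ~ -3.35
#   torch.distributions.Poisson(rate).log_prob(torch.tensor(105.0))   # ~ -3.37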
class shapesys_constraint:
    # Auxiliary Poisson constraint for an uncorrelated shape systematic: each bin
    # gets an auxiliary measurement with expected rate gamma_i * (b_i / db_i)^2,
    # where (b_i / db_i)^2 plays the role of an effective count tau_i * b_i.
    def __init__(self, nom_data, mod_data):
        self.auxdata = []
        self.bkg_over_db_squared = []
        for b, deltab in zip(nom_data, mod_data):
            bkg_over_bsq = b * b / deltab / deltab  # tau * b = b^2 / db^2
            self.bkg_over_db_squared.append(bkg_over_bsq)
            self.auxdata.append(bkg_over_bsq)

    def alphas(self, pars):
        # expected auxiliary rates alpha_i = gamma_i * (b_i / db_i)^2; done in
        # torch (np.product is deprecated and would break the autograd graph)
        return torch.as_tensor(pars) * torch.as_tensor(self.bkg_over_db_squared)

    def logpdf(self, a, alpha):
        # constraint term: Poisson(a | alpha) in the Gaussian approximation
        return _log_poisson_impl(a, alpha)
def expected_data(self, pars):
return self.alphas(pars)
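# A minimal sketch with made-up numbers: a single bin with b = 50 +/- 7 yields an
# effective auxiliary count b^2/db^2 = 2500/49 ~ 51.02, which alphas() scales by
# the nuisance parameter gamma:
#
#   c = shapesys_constraint([50.0], [7.0])
#   c.auxdata                          # [51.0204...]
#   c.alphas(torch.as_tensor([1.2]))   # tensor([61.2245...])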
class Model:
    # Two-parameter model: pars[0] is the signal strength mu, pars[1] is the
    # background normalization gamma constrained by the auxiliary measurement.
    def __init__(self, spec):
        self.samples = {}
        self.samples['sig'] = spec['sig']
        self.samples['bkg'] = spec['bkg']
        self.constraint = shapesys_constraint(spec['bkg'], spec['bkgerr'])

    def expected_actualdata(self, pars):
        # per-bin expected event rate: mu * s_i + gamma * b_i
        return [
            pars[0] * s + pars[1] * b
            for s, b in zip(self.samples['sig'], self.samples['bkg'])
        ]

    def expected_auxdata(self, pars):
        # only the background nuisance parameter (pars[1]) enters the constraint
        modparslice = slice(1, 2)
        return self.constraint.expected_data(pars[modparslice])
def expected_data(self, pars, include_auxdata=True):
expected_actual = self.expected_actualdata(pars)
if not include_auxdata:
return expected_actual
expected_constraints = self.expected_auxdata(pars)
return np.concatenate([expected_actual, expected_constraints])
    def logpdf(self, data, pars):
        # single-bin model: data = [main-measurement count, auxiliary count]
        actual_data, auxdata = [data[0]], [data[1]]
        lambdas_data = self.expected_actualdata(pars)
        result = 0
        for d, lam in zip(actual_data, lambdas_data):
            result = result + _log_poisson_impl(d, lam)
        modauxslice = slice(0, 1)
        thisauxdata = auxdata[modauxslice]
        modparslice = slice(1, 2)
        modalphas = self.constraint.alphas(pars[modparslice])
        for a, alpha in zip(thisauxdata, modalphas):
            result = result + self.constraint.logpdf(a, alpha)
        return result
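# A minimal sketch of the Model API with hypothetical yields:
#
#   spec = {'sig': [20.0], 'bkg': [50.0], 'bkgerr': [7.0]}
#   pdf = Model(spec)
#   pdf.expected_data([1.0, 1.0])   # [70.0, 51.02...] = main bin + aux bin
#   pdf.logpdf(torch.Tensor([72.0, 51.0]), torch.Tensor([1.0, 1.0]))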
def loglambdav(pars, data, pdf):
    # fit objective: -2 ln L(pars | data)
    return -2 * pdf.logpdf(data, pars)
def optimize(
objective,
init_pars,
par_bounds,
trf_to_unbounded=lambda x: x,
trf_from_unbounded=lambda x: x,
args=None,
constrain_index_value=None,
):
    # `par_bounds` is accepted for interface compatibility but not enforced
    # directly; bounds are meant to be handled through the trf_to_unbounded /
    # trf_from_unbounded reparametrization (identity by default).
    data = args[0]
    poi_index = None
    constrained_var = None
    if constrain_index_value is not None:
        # fix one parameter (the POI) and optimize only the remaining ones;
        # note that poi_value is interpreted in the unbounded parametrization
        poi_index, poi_value = constrain_index_value
        upto = init_pars[:poi_index]
        fromon = init_pars[poi_index + 1 :]
        init_pars = np.concatenate([upto, fromon])
        constrained_var = torch.tensor([float(poi_value)])  # held fixed
    init_pars = trf_to_unbounded(init_pars)
    parameters = torch.tensor(init_pars, dtype=torch.float32, requires_grad=True)
    data = torch.as_tensor(data, dtype=torch.float32)
    optimizer = torch.optim.Adam([parameters])
    def assemble_from_nuis(nuisance_pars):
        # splice the fixed POI back into the nuisance vector at poi_index so
        # that the objective always sees a full-length parameter vector
        tocat = []
        if poi_index > 0:
            tocat.append(nuisance_pars[:poi_index])
        tocat.append(constrained_var)
        if poi_index < len(parameters):
            tocat.append(nuisance_pars[poi_index:])
        return torch.cat(tocat)
    for _ in range(1000):
        if constrain_index_value is None:
            objpars = parameters
        else:
            objpars = assemble_from_nuis(parameters)
        # evaluate the objective in the original (bounded) parametrization
        loss = objective(trf_from_unbounded(objpars), data, args[1])
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    result = parameters
    if constrain_index_value is not None:
        result = assemble_from_nuis(result)
    result = trf_from_unbounded(result).detach().numpy()
    return result
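# One possible transform pair (an assumption, not part of the original interface):
# keep all parameters positive by optimizing in log-space,
#
#   optimize(loglambdav, init_pars, par_bounds,
#            trf_to_unbounded=np.log, trf_from_unbounded=torch.exp,
#            args=(data, pdf))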
def unconstrained_bestfit(data, pdf, init_pars, par_bounds):
    # global maximum-likelihood fit: all parameters float freely
    result = optimize(
        loglambdav,
        init_pars,
        par_bounds,
        args=(data, pdf),
    )
    return result
def constrained_bestfit(constrained_mu, data, pdf, init_pars, par_bounds):
    # profile fit: fix the POI (parameter 0) to constrained_mu, fit the rest
    result = optimize(
        loglambdav,
        init_pars,
        par_bounds,
        args=(data, pdf),
        constrain_index_value=(0, constrained_mu),
    )
    return result
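# A minimal end-to-end sketch with made-up yields (not from the original source):
# build a one-bin model, fit the observed counts, then profile the POI at mu = 1.
if __name__ == '__main__':
    spec = {'sig': [20.0], 'bkg': [50.0], 'bkgerr': [7.0]}
    pdf = Model(spec)
    observed = [75.0] + list(pdf.constraint.auxdata)  # main bin + auxiliary bin
    init_pars = [1.0, 1.0]
    par_bounds = [(0, 10), (0, 10)]
    mle_pars = unconstrained_bestfit(observed, pdf, init_pars, par_bounds)
    print('unconstrained best fit: %s' % mle_pars)
    prof_pars = constrained_bestfit(1.0, observed, pdf, init_pars, par_bounds)
    print('best fit with mu fixed to 1: %s' % prof_pars)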