#!/usr/bin/env python
# coding: utf-8

# Gist for PyMC
#
# Setup:
# - Experimental data for > 100 participants
# - "Measurement Phase": observe performance as a function of x
# - "Optimization Phase": each participant gives a *single* estimate of the x
#   that would maximize some payoff function (with an explicit cost term)
# - Goal: find different types of models which best explain performance +
#   optimization choices by participants.

import numpy as np
import pymc as pm
import pytensor as pt

# Parameters of two simulated subjects (index 0 and 1).
beta_params = [0.9, 0.8]
lambda_params = [1.5, 0.6]
delta_params = [1.5, 0.6]

# Shared x-grid for the symbolic argmax helpers below.
# (Previously this linspace was hard-coded twice, in index_fun and argmax_fun.)
X_GRID = np.linspace(0, 15, 1000)


def performance(beta_param, lambda_param, delta_param, x=None):
    """Simulate 0/1 trial outcomes from a saturating-exponential curve.

    Success probability: p(x) = beta * (1 - exp(-(x - delta) / lambda)).

    Parameters
    ----------
    beta_param, lambda_param, delta_param : float
        Curve parameters for one subject.
    x : array-like, optional
        Stimulus levels. Defaults to the module-level ``x_init`` for
        backward compatibility with the original call sites.

    Returns
    -------
    ndarray of 0/1 draws, same shape as ``x``.

    NOTE(review): p is not clipped to [0, 1]; with the simulated parameters
    and x in {2..6} it stays valid, but other inputs could make
    np.random.binomial raise.
    """
    if x is None:
        x = x_init  # module-level input data defined below
    p = beta_param * (1 - np.exp(-(x - delta_param) / lambda_param))
    return np.random.binomial(1, p)


# Generate input data: 50 trials per subject at random stimulus levels.
x_init = np.random.choice([2, 3, 4, 5, 6], 50)
correct1 = performance(beta_params[0], lambda_params[0], delta_params[0])
correct2 = performance(beta_params[1], lambda_params[1], delta_params[1])
codes = np.repeat([0, 1], 50)  # participant index per observation
correct_init = np.concatenate([correct1, correct2])
x = np.concatenate([x_init, x_init])

# Implemented optimization value for x (one choice per participant).
x_optim = [4.2, 3.1]
codes_optim = [0, 1]

# Print shapes
print(f"Shape of x: {x.shape}")
print(f"Shape of codes: {codes.shape}")
print(f"Shape of correct_init: {correct_init.shape}")


def index_fun(argmax_tensor):
    """Return the x-grid value at the given (symbolic) argmax index."""
    grid = pt.shared(X_GRID)
    return grid[argmax_tensor]


def argmax_fun(beta_param, lambda_param, delta_param):
    """Symbolically compute the payoff-maximizing x for each participant.

    Payoff(x) = p(x) * (150 - cost * x), maximized over X_GRID.
    The grid is tiled to shape (1000, 2) so both participants' parameter
    vectors broadcast across it; argmax over axis 0 yields one index per
    participant.
    """
    cost = 10  # per-unit cost of x in the payoff (was np.array([10])[0])
    grid = pt.shared(np.tile(X_GRID, (2, 1)).swapaxes(0, 1))
    # pm.math.exp keeps this consistent with the model likelihood below;
    # the original used np.exp on a pytensor tensor, which also works but
    # obscures that this is symbolic computation.
    probability_simul = beta_param * (
        1 - pm.math.exp(-(grid - delta_param) / lambda_param)
    )
    payoff = probability_simul * (150 - cost * grid)
    maximum_low = pt.tensor.argmax(payoff, axis=0)
    return index_fun(maximum_low)


coords = {
    "participants": ["0", "1"],
    "participant_idxs": codes,
    "obs_id": np.arange(len(codes)),
    "obs_id_optim": np.arange(len(codes_optim)),
}

with pm.Model(coords=coords) as model:
    participant_idx = pm.Data("participant_idx", codes, dims="obs_id")
    participant_idx_optim = pm.Data(
        "participant_idx_optim", codes_optim, dims="obs_id_optim"
    )

    # Three parameters of the performance function (one per participant).
    lambda_param = pm.TruncatedNormal(
        "lambda_param", mu=0, sigma=1, dims="participants", lower=0
    )
    beta_param = pm.TruncatedNormal(
        "beta_param", mu=0, sigma=1, dims="participants", lower=0, upper=1
    )
    delta_param = pm.TruncatedNormal(
        "delta_param", mu=0, sigma=1, dims="participants", lower=0
    )

    # Deviation parameter for the optimization-phase likelihood.
    # (Fixed typo: the variable was named "simga_plan".)
    sigma = pm.HalfNormal("sigma_plan", sigma=1)

    # Performance function (measurement-phase likelihood).
    probability = beta_param[participant_idx] * (
        1
        - pm.math.exp(
            -(x - delta_param[participant_idx]) / lambda_param[participant_idx]
        )
    )
    y = pm.Bernoulli("y", p=probability, observed=correct_init)

    # Optimization phase: each participant's implemented x is modeled as
    # noise around the model-implied payoff-maximizing x.
    sampled_optim = argmax_fun(beta_param, lambda_param, delta_param)
    pm.Deterministic("sampled_optim", sampled_optim, dims="participants")
    implemented_optim = pm.Normal(
        "implemented_optim",
        mu=sampled_optim[participant_idx_optim],
        sigma=sigma,
        observed=x_optim,
    )

    trace = pm.sample(
        1000,
        tune=1000,
        cores=4,
        chains=4,
        init="adapt_diag",
        return_inferencedata=True,
    )

pm.summary(trace)