#!/usr/bin/env python # coding: utf-8 # Run an EasyVVUQ campaign to analyze the sensitivity for the Ishigami function with noise # # This is done with PCE providing a normal distributed noise value # In[1]: # Run an EasyVVUQ campaign to analyze the sensitivity for the Ishigami function with noise # This is done with PCE providing a normal distributed noise value. import os import easyvvuq as uq import chaospy as cp import pickle import numpy as np import matplotlib.pylab as plt import time import pandas as pd # In[2]: # Define the Ishigami function def ishigamiSA(a,b): '''Exact sensitivity indices of the Ishigami function for given a and b. From https://openturns.github.io/openturns/master/examples/meta_modeling/chaos_ishigami.html ''' var = 1.0/2 + a**2/8 + b*np.pi**4/5 + b**2*np.pi**8/18 S1 = (1.0/2 + b*np.pi**4/5+b**2*np.pi**8/50)/var S2 = (a**2/8)/var S3 = 0 S13 = b**2*np.pi**8/2*(1.0/9-1.0/25)/var exact = { 'expectation' : a/2, 'variance' : var, 'S1' : (1.0/2 + b*np.pi**4/5+b**2*np.pi**8.0/50)/var, 'S2' : (a**2/8)/var, 'S3' : 0, 'S12' : 0, 'S23' : 0, 'S13' : S13, 'S123' : 0, 'ST1' : S1 + S13, 'ST2' : S2, 'ST3' : S3 + S13 } return exact Ishigami_a = 7.0 Ishigami_b = 0.1 exact = ishigamiSA(Ishigami_a, Ishigami_b) # In[3]: # define a model to run the Ishigami code directly from python, expecting a dictionary and returning a dictionary def run_ishigami_model(input): import Ishigami qois = ["Ishigami"] del input['out_file'] N = input['N'] del input['N'] return {qois[0]: Ishigami.evaluate(**input)+N} # In[4]: # Define parameter space def define_params(): return { "x1": {"type": "float", "min": -np.pi, "max": np.pi, "default": 0.0}, "x2": {"type": "float", "min": -np.pi, "max": np.pi, "default": 0.0}, "x3": {"type": "float", "min": -np.pi, "max": np.pi, "default": 0.0}, "a": {"type": "float", "min": Ishigami_a, "max": Ishigami_a, "default": Ishigami_a}, "b": {"type": "float", "min": Ishigami_b, "max": Ishigami_b, "default": Ishigami_b}, "N": {"type": "float", "min": -100.0, "max": 100.0, "default": 0.0}, "out_file": {"type": "string", "default": "output.csv"} } # In[5]: # Define varying space def define_vary(): return { "x1": cp.Uniform(-np.pi, np.pi), "x2": cp.Uniform(-np.pi, np.pi), "x3": cp.Uniform(-np.pi, np.pi), "N": cp.Normal(0, 10.0) } # In[6]: # Set up and run a campaign def run_campaign(pce_order=2, use_files=False): times = np.zeros(7) time_start = time.time() time_start_whole = time_start # Set up a fresh campaign called "Ishigami_pce." my_campaign = uq.Campaign(name='Ishigami_pce.') # Create an encoder and decoder for PCE test app if use_files: encoder = uq.encoders.GenericEncoder(template_fname='Ishigami.template', delimiter='$', target_filename='Ishigami_in.json') decoder = uq.decoders.SimpleCSV(target_filename="output.csv", output_columns=["Ishigami"]) execute = uq.actions.ExecuteLocal('python3 %s/Ishigami.py Ishigami_in.json' % (os.getcwd())) actions = uq.actions.Actions(uq.actions.CreateRunDirectory('/tmp'), uq.actions.Encode(encoder), execute, uq.actions.Decode(decoder)) else: actions = uq.actions.Actions(uq.actions.ExecutePython(run_ishigami_model)) # Add the app (automatically set as current app) my_campaign.add_app(name="Ishigami", params=define_params(), actions=actions) # Create the sampler time_end = time.time() times[1] = time_end-time_start print('Time for phase 1 = %.3f' % (times[1])) time_start = time.time() # Associate a sampler with the campaign Sampler_PCE = uq.sampling.PCESampler(vary=define_vary(), polynomial_order=pce_order) my_campaign.set_sampler(Sampler_PCE) # Will draw all (of the finite set of samples) my_campaign.draw_samples() print('PCE order = %s' % pce_order) print('Number of samples = %s' % my_campaign.get_active_sampler().count) time_end = time.time() times[2] = time_end-time_start print('Time for phase 2 = %.3f' % (times[2])) time_start = time.time() # Run the cases my_campaign.execute(sequential=True).collate(progress_bar=True) time_end = time.time() times[3] = time_end-time_start print('Time for phase 3 = %.3f' % (times[3])) time_start = time.time() # Get the results results_df = my_campaign.get_collation_result() time_end = time.time() times[4] = time_end-time_start print('Time for phase 4 = %.3f' % (times[4])) time_start = time.time() # Post-processing analysis results = my_campaign.analyse(qoi_cols=["Ishigami"]) time_end = time.time() times[5] = time_end-time_start print('Time for phase 5 = %.3f' % (times[5])) time_start = time.time() # Save the results pickle.dump(results, open('Ishigami_results.pickle','bw')) time_end = time.time() times[6] = time_end-time_start print('Time for phase 6 = %.3f' % (times[6])) times[0] = time_end - time_start_whole return results_df, results, times, pce_order, my_campaign.get_active_sampler().count # In[7]: # Calculate the polynomial chaos expansion for a range of orders R = {} for pce_order in range(1, 11): R[pce_order] = {} (R[pce_order]['results_df'], R[pce_order]['results'], R[pce_order]['times'], R[pce_order]['order'], R[pce_order]['number_of_samples']) = run_campaign(pce_order=pce_order, use_files=False) # In[8]: # save the results pickle.dump(R, open('collected_results.pickle','bw')) # In[9]: # produce a table of the time taken for various phases # the phases are: # 1: creation of campaign # 2: creation of samples # 3: running the cases # 4: calculation of statistics including Sobols # 5: returning of analysed results # 6: saving campaign and pickled results Timings = pd.DataFrame(np.array([R[r]['times'] for r in list(R.keys())]), columns=['Total', 'Phase 1', 'Phase 2', 'Phase 3', 'Phase 4', 'Phase 5', 'Phase 6'], index=[R[r]['order'] for r in list(R.keys())]) Timings.to_csv(open('Timings.csv', 'w')) display(Timings) # In[10]: R[10]['results'].plot_sobols_treemap('Ishigami') # In[11]: R[10]['results'].sobols_first(), R[10]['results'].sobols_total() # In[12]: # plot the convergence of the mean and standard deviation to the analytic result mean_analytic = exact['expectation'] std_analytic = np.sqrt(exact['variance']) O = [R[r]['order'] for r in list(R.keys())] plt.figure() plt.semilogy([o for o in O], [np.abs(R[o]['results'].describe('Ishigami', 'mean') - mean_analytic) for o in O], 'o-', label='mean') plt.semilogy([o for o in O], [np.abs(R[o]['results'].describe('Ishigami', 'std') - std_analytic) for o in O], 'o-', label='std') plt.xlabel('PCE order') plt.ylabel('RMSerror compared to analytic') plt.legend(loc=0) plt.savefig('Convergence_mean_std.png') plt.savefig('Convergence_mean_std.pdf') # In[13]: # plot the first Sobols as a function of PCE order O = [R[r]['order'] for r in list(R.keys())] plt.figure() for v in list(R[O[0]]['results'].sobols_first('Ishigami').keys()): plt.semilogy([o for o in O], [R[o]['results'].sobols_first('Ishigami')[v] for o in O], 'o-', label=v) plt.xlabel('PCE order') plt.ylabel('1st sobol') plt.ylim(1e-2,10) plt.legend(loc=0) plt.savefig('Convergence_sobol_first.png') plt.savefig('Convergence_sobol_first.pdf') # In[14]: # plot the total Sobols as a function of PCE order O = [R[r]['order'] for r in list(R.keys())] plt.figure() for v in list(R[O[0]]['results'].sobols_total('Ishigami').keys()): plt.semilogy([o for o in O], [R[o]['results'].sobols_total('Ishigami')[v] for o in O], 'o-', label=v) plt.xlabel('PCE order') plt.ylabel('total sobol') plt.ylim(1e-2,10) plt.legend(loc=0) plt.savefig('Convergence_sobol_total.png') plt.savefig('Convergence_sobol_total.pdf') # In[15]: # plot the distribution function results_df = R[O[-1]]['results_df'] results = R[O[-1]]['results'] Ishigami_dist = results.raw_data['output_distributions']['Ishigami'] plt.figure() plt.hist(results_df.Ishigami[0], density=True, bins=50, label='histogram of raw samples', alpha=0.25) if hasattr(Ishigami_dist, 'samples'): plt.hist(Ishigami_dist.samples[0], density=True, bins=50, label='histogram of kde samples', alpha=0.25) t1 = Ishigami_dist[0] plt.plot(np.linspace(t1.lower, t1.upper), t1.pdf(np.linspace(t1.lower, t1.upper)), label='PDF') plt.legend(loc=0) plt.xlabel('Ishigami') plt.savefig('Ishigami_distribution_function.png') # In[21]: results_df.Ishigami[0] # In[17]: # plot the RMS surrogate error at the PCE sample points _o = [] _RMS = [] for r in R.values(): results_df = r['results_df'] results = r['results'] Ishigami_surrogate = np.squeeze(np.array(results.surrogate()(results_df[results.inputs])['Ishigami'])) Ishigami_samples = np.squeeze(np.array(results_df['Ishigami'])) _RMS.append((np.sqrt((((Ishigami_surrogate - Ishigami_samples))**2).mean()))) _o.append(r['order']) plt.figure() plt.semilogy(_o, _RMS, 'o-') plt.xlabel('PCE order') plt.ylabel('RMS error for the PCE surrogate') plt.legend(loc=0) plt.savefig('Convergence_surrogate.png') plt.savefig('Convergence_surrogate.pdf') # In[18]: # prepare the test data test_campaign = uq.Campaign(name='Ishigami.') test_campaign.add_app(name="Ishigami", params=define_params(), actions=uq.actions.Actions(uq.actions.ExecutePython(run_ishigami_model))) test_campaign.set_sampler(uq.sampling.quasirandom.LHCSampler(vary=define_vary(), count=100)) test_campaign.execute(nsamples=1000, sequential=True).collate(progress_bar=True) test_df = test_campaign.get_collation_result() # In[19]: # calculate the PCE surrogates test_points = test_df[test_campaign.get_active_sampler().vary.get_keys()] test_results = np.squeeze(test_df['Ishigami'].values) test_predictions = {} for i in list(R.keys()): test_predictions[i] = np.squeeze(np.array(R[i]['results'].surrogate()(test_points)['Ishigami'])) # In[20]: # plot the convergence of the surrogate _o = [] _RMS = [] for r in R.values(): _RMS.append((np.sqrt((((test_predictions[r['order']] - test_results))**2).mean()))) _o.append(r['order']) plt.figure() plt.semilogy(_o, _RMS, 'o-') plt.xlabel('PCE order') plt.ylabel('RMS error for the PCE surrogate') plt.legend(loc=0) plt.savefig('Convergence_PCE_surrogate.png') plt.savefig('Convergence_PCE_surrogate.pdf') # In[ ]: