#!/usr/bin/env python # coding: utf-8 # ### Perform clustered heat map analysis # Perform clustered heat map analysis using data for named metabolites from the metabolomics workbench or uploaded data files. #
Note: This notebook contains IPython widgets. Consequently, you won't be able to use Kernal/Restart & Restart command to automatically execute all cells in the notebook. You must use Run command individually to execute each cell and advance to the next cell.
# Import Python modules... # In[ ]: from __future__ import print_function import os import sys import time import re import requests import pandas as pd import numpy as np import matplotlib.pyplot as plt import seaborn as sns import ipywidgets as widgets from IPython.display import display, HTML from IPython import __version__ as ipyVersion # Import MW modules from the current directory or default Python directory... import MWUtil get_ipython().run_line_magic('matplotlib', 'inline') print("Python: %s.%s.%s" % sys.version_info[:3]) print("IPython: %s" % ipyVersion) print() print(time.asctime()) # The URL PATH # # The MW REST URL consists of three main parts, separated by forward slashes, after the common prefix specifying the invariant base URL (https://www.metabolomicsworkbench.org/rest/): # # https://www.metabolomicsworkbench.org/rest/context/input_specification/output_specification # # Part 1: The context determines the type of data to be accessed from the Metabolomics Workbench, such as metadata or results related to the submitted studies, data from metabolites, genes/proteins and analytical chemistry databases as well as other services related to mass spectrometry and metabolite identification: # # context = study | compound | refmet | gene | protein | moverz | exactmass # # Part 2: The input specification consists of two required parameters describing the REST request: # # input_specification = input_item/input_value # # Part 3: The output specification consists of two parameters describing the output generated by the REST request: # # output_specification = output_item/(output_format) # # The first parameter is required in most cases. The second parameter is optional. The input and output specifications are context sensitive. The context determines the values allowed for the remaining parameters in the input and output specifications as detailed in the sections below. # # Setup MW REST base URL... # In[ ]: MWBaseURL = "https://www.metabolomicsworkbench.org/rest" # **Retrieve or upload data for named metabolites...** # In[ ]: # Initialize data... StudiesResultsData = None RetrievedMWData = None # In[ ]: # Setup UIF info text... TopInfoTextHTML = widgets.HTML(value = "Retrieve or upload data and process any missing values", placeholder='', description='') # Setup UIF to process any missing values... MissingValuesMethods = ["NoAction", "DeleteRows", "DeleteColumns", "ReplaceByColumnMean", "ReplaceColumnMedian", "ReplaceByZero" , "LinearInterpolation"] MissingValuesMethodsDropdown = widgets.Dropdown(options = MissingValuesMethods, value = "NoAction", description = " ") ProcessMissingValueTopTextHTML = widgets.HTML(value = "Method for processing missing values:", placeholder='', description='') # Setup UIF to retrieve... StudyIDText = widgets.Text(value = "ST000001 ST000002", description = "Study ID (s)", placeholder = "Type study ID", disabled = False, layout = widgets.Layout(margin='0 10px 0 0')) RetrieveDataBtn = widgets.Button(description = 'Retrieve Data', disabled = False, button_stype = '', tooltip = "Retrieve data for study ID") RetrieveDataOutput = widgets.Output() def RetrieveDataBtnEventHandler(Object): global StudiesResultsData, RetrievedMWData RetrievedMWData = True StudiesResultsData = None StudyIDs = StudyIDText.value MissingValuesMethod = MissingValuesMethodsDropdown.value RetrieveDataOutput.clear_output() UploadDataOutput.clear_output() with RetrieveDataOutput: if len(StudyIDs): print("\nProcessing study ID(s): %s" % StudyIDs) StudiesResultsData = MWUtil.RetrieveStudiesAnalysisAndResultsData(StudyIDs, MWBaseURL, MissingValuesMethod) DisplayData = False if len(StudiesResultsData.keys()) > 5 else True MWUtil.ListStudiesAnalysisAndResultsData(StudiesResultsData, DisplayDataFrame = DisplayData, IPythonDisplayFuncRef = display, IPythonHTMLFuncRef = HTML) else: print("\nNo study ID(s) specified...") RetrieveDataBtn.on_click(RetrieveDataBtnEventHandler) # Setup UIF to upload data file(s)... FileUploadBtn = widgets.FileUpload(description = 'Upload File(s)', accept='.csv,.txt,.tsv', multiple = True, disabled = False) FileUploadTextHTML = widgets.HTML(value = "File format: Col 1: Sample names; \ Col 2: Class identifiers; Remaining cols: Named metabolites; \ Exts: .csv, .txt, or .tsv", placeholder='', description='') UploadDataOutput = widgets.Output() def FileUploadBtnEventHandler(Change): global StudiesResultsData, RetrievedMWData RetrievedMWData = False StudiesResultsData = None MissingValuesMethod = MissingValuesMethodsDropdown.value UploadedDataInfo = FileUploadBtn.value RetrieveDataOutput.clear_output() UploadDataOutput.clear_output() with UploadDataOutput: StudiesResultsData = MWUtil.RetrieveUploadedData(UploadedDataInfo, MissingValuesMethod) DisplayData = False if len(StudiesResultsData.keys()) > 5 else True MWUtil.ListStudiesAnalysisAndResultsData(StudiesResultsData, DisplayDataFrame = DisplayData, IPythonDisplayFuncRef = display, IPythonHTMLFuncRef = HTML) FileUploadBtn.observe(FileUploadBtnEventHandler, names = 'value') # Setup UIF to retrieve or upload data file... DataWarningTextHTML = widgets.HTML(value = "
Warning: Don't re-run the current cell after specifying study ID(s) or selecting file(s) and retrieving the data. Click on the next cell to advance.
", placeholder='', description='') OrTextHTML = widgets.HTML(value = "Or", placeholder='', description='') UIFDataBoxes = [] UIFDataBoxes.append(widgets.HBox([TopInfoTextHTML])) UIFDataBoxes.append(widgets.HBox([ProcessMissingValueTopTextHTML, MissingValuesMethodsDropdown])) UIFDataBoxes.append(widgets.HBox([StudyIDText, RetrieveDataBtn], layout = widgets.Layout(margin='10px 0 0 0'))) UIFDataBoxes.append(widgets.HBox([OrTextHTML])) UIFDataBoxes.append(widgets.HBox([FileUploadBtn])) UIFDataBoxes.append(widgets.HBox([FileUploadTextHTML])) UIFDataBoxes.append(widgets.HBox([DataWarningTextHTML])) for UIFDataBox in UIFDataBoxes: display(UIFDataBox) display(RetrieveDataOutput) display(UploadDataOutput) # In[ ]: MWUtil.CheckAndWarnEmptyStudiesData(StudiesResultsData, RetrievedMWData, StudyIDText.value) # Setup UIF for selecting and plotting available data... # In[ ]: # Setup UIF data... StudiesUIFData = MWUtil.SetupUIFDataForStudiesAnalysisAndResults(StudiesResultsData, MinClassCount = 2) # In[ ]: MWUtil.CheckAndWarnEmptyStudiesUIFData(StudiesUIFData, RetrievedMWData, StudyIDText.value) # In[ ]: # Setup a function to generate dataframe for clustered heatmap plot... def GenerateClusteredHeatupData(InputDataFrame, Normalization = "Auto", ClassColID = "Class", ClassNumColID = "ClassNum"): """Generate plot data frame. """ DataFrame = InputDataFrame.copy() # Drop Class column... if ClassColID is not None: DataFrame = DataFrame.drop(ClassColID, axis = 1) # Retrieve unique class nums... ClassNums = DataFrame[ClassNumColID] UniqueClassNums = DataFrame[ClassNumColID].unique() # Setup a features dataframe for metaboloties... FeaturesDataFrame = DataFrame.drop(ClassNumColID, axis = 1) # Setup row color information based on unique class nums... ClassNumsColorNamesMap = None if len(UniqueClassNums) <= 10: Colors = ["red", "green", "blue", "orange", "purple", "pink", "cyan", "olive", "brown", "grey"] ColorNames = [] ClassNumsColorNamesMap = {} for Index, ClassNum in enumerate(UniqueClassNums): ColorName = Colors[Index] ColorNames.append(ColorName) ClassNumsColorNamesMap[ClassNum] = ColorName ClassNumColors = sns.xkcd_palette(ColorNames) else: ClassNumColors = sns.color_palette("hls", len(UniqueClassNums)) ClassNumsColorsMap = {} for Index, ClassNum in enumerate(UniqueClassNums): ClassNumsColorsMap[ClassNum] = ClassNumColors[Index] ClassNumsRowColors = ClassNums.map(ClassNumsColorsMap) NormalizedFeaturesDataFrame = NormalizeData(FeaturesDataFrame, Method = Normalization) return (NormalizedFeaturesDataFrame, ClassNumsRowColors, ClassNumsColorNamesMap) # Setup a function to normalize data... def NormalizeData(InputDataFrame, Method = "Auto"): if re.match("^None$", Method, re.I): return InputDataFrame DataFrame = InputDataFrame # Center data by mean... DataFrame = DataFrame - DataFrame.mean() if re.match("^Median$", Method, re.I): DataFrame = DataFrame / DataFrame.median() elif re.match("^(Auto|Zscore)$", Method, re.I): DataFrame = DataFrame / DataFrame.std() elif re.match("^Pareto$", Method, re.I): DataFrame = DataFrame / np.sqrt(DataFrame.std()) elif re.match("^Range$", Method, re.I): DataFrame = DataFrame / (DataFrame.max() - DataFrame.min()) else: print("***Warning: Failed to normalize data: Unknown method %s..." % Method) return InputDataFrame return DataFrame # Setup a function to draw hierarchically-clustered heatmap.... def DrawClusteredHeatmapPlot(NormalizedFeaturesDataFrame, Method = "average", Metric = "correlation", RowCluster = True, ColCluster = True, CMapName = "inferno", RowColors = None, FontScale = None, PlotWidth = 9, PlotHeight = 6): sns.set(rc = {'figure.figsize':(PlotWidth, PlotHeight)}) if FontScale is not None: sns.set(font_scale = FontScale) g = sns.clustermap(NormalizedFeaturesDataFrame, method = Method, metric = Metric, z_score = None, standard_scale = None, figsize = (PlotWidth, PlotHeight), row_cluster = RowCluster, col_cluster = ColCluster, row_colors = RowColors, cmap = CMapName) plt.show() # In[ ]: StudyID = StudiesUIFData["StudyIDs"][0] AnalysisID = StudiesUIFData["AnalysisIDs"][StudyID][0] DataFrame = StudiesResultsData[StudyID][AnalysisID]["data_frame"] FeaturesDataFrame, ClassNumsRowColors, ClassNumsColorNamesMap = GenerateClusteredHeatupData(DataFrame) # In[ ]: # Setup UIF... FirstStudyID = StudiesUIFData["StudyIDs"][0] StudiesDropdown = widgets.Dropdown(options = StudiesUIFData["StudyIDs"], value = FirstStudyID, description="Study:", disabled = False) FirstAnalysisID = StudiesUIFData["AnalysisIDs"][FirstStudyID][0] AnalysisDropdown = widgets.Dropdown(options = StudiesUIFData["AnalysisIDs"][FirstStudyID], value = FirstAnalysisID, description = "Analysis:", disabled = False) ClusteringMethods = ["single", "complete", "average", "weighted", "centroid", "median", "ward"] ClusteringMethodsDropdown = widgets.Dropdown(options = ClusteringMethods, value = "average", description = "Method:") DistanceMetrics = ["braycurtis", "canberra", "chebyshev", "cityblock", "correlation", "cosine", "dice", "euclidean", "hamming", "jaccard", "jensenshannon", "kulsinski", "mahalanobis", "matching", "minkowski", "rogerstanimoto", "russellrao", "seuclidean", "sokalmichener", "sokalsneath", "sqeuclidean", "yule"] DistanceMetricsDropdown = widgets.Dropdown(options = DistanceMetrics, value = "correlation", description = "Metric:") ClusterRowsCheckBox = widgets.Checkbox(value = True, description = "Cluster rows", disabled = False ) ClusterColsCheckBox = widgets.Checkbox(value = True, description = "Cluster cols", disabled = False ) NormalizeDataMethods = ["ZScore", "Median", "Pareto", "Range", "None"] NormalizeDataMethodsDropdown = widgets.Dropdown(options = NormalizeDataMethods, value = NormalizeDataMethods[0], description = "Scaling:") ColorMaps = ["viridis", "plasma", "inferno", "magma", "spring", "summer", "autumn", "winter", "cool", "Wistia", "hot", "afmhot", "copper", "gray", "bone", "pink", "Greys", "Purples", "Blues", "Greens", "Oranges", "Reds"] ColorMapsDropdown = widgets.Dropdown(options = ColorMaps, value = "inferno", description = "Colormap:") DefaultPlotWidth = 9 DefaultPlotHeight = 9 PlotSizeText = widgets.Text(value = "9x9", description = "Plot size:", placeholder = "Type WxH; Hit enter", disabled = False, continuous_update=False) DataLayout = widgets.Layout(margin='0 0 4px 0') StudiesDataHBox = widgets.HBox([StudiesDropdown, AnalysisDropdown], layout = DataLayout) ClusteringDataHbox1 = widgets.HBox([ClusteringMethodsDropdown, DistanceMetricsDropdown], layout = DataLayout) ClusteringDataHbox2 = widgets.HBox([ClusterRowsCheckBox, ClusterColsCheckBox], layout = DataLayout) ClusteringDataHbox3 = widgets.HBox([NormalizeDataMethodsDropdown, ColorMapsDropdown], layout = DataLayout) ClusteringDataHbox4 = widgets.HBox([PlotSizeText], layout = DataLayout) Output = widgets.Output() OutputPlot = widgets.Output() UpdatePlot = True def DisablePlotUpdate(): global UpdatePlot UpdatePlot = False def EnablePlotUpdate(): global UpdatePlot UpdatePlot = True def GetUpdatePlotStatus(): global UpdatePlot return True if UpdatePlot else False # Setup function to update dropdown options... def UpdateAnalysisDropdown(StudyID): AnalysisDropdown.options = StudiesUIFData["AnalysisIDs"][StudyID] AnalysisDropdown.value = StudiesUIFData["AnalysisIDs"][StudyID][0] # Setup dropdown event handlers... def StudiesDropdownEventHandler(Change): StudyID = Change["new"] DisablePlotUpdate() UpdateAnalysisDropdown(StudyID) EnablePlotUpdate() PlotData() def AnalysisDropdownEventHandler(Change): PlotData() def ClusteringMethodsDropdownEventHandler(Change): PlotData() def DistanceMetricsDropdownEventHandler(Change): PlotData() def ClusterRowsCheckBoxEventHandler(Change): PlotData() def ClusterColsCheckBoxEventHandler(Change): PlotData() def NormalizeDataMethodsDropdownEventHandler(Change): PlotData() def ColorMapsDropdownEventHandler(Change): PlotData() def PlotSizeTextEventHandler(Change): PlotData() # Bind required event handlers... StudiesDropdown.observe(StudiesDropdownEventHandler, names = 'value') AnalysisDropdown.observe(AnalysisDropdownEventHandler, names = 'value') ClusteringMethodsDropdown.observe(ClusteringMethodsDropdownEventHandler, names = 'value') DistanceMetricsDropdown.observe(DistanceMetricsDropdownEventHandler, names = 'value') ClusterRowsCheckBox.observe(ClusterRowsCheckBoxEventHandler, names = 'value') ClusterColsCheckBox.observe(ClusterColsCheckBoxEventHandler, names = 'value') NormalizeDataMethodsDropdown.observe(NormalizeDataMethodsDropdownEventHandler, names = 'value') ColorMapsDropdown.observe(ColorMapsDropdownEventHandler, names = 'value') PlotSizeText.observe(PlotSizeTextEventHandler, names = 'value') # Set up function to generate clustered heapmap plot... def PlotData(): if not UpdatePlot: return Output.clear_output() OutputPlot.clear_output() StudyID = StudiesDropdown.value AnalysisID = AnalysisDropdown.value DataFrame = StudiesResultsData[StudyID][AnalysisID]["data_frame"] ClusteringMethod = ClusteringMethodsDropdown.value ClusteringMetric = DistanceMetricsDropdown.value if re.match("^(centroid|median|ward)$", ClusteringMethod, re.I): if not re.match("^Euclidean$", ClusteringMetric, re.I): with Output: print("The clustering metric, %s, must be 'Euclidean' for method '%s'" % (ClusteringMetric, ClusteringMethod)) return CluterRowData = ClusterRowsCheckBox.value ClusterColData = ClusterColsCheckBox.value NormalizeDataMethod = NormalizeDataMethodsDropdown.value ClusterColMapName = ColorMapsDropdown.value PlotSize = PlotSizeText.value.lower() PlotSize = re.sub(" ", "", PlotSize) PlotSizeWords = PlotSize.split("x") if len(PlotSizeWords) == 2 and len(PlotSizeWords[0]) > 0 and len(PlotSizeWords[1]) > 0: Width = float(PlotSizeWords[0]) Height = float(PlotSizeWords[1]) else: Width = DefaultPlotWidth Height = DefaultHeight with Output: print("Invalid plot size; Using default plot size: %sx%s\n" % (Width, Height)) with OutputPlot: # Setup data for clustering... NormalizedDataFrame, ClassNumsRowColors, ClassNumsColorNamesMap = GenerateClusteredHeatupData(DataFrame, Normalization = NormalizeDataMethod) # Draw clustered heatmap... DrawClusteredHeatmapPlot(NormalizedDataFrame, Method = ClusteringMethod, Metric = ClusteringMetric, RowCluster = CluterRowData, ColCluster = ClusterColData, CMapName = ClusterColMapName, RowColors = ClassNumsRowColors, PlotWidth = Width, PlotHeight = Height) with Output: MWUtil.ListClassInformation(StudiesResultsData, StudyID, AnalysisID, RetrievedMWData, ClassNumsColorNamesMap) display(StudiesDataHBox) display(ClusteringDataHbox1) display(ClusteringDataHbox2) display(ClusteringDataHbox3) display(ClusteringDataHbox4) display(OutputPlot) display(Output) PlotData()