#!/usr/bin/env python
# coding: utf-8

# # Dataset discovery tools
#
# This notebook shows some features that make dataset discovery for CMS analyses easier.
# The Rucio system is queried to look for datasets and to access the list of all available file replicas.
#
# Users can exploit these tools at two different levels:
# - low level: use the `rucio_utils` module directly to just query Rucio
# - high level: use the `DataDiscoveryCLI` class to simplify dataset queries, replica filtering and uproot preprocessing with dask

# ## Using Rucio utils directly

# In[1]:


from coffea.dataset_tools import rucio_utils
from coffea.dataset_tools.dataset_query import print_dataset_query
from rich.console import Console
from rich.table import Table


# In[2]:


client = rucio_utils.get_rucio_client()
client


# In[3]:


query = "/TTToSemiLeptonic_*_13TeV-powheg-pythia8/RunIISummer20UL18NanoAODv9*/NANOAODSIM"


# In[4]:


outlist, outtree = rucio_utils.query_dataset(
    query,
    client=client,
    tree=True,
    scope="cms",
)
outlist[1:5]


# Let's now pretty-print the results in a table using a utility function from the `dataset_query` module.

# In[6]:


console = Console()
print_dataset_query(query, outtree, console)


# ### Dataset replicas
#
# Let's select one dataset and look for the available replicas.

# In[7]:


dataset = outlist[0]
dataset


# Using the option `mode="full"` in the function `rucio_utils.get_dataset_files_replicas()`, one gets all the available replicas.

# In[8]:


try:
    (
        outfiles,
        outsites,
        sites_counts,
    ) = rucio_utils.get_dataset_files_replicas(
        dataset,
        allowlist_sites=[],
        blocklist_sites=[],
        regex_sites=[],
        mode="full",  # full or first. "full" == all the available replicas
        client=client,
    )
except Exception as e:
    print(f"\n[red bold] Exception: {e}[/]")


# In[9]:


def print_replicas(sites_counts):
    # Pretty-print the per-site availability of the dataset files with a rich table.
    console.print(f"[cyan]Sites availability for dataset: [red]{dataset}")
    table = Table(title="Available replicas")
    table.add_column("Index", justify="center")
    table.add_column("Site", justify="left", style="cyan", no_wrap=True)
    table.add_column("Files", style="magenta", no_wrap=True)
    table.add_column("Availability", justify="center")
    table.row_styles = ["dim", "none"]
    Nfiles = len(outfiles)

    sorted_sites = dict(
        sorted(sites_counts.items(), key=lambda x: x[1], reverse=True)
    )
    for i, (site, stat) in enumerate(sorted_sites.items()):
        table.add_row(
            str(i), site, f"{stat} / {Nfiles}", f"{stat*100/Nfiles:.1f}%"
        )

    console.print(table)


# In[10]:


print_replicas(sites_counts)
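# With `mode="full"` a file can have more than one replica. The cell below is a minimal illustration, not part of the original notebook: it assumes that each entry of `outfiles` is the list of replica URLs for one file, and simply keeps one URL per file to build a plain file list usable with uproot.

# In[ ]:


# Minimal sketch (assumption: with mode="full" each element of `outfiles`
# is the list of replica URLs of one file); keep the first available replica.
first_replicas = [replicas[0] for replicas in outfiles if replicas]
print(f"{len(first_replicas)} files selected, e.g.:", first_replicas[:2])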
"full"==all the available replicas client=client, ) except Exception as e: print(f"\n[red bold] Exception: {e}[/]") print_replicas(sites_counts) # In[22]: # Example with regex try: ( outfiles, outsites, sites_counts, ) = rucio_utils.get_dataset_files_replicas( dataset, allowlist_sites=[], blocklist_sites=[], regex_sites= r"T[123]_(FR|IT|BE|CH|DE|ES|UK)_\w+", mode="full", # full or first. "full"==all the available replicas client=client, ) except Exception as e: print(f"\n[red bold] Exception: {e}[/]") print_replicas(sites_counts) # ## Using the DataDiscoveryCLI # Manipulating the dataset query and replicas is simplified by the `DataDiscoveryCLI` class in `dataset_query` module. # In[1]: from coffea.dataset_tools import rucio_utils from coffea.dataset_tools.dataset_query import print_dataset_query from rich.console import Console from coffea.dataset_tools.dataset_query import DataDiscoveryCLI # In[2]: dataset_definition = { "/DYJetsToLL_M-50_TuneCP5_13TeV-amcatnloFXFX-pythia8/RunIISummer20UL18NanoAODv9-106X*/NANOAODSIM": {"short_name": "ZJets", "metadata": {"xsec": 100.0,"isMC":True}}, "/SingleMuon/Run2018C-UL20*_MiniAODv2_NanoAODv9_GT36*/NANOAOD": {"short_name": "SingleMuon", "metadata": {"isMC":False}} } # The dataset definition is passed to a `DataDiscoveryCLI` to automatically query rucio and get replicas # In[11]: ddc = DataDiscoveryCLI() ddc.load_dataset_definition(dataset_definition, query_results_strategy="all", replicas_strategy="round-robin") # ### Filtering sites # Sites filtering works in a very similar way for `DataDiscoveryCLI` # In[17]: ddc = DataDiscoveryCLI() ddc.do_regex_sites(r"T[123]_(CH|IT|UK|FR|DE)_\w+") ddc.load_dataset_definition(dataset_definition, query_results_strategy="all", replicas_strategy="round-robin") # In[18]: ddc.do_list_selected() # ### Save the replicas metadata # In[20]: ddc.do_save("replicas_info.json") # ## DataDiscoveryCLI from shell # The DataDiscoveryCLI can be used directly from CLI # In[35]: get_ipython().system('python -m coffea.dataset_tools.dataset_query --help') # In[ ]: get_ipython().system('python -m coffea.dataset_tools.dataset_query --cli -d dataset_definition.json') # ## Preprocess the fileset with dask # The replicas metadata contain the file location in the CMS grid. # This info can be **preprocessed** with uproot and dask-awkward to extract the **fileset**. Practically a fileset is a collection of metadata about the file location, file name, chunks splitting, that can be used directly to configure the uproot reading. # # This step replaces the preprocessing step in coffea 0.7.x. The output of the preprocessing can be used directly to start an analysis with dask-awkward. # # The preprocessing is performed locally with multiple processes if `dask_cluster==None`, but a pre-existing dask cluster url can be passed. # In[22]: fileset_total = ddc.do_preprocess(output_file="fileset", step_size=10000, #chunk size for files splitting align_to_clusters=False, scheduler_url=None) # In[24]: import gzip import json with gzip.open("fileset_available.json.gz", "rt") as file: fileset_available = json.load(file) # In[32]: dataset = '/DYJetsToLL_M-50_TuneCP5_13TeV-amcatnloFXFX-pythia8/RunIISummer20UL18NanoAODv9-106X_upgrade2018_realistic_v16_L1v1-v2/NANOAODSIM' for i, (file, meta) in enumerate(fileset_available[dataset]["files"].items()): print(file, meta) if i>3: break # In[ ]: