#!/usr/bin/env python # coding: utf-8 # # Test alignment parsing by `Targets` # This Jupyter notebook is designed to test parsing of alignments by `Targets`. # ## Import Python modules # In[1]: import contextlib import os import random import re import tempfile from IPython.display import display import pandas as pd import yaml import alignparse.minimap2 import alignparse.targets # ## Set up `Targets` # Read in the RecA target used in the notebook examples: # In[2]: targetfile = '../notebooks/input_files/recA_amplicon.gb' feature_parse_specs_file = '../notebooks/input_files/recA_feature_parse_specs.yaml' with open(feature_parse_specs_file) as f: feature_parse_specs = yaml.safe_load(f) # we can't get accuracies for the dummy alignments used here feature_parse_specs['RecA_PacBio_amplicon']['gene']['return'] = 'mutations' feature_parse_specs['RecA_PacBio_amplicon']['barcode']['return'] = 'sequence' targets = alignparse.targets.Targets(seqsfile=targetfile, feature_parse_specs=feature_parse_specs, allow_extra_features=True) print(f"There is just one target: {targets.target_names}") target = targets.targets[0] print(f"It has the following features: {target.feature_names}") _ = targets.plot() # Location of features in 0-indexed scheme: # In[3]: for feature in target.features: print(feature.name, feature.start, feature.end) # ## Make some queries # We make some query sequences that we will align against the target. # The name of each query is a description of all ways that it differs from target: # In[4]: random.seed(1) # for reproducible output nts = 'ACGT' # valid nucleotides def randseq(length): """Random nucleotide sequence of given length.""" return ''.join(random.choices(nts, k=length)) def mutseq(wtseq): """Random mutant sequence from a wildtype sequence.""" return ''.join(random.choice([nt for nt in nts if nt != wt]) for wt in wtseq) def get_wt_query(target, ambiguous_features=('barcode', 'variant_tag5', 'variant_tag3')): """`(description, seq)` for wildtype query, ambiguous features assigned.""" seq = target.seq description = [] for fname in ambiguous_features: f = target.get_feature(fname) fseq = randseq(f.length) seq = seq[: f.start] + fseq + seq[f.end:] description.append(f"{f.name}={fseq}") assert len(seq) == len(target.seq) assert re.fullmatch(f"[{nts}]+", seq) return ','.join(description), seq queries = [] # get two random (unmappable) queries queries.append(('unmapped_1', randseq(target.length))) queries.append(('unmapped_2', randseq(target.length // 2))) # get a fully wildtype query queries.append(get_wt_query(target)) # query with query clipping at both ends and a codon substitution in gene desc, seq = get_wt_query(target) # add substitution to gene gene = target.get_feature('gene') geneseq = gene.seq mutstart = 6 mutlength = 3 mutcodon = mutseq(gene.seq[mutstart: mutstart + mutlength]) sub_desc = [] for i in range(mutlength): wt = geneseq[i + mutstart] mut = mutcodon[i] sub_desc.append(f"{wt}{mutstart + i}{mut}") geneseq = geneseq[: mutstart + i] + mut + geneseq[mutstart + i + 1:] seq = seq[: gene.start] + geneseq + seq[gene.end:] desc += f",gene_{'-'.join(sub_desc)}" # add clipping desc += ',query_clip5=9' seq = randseq(9) + seq desc += ',query_clip3=7' seq += randseq(7) # add to list of queries queries.append((desc, seq)) # query with a deletion in gene and in insertion in spacer desc, seq = get_wt_query(target) delstart = 7 dellength = 15 geneseq = gene.seq[: delstart] + gene.seq[delstart + dellength:] spacer = target.get_feature('spacer') insstart = 4 ins = 'G' spacerseq = spacer.seq[: insstart] + ins + spacer.seq[insstart:] seq = seq[: gene.start] + geneseq + seq[gene.end: spacer.start] + spacerseq + seq[spacer.end:] desc += f",gene_del{delstart}to{delstart + dellength}" desc += f",spacer_ins{insstart}{ins}" queries.append((desc, seq)) # query with deletion spanning gene and spacer desc, seq = get_wt_query(target) delstartgene = gene.length - 7 delendgene = delstartgene + 7 geneseq = gene.seq[: delstartgene] delendspacer = 9 spacerseq = spacer.seq[delendspacer:] seq = seq[: gene.start] + geneseq + spacerseq + seq[spacer.end:] desc += f",gene_del{delstartgene}to{delendgene},spacer_del1to{delendspacer}" queries.append((desc, seq)) # query with insertion at boundary of gene and spacer, note how # it is assigned as being at end of gene desc, seq = get_wt_query(target) ins = randseq(14) seq = seq[: gene.end] + ins + seq[spacer.start:] desc += f",gene_ins{gene.length}{ins}" queries.append((desc, seq)) # query with target clipping at both ends desc, seq = get_wt_query(target) target_clip5 = target.get_feature('termini5').length + 4 target_clip3 = 2 seq = seq[target_clip5: -target_clip3] desc = desc.split(',') desc = ','.join([desc[0], 'variant_tag5=None', desc[2]]) desc += f",target_clip5={target_clip5},target_clip3={target_clip3}" desc += ',termini5=None,gene=,termini3=' queries.append((desc, seq)) # ## Align the queries to the target # In[5]: mapper = alignparse.minimap2.Mapper(alignparse.minimap2.OPTIONS_CODON_DMS) with contextlib.ExitStack() as stack: # write queries to FASTA file queryfile = tempfile.NamedTemporaryFile('r+', suffix='.fasta') queryfile.write('\n'.join(f">{desc}\n{seq}" for desc, seq in queries)) queryfile.flush() # align the queries to the target alignmentfile = tempfile.NamedTemporaryFile('r+', suffix='.sam') targets.align(queryfile.name, alignmentfile.name, mapper) # get the alignment cs data frame alignments_cs = targets._parse_alignment_cs(alignmentfile.name) # get the alignment results readstats, aligned, filtered = targets.parse_alignment(alignmentfile.name) # ## Check results of `Targets._parse_alignment_cs` # We now check if the feature-specific `cs` strings returned by `Targets._parse_alignment_cs` are correct. # # We expect there to be alignments for just our one target plus the number of unmapped reads: # In[6]: sorted(alignments_cs.keys()) == [target.name, 'unmapped'] # Make sure the number of unmapped queries equals the number we passed in: # In[7]: unmapped_queries = [(desc, query) for desc, query in queries if 'unmapped' in desc] len(unmapped_queries) == alignments_cs['unmapped'] # Make sure we get the expected queries mapped to our target: # In[8]: mapped_queries = [tup for tup in queries if tup not in unmapped_queries] len(mapped_queries) == len(alignments_cs[target.name]) # Now we look manually at whether we got the expected `cs` strings for features: # In[9]: with pd.option_context('display.max_colwidth', -1, 'display.max_columns', None): display(alignments_cs[target.name]) # ## Check reseults of `Targets.parse_alignment` # Now check the results or `Targets.parse_alignment`. # # First make sure the read stats are correct. # Here are those read stats: # In[10]: readstats # Make sure they are correct for number of unmapped reads: # In[11]: readstats.set_index('category')['count'].to_dict()['unmapped'] == len(unmapped_queries) # Now look at the filtered reads and make sure that they are what we expect: # In[12]: with pd.option_context('display.max_colwidth', -1, 'display.max_columns', None): display(filtered[target.name]) # Check that the aligned reads are also what we expect: # In[13]: with pd.option_context('display.max_colwidth', -1, 'display.max_columns', None): display(aligned[target.name])