# Notebook magic `%matplotlib inline`, written as the equivalent IPython call
# so that the cell source is also syntactically valid Python.
get_ipython().run_line_magic("matplotlib", "inline")

#export
from exp.nb_12a import *
import torchaudio
from torchaudio import transforms

#export
# Every file extension (lower-cased) that `mimetypes` maps to an audio/* type.
AUDIO_EXTS = {str.lower(k) for k, v in mimetypes.types_map.items() if v.startswith('audio/')}

dsid = "ST-AEDS-20180100_1-OS"
data_url = f'http://www.openslr.org/resources/45/{dsid}'  # actual URL has .tgz extension but untar_data doesn't like that
path = Path.home() / Path(f".fastai/data/{dsid}/")
datasets.untar_data(data_url, dest=path)
path

audios = get_files(path, extensions=AUDIO_EXTS)
print(f"Found {len(audios)} audio files")
audios[:5]


#export
class AudioList(ItemList):
    """`ItemList` whose items are audio files loaded with `torchaudio.load`."""

    @classmethod
    def from_files(cls, path, extensions=None, recurse=True, include=None, **kwargs):
        """Build the list from the files under `path`.

        `extensions` defaults to `AUDIO_EXTS`; `recurse` and `include` are
        forwarded to `get_files`, remaining keyword arguments to the
        constructor.
        """
        if extensions is None:
            extensions = AUDIO_EXTS
        files = get_files(path, extensions, recurse=recurse, include=include)
        return cls(files, path, **kwargs)

    def get(self, fn):
        """Load `fn` and return a `(signal, sample_rate)` tuple (mono only)."""
        sig, sr = torchaudio.load(fn)
        assert sig.size(0) == 1, "Non-mono audio detected, mono only supported for now."
        return (sig, sr)


al = AudioList.from_files(path)
al

al[0]

sd = SplitData.split_by_func(al, partial(random_splitter, p_valid=0.2))
sd


#export
def re_labeler(fn, pat):
    """Return the first match of regex `pat` in `str(fn)`.

    Raises `IndexError` when `pat` does not match at all.
    """
    return re.findall(pat, str(fn))[0]


label_pat = r'/([mf]\d+)_'
speaker_labeler = partial(re_labeler, pat=label_pat)
ll = label_by_func(sd, speaker_labeler, proc_y=CategoryProcessor())
ll

#export
from IPython.display import Audio


def show_audio(ad):
    """Display an audio player for the `(signal, sample_rate)` tuple `ad`."""
    sig, sr = ad
    display(Audio(data=sig, rate=sr))


#export
def show_audio_in_out(orig, trans):
    """Helper to plot input and output signal in different colors.

    `orig` and `trans` are `(signal, sample_rate)` tuples. Either one may be
    `None`, in which case it is skipped for both the player and the plot.
    """
    # Unpack only after the None check: unpacking first would raise before
    # the guards were ever reached.
    if orig is not None:
        print("↓ Original ↓")
        show_audio(orig)
    if trans is not None:
        print("↓ Transformed ↓")
        show_audio(trans)
    if orig is not None:
        osig, _ = orig
        plt.plot(osig[0], 'm', label="Orig.")
    if trans is not None:
        tsig, _ = trans
        plt.plot(tsig[0], 'c', alpha=0.5, label="Transf.")
    plt.legend()
    plt.show()


#export
class ToCuda(Transform):
    """Move the signal of a `(signal, sample_rate)` tuple to the GPU."""
    _order = 10

    def __call__(self, ad):
        sig, sr = ad
        return (sig.cuda(), sr)


ToCuda()(ll.train[0][0])


#export
class PadOrTrim(Transform):
    """Pad or trim a signal to a fixed duration of `msecs` milliseconds."""
    _order = 11

    def __init__(self, msecs):
        self.msecs = msecs

    def __call__(self, ad):
        sig, sr = ad
        # Multiply before dividing: `sr//1000 * msecs` truncates the sample
        # rate first and gives too few samples whenever `sr` is not a
        # multiple of 1000 (e.g. 22050 Hz).
        mx = int(sr * self.msecs // 1000)
        return (transforms.PadTrim(mx)(sig), sr)


pt = PadOrTrim(3000)  ## duration in milliseconds
show_audio_in_out(ll.train[0][0], pt(ll.train[0][0]))


#export
class SignalShift(Transform):
    """Roll the signal by a random amount, up to `max_shift_pct` of its length."""
    _order = 20

    def __init__(self, max_shift_pct=.6):
        self.max_shift_pct = max_shift_pct

    def __call__(self, ad):
        sig, sr = ad
        # Shift size is a random fraction of the first channel's length.
        roll_by = int(random.random() * self.max_shift_pct * len(sig[0]))
        return (sig.roll(roll_by), sr)


shifter = SignalShift()
show_audio_in_out(ll.train[0][0], shifter(ll.train[0][0]))


#export
class Spectrogrammer(Transform):
    """Turn a `(signal, sample_rate)` tuple into a spectrogram tensor.

    With `to_mel` a `MelSpectrogram` is computed, otherwise a plain
    `Spectrogram`; with `to_db` the result is passed through
    `SpectrogramToDB`. The remaining arguments are forwarded to those
    torchaudio transforms. Only the spectrogram is returned; the sample rate
    is dropped.
    """
    _order = 90

    def __init__(self, to_mel=True, to_db=True, n_fft=400, ws=None, hop=None,
                 f_min=0.0, f_max=None, pad=0, n_mels=128, top_db=None, normalize=False):
        self.to_mel = to_mel
        self.to_db = to_db
        self.n_fft = n_fft
        self.ws = ws
        self.hop = hop
        self.f_min = f_min
        self.f_max = f_max
        self.pad = pad
        self.n_mels = n_mels
        self.top_db = top_db
        self.normalize = normalize

    def __call__(self, ad):
        sig, sr = ad
        if self.to_mel:
            spec = transforms.MelSpectrogram(sr, self.n_fft, self.ws, self.hop, self.f_min,
                                             self.f_max, self.pad, self.n_mels)(sig)
        else:
            spec = transforms.Spectrogram(self.n_fft, self.ws, self.hop, self.pad,
                                          normalize=self.normalize)(sig)
        if self.to_db:
            spec = transforms.SpectrogramToDB(top_db=self.top_db)(spec)
        spec = spec.permute(0, 2, 1)  # reshape so it looks good to humans
        return spec


#export
def show_spectro(img, ax=None, figsize=(6,6), with_shape=True):
    """Show spectrogram tensor `img` with `imshow`, on `ax` or a new figure.

    A tensor living on a CUDA device is copied to the CPU first. When
    `with_shape` is true the tensor shape is displayed as well.
    """
    # `and` rather than `&`: the bitwise operator evaluates both operands, so
    # an object without `.device` raised AttributeError despite the guard.
    if hasattr(img, "device") and str(img.device).startswith("cuda"):
        img = img.cpu()
    if ax is None:
        _, ax = plt.subplots(1, 1, figsize=figsize)
    # NOTE(review): a tensor whose first dim is 3 is passed to imshow as-is;
    # confirm that layout is what imshow should receive here.
    ax.imshow(img if (img.shape[0] == 3) else img.squeeze(0))
    if with_shape:
        display(f'Tensor shape={img.shape}')


speccer = Spectrogrammer(to_db=True, n_fft=1024, n_mels=64, top_db=80)
show_spectro(speccer(ll.train[0][0]))


#export
class SpecAugment(Transform):
    """Mask random frequency and time bands of a spectrogram.

    `freq_masks` bands along dim 1 and `time_masks` bands along dim 2 are
    overwritten, each at most `max_mask_pct` of that dimension wide. Masked
    cells are set to 0 when `replace_with_zero` is true, otherwise to the
    spectrogram mean. Only `spec[0]` is masked, and the input tensor is
    modified in place and returned.

    Raises `ValueError` when `max_mask_pct` is outside `[0, 1]`.
    """
    _order = 99

    def __init__(self, max_mask_pct=0.2, freq_masks=1, time_masks=1, replace_with_zero=False):
        self.max_mask_pct = max_mask_pct
        self.freq_masks = freq_masks
        self.time_masks = time_masks
        self.replace_with_zero = replace_with_zero
        if not 0 <= self.max_mask_pct <= 1.0:
            raise ValueError(
                f"max_mask_pct must be between 0.0 and 1.0, but it's {self.max_mask_pct}")

    def __call__(self, spec):
        _, n_mels, n_steps = spec.shape
        max_f = math.ceil(n_mels * self.max_mask_pct)  # rounding up in case of small %
        max_t = math.ceil(n_steps * self.max_mask_pct)
        # The fill value is computed once, before any masking changes the mean.
        fill = 0 if self.replace_with_zero else spec.mean()
        for _ in range(self.freq_masks):
            f = random.randint(0, max_f)
            f0 = random.randint(0, n_mels - f)
            spec[0][f0:f0 + f] = fill
        for _ in range(self.time_masks):
            t = random.randint(0, max_t)
            t0 = random.randint(0, n_steps - t)
            spec[0][:, t0:t0 + t] = fill
        return spec


masker = SpecAugment(freq_masks=2, time_masks=2, max_mask_pct=0.1)
show_spectro(masker(speccer(ll.train[0][0])))

pad_3sec = PadOrTrim(3000)
shifter = SignalShift()
speccer = Spectrogrammer(n_fft=1024, n_mels=64, top_db=80)
masker = SpecAugment(freq_masks=2, time_masks=2, max_mask_pct=0.1)

tfms = [ToCuda(), shifter, pad_3sec, speccer, masker]

al = AudioList.from_files(path, tfms=tfms)
sd = SplitData.split_by_func(al, partial(random_splitter, p_valid=0.2))
ll = label_by_func(sd, speaker_labeler, proc_y=CategoryProcessor())

show_spectro(ll.train[4][0])

bs = 64
c_in = ll.train[0][0].shape[0]
c_out = len(uniqueify(ll.train.y))
data = ll.to_databunch(bs, c_in=c_in, c_out=c_out)

x, y = next(iter(data.train_dl))


#export
def show_batch(x, c=4, r=None, figsize=None, shower=show_image):
    """Show the items of batch `x` on a grid with `c` columns and `r` rows.

    `r` defaults to as many rows as needed for `len(x)` items, `figsize` to
    3 inches per cell. `shower(item, ax)` draws one item on one axis.
    """
    n = len(x)
    if r is None:
        r = int(math.ceil(n / c))
    if figsize is None:
        figsize = (c * 3, r * 3)
    # squeeze=False always yields a 2-D array of axes; otherwise a 1x1 grid
    # returns a bare Axes object, which has no `.flat`.
    fig, axes = plt.subplots(r, c, figsize=figsize, squeeze=False)
    for xi, ax in zip(x, axes.flat):
        shower(xi, ax)


show_spec_batch = partial(show_batch, c=4, r=2, figsize=None,
                          shower=partial(show_spectro, with_shape=False))
show_spec_batch(x)

opt_func = adam_opt(mom=0.9, mom_sqr=0.99, eps=1e-6, wd=1e-2)
loss_func = LabelSmoothingCrossEntropy()
lr = 1e-2
pct_start = 0.5
phases = create_phases(pct_start)
sched_lr = combine_scheds(phases, cos_1cycle_anneal(lr/10., lr, lr/1e5))
sched_mom = combine_scheds(phases, cos_1cycle_anneal(0.95, 0.85, 0.95))
cbscheds = [ParamScheduler('lr', sched_lr),
            ParamScheduler('mom', sched_mom)]
learn = cnn_learner(xresnet34, data, loss_func, opt_func)
learn.fit(5, cbs=cbscheds)

# Recap of the whole pipeline above in a single cell (kept commented out).
# dsid = "ST-AEDS-20180100_1-OS"
# data_url = f'http://www.openslr.org/resources/45/{dsid}' # actual URL has .tgz extension but untar_data doesn't like that
# path = Path.home() / Path(f".fastai/data/{dsid}/")
# datasets.untar_data(data_url, dest=path)

# pad_3sec = PadOrTrim(3000)
# shifter = SignalShift()
# speccer = Spectrogrammer(n_fft=1024, n_mels=64, top_db=80)
# masker = SpecAugment(freq_masks=2, time_masks=2, max_mask_pct=0.1)

# tfms = [ToCuda(), shifter, pad_3sec, speccer, masker]
# al = AudioList.from_files(path, tfms=tfms)

# sd = SplitData.split_by_func(al, partial(random_splitter, p_valid=0.2))

# label_pat = r'/([mf]\d+)_'
# speaker_labeler = partial(re_labeler, pat=label_pat)
# ll = label_by_func(sd, speaker_labeler, proc_y=CategoryProcessor())

# bs=64
# c_in = ll.train[0][0].shape[0]
# c_out = len(uniqueify(ll.train.y))

# data = ll.to_databunch(bs,c_in=c_in,c_out=c_out)

# opt_func = adam_opt(mom=0.9, mom_sqr=0.99, eps=1e-6, wd=1e-2)
# loss_func = LabelSmoothingCrossEntropy()
# lr = 1e-2
# pct_start = 0.5
# phases = create_phases(pct_start)
# sched_lr = combine_scheds(phases, cos_1cycle_anneal(lr/10., lr, lr/1e5))
# sched_mom = combine_scheds(phases, cos_1cycle_anneal(0.95,0.85, 0.95))
# cbscheds = [ParamScheduler('lr', sched_lr),
#             ParamScheduler('mom', sched_mom)]
# learn = cnn_learner(xresnet34, data, loss_func, opt_func)
# learn.fit(5, cbs=cbscheds)

nb_auto_export()