from __future__ import division
from IPython.display import *
from numpy import *; seterr(all="ignore")
from numpy import linalg
from numpy import random
%matplotlib notebook
from matplotlib.pyplot import *
import wish
import audio.wave as wave
from audio.filters import FIR, AR
import audio.frames
import audio.index; import nltk; nltk.download("timit")
from audio.lp import lp
from audio.quantizers import Quantizer
[nltk_data] Downloading package timit to /home/boisgera/nltk_data... [nltk_data] Package timit is already up-to-date!
# Sampling rate (Hz) of the TIMIT recordings and the sample period (s).
df = 16000
dt = 1.0 / df
# Identity FIR filter: a single unit tap, output equals input.
fir = FIR([1.0])
print fir(2.0)
print fir(1.0)
print fir([0.0, 7.0, -3.0])
# One-sample delay: the filter keeps its internal state between calls,
# so the first output below is the last input of the previous calls' tail.
delay = FIR([0.0, 1.0])
fir = delay
print fir(2.0)
print fir(1.0)
print fir([0.0, 7.0, -3.0])
print fir.a  # the filter's tap coefficients
2.0 1.0 [ 0. 7. -3.] 0.0 2.0 [1. 0. 7.] [0. 1.]
# Pick the first utterance of the TIMIT index and save its audio data
# to "voice.wav" so it can be played back inline.
utterances = audio.index.search(type=audio.index.Utterance)
utterance = utterances[0]
display(utterance)
data = utterance.audio
audio.wave.write(data, "voice.wav", df=16000)
Audio("voice.wav")
a crab challenged me but a quick stab vanquished him
# List the annotated items (words and phones) that make up the utterance.
for item in utterance:
    print type(item).__name__.lower() + ": " + str(item)
phone: h# (a crab challenged me but a quick stab vanquished him). word: a [q-ey] word: crab [kcl-k-r-ae-bcl] word: challenged [ch-ae-l-ax-n-dcl-jh-dcl-d] phone: epi (a crab challenged me but a quick stab vanquished him). word: me [m-iy] phone: pau (a crab challenged me but a quick stab vanquished him). word: but [b-eh-dx] word: a [ix] word: quick [kcl-k-w-ih-kcl-k] word: stab [s-tcl-t-ae-bcl-b] word: vanquished [v-ae-ng-kcl-k-w-ix-sh-tcl-t] word: him [hh-ih-m] phone: h# (a crab challenged me but a quick stab vanquished him).
audio.index.search("bcl", type=audio.index.Phone)
0. bcl (black [bcl-b-l-ae-kcl]). 1. bcl (betide [bcl-b-iy-tcl-t-ay-dcl]). 2. bcl (be [bcl-b-iy]). 3. bcl (both [bcl-b-ow-th]). 4. bcl (blues [bcl-b-l-ux-z]). 5. bcl (be [bcl-b-ix]). 6. bcl (bowl [bcl-b-ow-l]). 7. bcl (publicity [p-ax-bcl-b-l-ih-s-ix-dx-iy]). 8. bcl (be [bcl-b-iy]). 9. bcl (rhubarb [r-uw-bcl-b-aa-r-bcl]). 10. bcl (rhubarb [r-uw-bcl-b-aa-r-bcl]). 11. bcl (biblical [b-ih-bcl-b-l-ih-kcl-k-el]). 12. bcl (maybe [m-ey-bcl-b-iy]). 13. bcl (blistered [bcl-b-l-ih-s-tcl-t-axr-dcl]). 14. bcl (underbrush [ah-n-axr-bcl-b-r-ah-sh]). 15. bcl (arbitrate [q-aa-r-bcl-b-ix-tcl-t-r-ey-tcl-t]). 16. bcl (tomboy [tcl-t-aa-m-bcl-b-oy]). 17. bcl (brother [bcl-b-r-ah-dh-er]). 18. bcl (battery [bcl-b-ae-dx-er-iy]). 19. bcl (built [bcl-b-ih-l-tcl]). 20. bcl (books [bcl-b-uh-kcl-s]). 21. bcl (be [bcl-b-iy]). 22. bcl (buy [bcl-b-ay]). 23. bcl (about [ax-h-bcl-b-aw-tcl]). 24. bcl (honeybees [hv-ah-n-iy-bcl-b-iy-z]). 25. bcl (be [bcl-b-iy]). 26. bcl (birth [bcl-b-axr-th]). 27. bcl (becoming [bcl-b-iy-kcl-k-ah-m-ix-ng]). 28. bcl (available [ax-v-ey-l-ax-bcl-b-el]). 29. bcl (bequests [bcl-b-iy-kcl-k-w-eh-s]). 30. bcl (obstacles [q-aa-bcl-s-tcl-t-ix-kcl-k-el-z]). 31. bcl (bluejay [bcl-b-l-ux-dcl-jh-ey]). 32. bcl (building [bcl-b-ih-l-dcl-ix-ng]). 33. bcl (both [bcl-b-ow-th]). 34. bcl (interchangeably [q-ih-n-tcl-t-axr-tcl-ch-ey-n-dcl-jh-ix-bcl-b-l-iy]). 35. bcl (able [q-ey-bcl-el]). 36. bcl (suburbanites [s-axr-bcl-b-er-bcl-axr-n-ay-tcl-s]). 37. bcl (suburbanites [s-axr-bcl-b-er-bcl-axr-n-ay-tcl-s]). 38. bcl (valuables [v-ae-y-ix-bcl-b-el-s]). 39. bcl (bank [bcl-b-ae-ng-kcl]). 40. bcl (impossible [ih-m-pcl-p-aa-s-bcl-b-el]). 41. bcl (bureaucracy [bcl-b-y-er-aa-kcl-k-axr-s-iy]). 42. bcl (subdued [s-ah-bcl-d-ux-dcl-d]). 43. bcl (by [bcl-b-ay]). 44. bcl (big [bcl-b-ih-gcl]). 45. bcl (bizarre [bcl-b-ix-z-aa-r]). 46. bcl (boy [bcl-b-oy]). 47. bcl (bought [bcl-b-ao-tcl]). 48. bcl (breakfast [bcl-b-r-eh-kcl-k-f-ax-h-s-tcl-t]). 49. bcl (you're [y-er-bcl]). 50. bcl (crab [kcl-k-r-ae-bcl]). 
51. bcl (stab [s-tcl-t-ae-bcl-b]). 52. bcl (both [bcl-b-ow-th]). 53. bcl (be [bcl-b-iy]). 54. bcl (blowing [bcl-b-l-ow-ih-ng]). 55. bcl (by [bcl-b-ay]). 56. bcl (be [bcl-b-iy]). 57. bcl (problem [pcl-p-r-aa-bcl-b-l-ax-m]). 58. bcl (be [bcl-b-iy]). 59. bcl (suburban [s-ix-bcl-b-er-v-ih-n]). 60. bcl (gab [gcl-g-ae-bcl]). 61. bcl (habit [hv-ae-bcl-b-ih-tcl]). 62. bcl (big [bcl-b-ih-gcl]). 63. bcl (biology [bcl-b-ay-ax-l-ax-dcl-jh-iy]). 64. bcl (celebrates [s-eh-l-ax-bcl-b-r-ey-tcl-s]). 65. bcl (born [bcl-b-ao-r-n]).
class STP(Quantizer):
    """Short-Term Predictor quantizer.

    `encode` fits a linear-prediction filter of the configured order to
    the given chunk of samples and returns the prediction coefficients
    together with the whitened residual; `decode` feeds the residual
    through the matching synthesis (AR) filter to rebuild the signal.
    """
    def __init__(self, order=16, method="autocorrelation"):
        # Analysis (whitening) filter; its taps are rewritten in place
        # on every call to `encode`.
        self.fir = FIR(a=r_[1.0, zeros(order)])
        # Synthesis filter used by `decode` (inverse of the FIR above).
        self.ar = AR(a=zeros(order))
        self.order = order
        self.method = method
    def encode(self, data):
        # The covariance method requires strictly more samples than the
        # prediction order.
        if self.method == "covariance" and self.order >= len(data):
            raise ValueError("not enough data samples")
        coeffs = lp(data, order=self.order, method=self.method)
        # In-place update keeps the filter object (and its state) alive.
        self.fir.a[:] = r_[1.0, -coeffs]
        residual = self.fir(data)
        return (coeffs, residual)
    def decode(self, data):
        coeffs, residual = data
        self.ar.a[:] = coeffs
        return self.ar(residual)
def stp_error(data, T=0.02, order=16, method="autocorrelation"):
    """Return the short-term prediction residual of `data`.

    The signal is cut into frames of `T` seconds (at the module-level
    sampling rate `df`), each frame is analyzed with a fresh LP fit of
    the given `order` and `method`, and the per-frame residuals are
    concatenated and trimmed back to the original length.
    """
    frame_length = int(T * df)  # number of samples for T s at rate df
    chunks = audio.frames.split(data, frame_length, pad=True)
    predictor = STP(order=order, method=method)
    # encode returns (coefficients, residual); only the residual is kept.
    residuals = [predictor.encode(chunk)[1] for chunk in chunks]
    # Drop the padding samples introduced by the frame split.
    return concatenate(residuals)[:len(data)]
# Plot the original waveform (faint black) against its short-term
# prediction residual (red) on a common time axis.
figure()
n = len(data)
t = r_[0:n] / df
plot(t,data, "k", alpha=0.1); axis("tight")
error = stp_error(data)
plot(t,error, "r")
xlabel("time [s]")
ylabel("amplitude")
axis("tight")
grid()
# Global prediction gain of the short-term predictor (signal power
# over residual power, reported in dB).
error = stp_error(data)
SNR2 = mean(data*data) / mean(error*error)
print "SNR", 10*log10(SNR2), "dB"
# Per-frame (20 ms) SNR of the short-term predictor, plotted against
# the global SNR computed above (dashed line).
T = 20.0 / 1000 # 20 ms
n = int(df * T)
data_frames = audio.frames.split(data, n, pad=True)
error_frames = audio.frames.split(error, n, pad=True)
fig, (ax1, ax2) = subplots(2,1, sharex=True)
SNRs = [10.0 * log10(mean(d*d) / mean(e*e)) for d,e in zip(data_frames, error_frames)]
ax1.plot(data, "k", alpha=0.1, label="original")
ax1.plot(error, "r", label="stp residual")
ax1.set_title("Waveforms")
ax1.axis("tight")
ax1.legend()
ax1.grid()
ax2.plot(r_[0:len(SNRs)] * n, SNRs, "b")
ax2.plot([0,len(data_frames)*n], [10*log10(SNR2), 10*log10(SNR2)], "k--")
ax2.set_xlabel("sample index")
ax2.set_ylabel("SNR [dB]")
ax2.set_title("Signal-to-Noise Ratio")
ax2.grid()
SNR 15.446422432113586 dB
# Save the STP residual and compare it by ear with the original voice.
wave.write(error, "voice-error-stp.wav", df=df)
display(Audio("voice.wav"))
display(Audio("voice-error-stp.wav"))
def ltp_parameters(history, frame,
    offset_min=1, offset_max=None,
    gain_min=0.0, gain_max=inf,
    SNR_min=1.0, SNR_max=inf,
    select=argmax,
    returns="offset, gain"):
    # Long-term predictor parameter search: for every candidate offset i
    # in 0..len(history), correlate `frame` with the length-m window of
    # the concatenated signal ending i samples before its end, then let
    # `select` pick an offset from the validity-filtered SNR criteria.
    #
    # NOTE: `wish.grant(returns)` returns the *local variables* whose
    # names appear in the `returns` string, so the local names below
    # (offset, gain, nxcorr, SNR, nxcorrs, gains, SNRs, valids, ...)
    # are part of the public contract and must not be renamed.
    p = len(history)
    data = r_[history, frame] # full data
    m = len(frame)
    n = len(data)
    nxcorrs = zeros(p+1)  # normalized cross-correlation per offset
    gains = zeros(p+1)    # least-squares predictor gain per offset
    SNRs = zeros(p+1)     # linear prediction SNR per offset
    valids = zeros(p+1, dtype=bool)  # offsets passing all constraints
    frame_norm = linalg.norm(frame)
    normed_frame = frame / frame_norm
    for i in range(p + 1):
        # Window of m past samples ending i samples before the end of
        # `data` (i == 0 aligns the window with the frame itself).
        windowed_data = data[n-i-m:n-i]
        windowed_data_norm = linalg.norm(windowed_data)
        # NOTE(review): an all-zero window yields norm 0 and nan values
        # here; numpy errors are globally ignored at import time —
        # confirm that nan SNRs can never be selected as valid.
        normed_windowed_data = windowed_data / windowed_data_norm
        nxcorr = nxcorrs[i] = dot(normed_frame, normed_windowed_data)
        # SNR achieved by the best gain-scaled predictor at this offset.
        SNR = SNRs[i] = 1.0 / sqrt(1 - nxcorr*nxcorr)
        #print ">", SNR
        gain = gains[i] = nxcorr / windowed_data_norm * frame_norm
        valid = True
        if offset_min is not None:
            valid = valid and (offset_min <= i)
        if offset_max is not None:
            valid = valid and (i <= offset_max)
        valid = valid and (gain_min <= gain <= gain_max)
        valid = valid and (SNR_min <= SNR <= SNR_max)
        valids[i] = valid
    # Exclude invalid offsets from selection by forcing their criterion
    # to -inf before handing the array to `select`.
    criteria = SNRs.copy()
    criteria[logical_not(valids)] = -inf
    offset = select(criteria)
    if not valids[offset]: # everything is invalid!
        raise ValueError("no valid set of parameters")
    else:
        gain = gains[offset]
        nxcorr = nxcorrs[offset]
        SNR = SNRs[offset]
    return wish.grant(returns)
# Synthetic test signal: a noisy sine wave with a slowly growing
# amplitude, split into a history part and a 25-sample reference frame
# for the long-term predictor search.
N = 100
data_ = (0.7 * sin(r_[0:N]/N * 2*pi*4) + 0.05 * random.uniform(-1,1,N)) * (1.0 + 0.2*r_[0:1:1.0/N])
#history[::7] = 1.0
history, frame = data_[:-25], data_[-25:]
m = len(history)
n = len(frame)
offset_min = 5
def select(factor):
    """Build an offset selector that favors sub-harmonic offsets.

    The returned callable takes an array of (linear) SNR criteria and
    normally returns ``argmax(SNRs)``.  However, if an offset near 1/2,
    1/3 or 1/4 of the best one reaches at least ``factor`` times the
    best SNR, that smaller offset is preferred: this steers the
    long-term predictor toward the fundamental period rather than one
    of its multiples.

    Parameters
    ----------
    factor : float
        Fraction of the peak SNR a sub-harmonic offset must reach to
        be selected instead of the peak (e.g. 0.5).
    """
    def _select(SNRs):
        i = argmax(SNRs)
        best = SNRs[i]  # same value as max(SNRs), without a second pass
        # Candidate sub-harmonic offsets, rounded both up and down.
        js = [int(ceil(i/2.0)), int(floor(i/2.0)),
              int(ceil(i/3.0)), int(floor(i/3.0)),
              int(ceil(i/4.0)), int(floor(i/4.0))]
        ij = argmax(SNRs[js])
        if SNRs[js[ij]] >= factor * best:
            return js[ij]
        else:
            return i
    return _select
# Run the LTP parameter search on the synthetic signal and overlay the
# matched, gain-compensated segment (red) on the data and the reference
# frame (blue).
offset, gain, nxcorrs, SNRs, valids = ltp_parameters(history, frame,
    offset_min=offset_min, SNR_min=1.0, select=argmax, #select(0.5),
    returns="offset, gain, nxcorrs, SNRs, valids")
print "offset:", offset, "gain:", gain
figure()
plot(data_, "k", alpha=0.5, label="data")
plot(arange(0,n)+m, frame, "b", label="reference")
plot(arange(m-offset, m-offset+n), frame/gain, "r", label="matched")
axis("tight")
legend(loc=0)
offset: 75 gain: 1.1689857342477985
<matplotlib.legend.Legend at 0x7fa07b71ac50>
# SNR as a function of the candidate offset; blue crosses mark the
# offsets that satisfy all validity constraints.
figure()
m = arange(len(SNRs))
plot(m[offset_min:],SNRs[offset_min:], "r", alpha=0.25, label="SNR")
n = arange(len(SNRs))
plot(n[valids],SNRs[valids], "bx", linewidth=1.0,label="valid")
xlabel("offset")
ylabel("SNR (linear scale)")
axis("tight")
legend(loc=0)
<matplotlib.legend.Legend at 0x7fa07b62e510>
class LTP(Quantizer):
    """Long-Term (pitch) Predictor quantizer.

    Maintains the last `order` samples as history, searches a
    single-tap predictor (offset, gain) for each incoming frame and
    whitens the frame with it; `decode` applies the matching synthesis
    filter to the residual.
    """
    def __init__(self, order, **options):
        # Analysis filter 1 - gain * z^(-offset); taps rewritten per frame.
        self.fir = FIR(a=r_[1.0, zeros(order)])
        # Past samples available to the parameter search.
        self.history = zeros(order)
        self.ar = AR(a=zeros(order))
        self.order = order
        # Extra keyword arguments forwarded to `ltp_parameters`.
        self.options = options
    def encode(self, frame):
        taps = zeros_like(self.fir.a)
        taps[0] = 1.0
        try:
            offset, gain = ltp_parameters(self.history, frame, **self.options)
            taps[offset] = - gain
        except ValueError:
            # No admissible predictor: fall back to the identity filter.
            offset, gain = 0, 0.0
        self.fir.a[:] = taps
        residual = self.fir(frame)
        # Slide the history window forward by one frame.
        self.history = r_[self.history[len(frame):], frame]
        return (offset, gain), residual
    def decode(self, data):
        (offset, gain), residual = data
        taps = zeros_like(self.ar.a)
        # AR taps are indexed from lag 1, hence the offset-1.
        taps[offset-1] = gain
        self.ar.a[:] = taps
        return self.ar(residual)
# Admissible pitch range 50-400 Hz: the LTP order is the longest lag
# (lowest frequency) and offset_min the shortest lag (highest frequency).
f_min = 50.0
f_max = 400.0
order_ltp = int(df/f_min)
print order_ltp
offset_min = int(df/f_max)
print offset_min
320 40
def ltp_error(data, T=0.005, order=order_ltp, **options):
    """Frame-by-frame long-term prediction of `data`.

    Splits `data` into frames of `T` seconds (at the module-level
    sampling rate `df`), encodes each with a single `LTP` instance and
    returns three arrays trimmed to the length of `data`: the residual,
    and the sample-wise offset and gain used on each frame.  Extra
    keyword arguments are forwarded to the `LTP` constructor.
    """
    frame_length = int(T * df)  # samples per frame at the rate df
    chunks = audio.frames.split(data, frame_length, pad=True)
    predictor = LTP(order=order, **options)
    total = frame_length * len(chunks)
    error = zeros(total)
    offset = zeros(total)
    gain = zeros(total)
    for k, chunk in enumerate(chunks):
        (chunk_offset, chunk_gain), residual = predictor.encode(chunk)
        span = slice(k * frame_length, (k + 1) * frame_length)
        error[span] = residual
        # Repeat the per-frame parameters for every sample of the frame.
        offset[span] = ones_like(residual) * chunk_offset
        gain[span] = ones_like(residual) * chunk_gain
    # Drop the padding samples introduced by the frame split.
    length = len(data)
    return error[:length], offset[:length], gain[:length]
# Cascade the predictors: LTP is applied to the STP residual; compute
# the global SNR of each stage against the original signal.
stp_error_ = stp_error(data)
ltp_error_, offset, gain = ltp_error(stp_error_, offset_min=offset_min, SNR_min=1.1, select=select(0.5))
stp_SNR2 = mean(data*data) / mean(stp_error_*stp_error_)
ltp_SNR2 = mean(data*data) / mean(ltp_error_*ltp_error_)
# Compare STP and STP+LTP residuals: waveforms, per-frame (20 ms) SNRs
# against the global SNRs (dashed), and the pitch estimate implied by
# the LTP offset.
T = 20.0 / 1000 # 20 ms
n = int(df * T)
data_frames = audio.frames.split(data, n, pad=True)
stp_error_frames = audio.frames.split(stp_error_, n, pad=True)
ltp_error_frames = audio.frames.split(ltp_error_, n, pad=True)
fig, (ax1, ax2, ax3) = subplots(3,1, sharex=True)
stp_SNRs = [10.0 * log10(mean(d*d) / mean(e*e)) for d,e in zip(data_frames, stp_error_frames)]
ltp_SNRs = [10.0 * log10(mean(d*d) / mean(e*e)) for d,e in zip(data_frames, ltp_error_frames)]
ax1.plot(data, "k", alpha=0.1, label="original")
ax1.plot(stp_error_, "r", alpha=0.5, label="stp residual")
ax1.plot(ltp_error_, "g", alpha=1.0, label="stp+ltp residual")
ax1.set_title("Waveforms")
ax1.axis("tight")
ax1.legend()
ax1.grid()
ax2.plot(r_[0:len(stp_SNRs)] * n, stp_SNRs, "r", alpha=0.5, label="stp SNR")
ax2.plot([0,len(data_frames)*n], [10*log10(stp_SNR2), 10*log10(stp_SNR2)], "r--", alpha=0.5)
ax2.plot(r_[0:len(ltp_SNRs)] * n, ltp_SNRs, "g", label="stp+ltp SNR")
ax2.plot([0,len(data_frames)*n], [10*log10(ltp_SNR2), 10*log10(ltp_SNR2)], "g--")
ax2.set_xlabel("sample index")
ax2.set_ylabel("SNR [dB]")
ax2.set_title("Signal-to-Noise Ratio")
ax2.legend()
ax2.grid()
#n = len(data)
#t = r_[0:n] / df
# Offset (samples) converted to frequency; the faint 2x and 3x curves
# show where harmonics of the detected pitch would fall.
ax3.plot(df / offset)
ax3.plot(2*df / offset, "k.", alpha=0.5, ms=0.25)
ax3.plot(3*df / offset, "k.", alpha=0.25, ms=0.25)
ax3.axis([0, len(data), 0, 400.0])
ax3.set_ylabel("frequency [Hz]")
ax3.set_yticks(r_[0:400:100])
ax3.grid()
# Save the cascaded residual and compare all three recordings by ear.
wave.write(ltp_error_, "voice-error-stp-ltp.wav", df=df)
display(Audio("voice.wav"))
display(Audio("voice-error-stp.wav"))
display(Audio("voice-error-stp-ltp.wav"))