from pylab import *
import pickle
import numpy, scipy
import scikits.audiolab
import scipy.signal as signal
import scipy.stats as stats
from scipy.signal import kaiserord, lfilter, firwin, freqz, fftconvolve
#from FFT import *
import random
def calculate_volume(sound):
    """Return the root-mean-square (RMS) amplitude of `sound`.

    RMS is used as a simple measure of signal strength/volume. `sound` may be
    any sequence of samples (list or numpy array).

    Fixes vs. original: `map(lambda x: x*x, sound)` followed by
    `sum(c)/len(c)` fails on Python 3 (map returns an iterator with no len)
    and raised ZeroDivisionError on empty input; the numpy form is also far
    faster on long signals.
    """
    samples = numpy.asarray(sound, dtype=float)
    if samples.size == 0:
        return 0.0  # define the RMS of silence/empty input as 0
    return numpy.sqrt(numpy.mean(samples * samples))
def filter_sound(sound):
    """Band-pass filter `sound` to the 2 kHz - 3 kHz band.

    Based on the FIR-filter example from http://www.scipy.org/Cookbook/FIRFilter;
    the 2k-3k band seems to work well for isolating the robin call.

    Returns a tuple `(filtered, delay)` where `filtered` is the 'valid'-mode
    convolution output and `delay` is the group delay of the linear-phase FIR
    filter in seconds (0.5*(N-1)/sample_rate).

    Fixes vs. original: `highpass[N/2]` is a float index under Python 3
    (`N//2` is identical for Py2 ints); the unused `t`/`nsamples` locals
    silently depended on the module global `viewLength`, so the function now
    works on input of any length.
    """
    sample_rate = 44100.0
    nyq_rate = sample_rate / 2.0
    width = 5.0 / nyq_rate   # transition width, normalized to Nyquist
    ripple_db = 60.0         # stop-band attenuation / pass-band ripple target
    N, beta = kaiserord(ripple_db, width)
    low_cutoff_hz = 2000.0
    high_cutoff_hz = 3000.0
    # Build the band-pass by spectral inversion: a high-pass is made from the
    # 3 kHz low-pass, summed with the 2 kHz low-pass to get a band-stop, then
    # inverted again to get the 2-3 kHz band-pass.
    lowpass = firwin(N, low_cutoff_hz / nyq_rate, window=('kaiser', beta))
    highpass = -firwin(N, high_cutoff_hz / nyq_rate, window=('kaiser', beta))
    highpass[N // 2] += 1.0
    bandpass = -(lowpass + highpass)
    bandpass[N // 2] += 1.0
    delay = 0.5 * (N - 1) / sample_rate  # group delay of a linear-phase FIR
    return fftconvolve(sound, bandpass, mode='valid'), delay
def filter_by_volume(sound, threshhold):
    """Zero out low-volume samples of `sound` in place to "clean" the signal.

    Any sample whose magnitude is below `threshhold` is set to 0. Mutates
    `sound` and returns None.

    Fix vs. original: the body used `math.fabs`, but `math` is never imported
    in this file (the pylab star-import does not reliably provide it), which
    would raise NameError at first call; the builtin `abs()` is equivalent
    for real samples.
    """
    for i in range(len(sound)):
        if abs(sound[i]) < threshhold:
            sound[i] = 0
def create_comparisons(target, other, start, number):
    """Compare a window of `target` against time-shifted windows of `other`.

    INPUTS
        target  the target sound
        other   the sound being time shifted and compared to the target
        start   the starting index of the incident of interest in `target`
        number  the length of the incident of interest

    OUTPUT
        A list of compare() results, one per offset in [-128, 128), intended
        for locating the best-fit delay between the two channels. 128 samples
        roughly covers the travel time of sound over the ~1 m microphone
        spacing at 44.1 kHz.
    """
    max_delay = 128
    window = target[start:start + number]
    return [compare(window, other[start + shift:start + number + shift], number)
            for shift in range(-max_delay, max_delay)]
def compare(sound1, sound2, number):
    """Return the Pearson correlation between two sound segments.

    INPUTS
        sound1  one of the sounds to be compared
        sound2  the other sound being compared
        number  the length of the segments (kept for interface
                compatibility; not currently used)

    OUTPUT
        (correlation coefficient, 2-tailed p-value) from scipy.stats.pearsonr.

    NOTE(review from original author): plain sample-wise correlation may not
    be the best comparison — a frequency-domain or averaged comparison might
    make more sense.
    """
    return stats.pearsonr(sound1, sound2)
def plot_comparisons(number=3):
    """Plot offset-vs-correlation curves for the first `number` incidents.

    Convenience wrapper: for each incident, computes create_comparisons()
    over the offset range [-128, 128) and plots it in its own figure.
    Relies on module-level globals: incidents1, s1, s2, and pylab's
    figure/plot.
    """
    offsets = range(-128, 128)
    for idx in range(number):
        span = incidents1[idx][1] - incidents1[idx][0]
        curves = create_comparisons(s1, s2, incidents1[idx][0], span)
        figure(idx)
        plot(offsets, curves)
def find_incidents(sound, zthreshold, viewLength, with_print=False, with_play=False):
    """Parse a filtered signal into sound incidents (chunks of sound, here bird calls).

    An incident starts at the first nonzero sample and ends once `zthreshold`
    consecutive zero samples have been seen (the trailing zeros are excluded
    from the reported end index).

    INPUT
        sound:      should be pre-filtered (and volume-gated so silence is 0)
        zthreshold: number of consecutive zeros that terminates an incident
        viewLength: how many samples of `sound` to scan
        with_print: if True, prints incident information to stdout
        with_play:  if True, plays incident audio (NOTE(review): plays from
                    the module-level global f1, not from `sound` — confirm
                    that is intended)

    OUTPUT
        List of [start, end] index pairs, one per incident.

    Bug fixed: the body previously read the module-level global `zthreshhold`
    (note the different spelling) instead of the `zthreshold` parameter, so
    the argument callers passed was silently ignored.
    """
    zcount = 0       # consecutive zeros seen since the last nonzero sample
    reset = True     # True while scanning silence between incidents
    start = 0
    incidents = []
    for i in range(viewLength):
        if reset and sound[i] != 0:
            # First nonzero sample after silence: a new incident begins.
            reset = False
            start = i
        if not reset and sound[i] != 0:
            zcount = 0
        if not reset and sound[i] == 0:
            zcount += 1
            if zcount > zthreshold:
                # Enough trailing silence: close the incident, excluding the
                # zero run from its end index.
                incidents.append([start, i - zthreshold])
                if with_print:
                    print("Incident start: " + str(start/44100.0) + ", end: " + str((i - zthreshold)/44100.0) + " (in seconds).")
                if with_play:
                    scikits.audiolab.play(f1[start: i - zthreshold])
                reset = True
                zcount = 0
    return incidents
def serialize_data(data, filename):
    """Pickle `data` to the file `filename`.

    Fixes vs. original: opens in binary mode ("wb"), which pickle requires on
    Python 3 and which prevents newline translation from corrupting pickles
    on Windows under Python 2; uses `with` so the handle is closed even if
    dump raises; avoids shadowing the builtin `file`.
    """
    with open(filename, "wb") as fh:
        pickle.dump(data, fh)
def load_serialization(filename):
    """Unpickle and return the object stored in `filename`.

    Fixes vs. original: opens in binary mode ("rb"), matching what pickle
    needs on Python 3 and on Windows; the original's `file.close` was missing
    its parentheses so the handle was never actually closed — `with`
    guarantees closure. Avoids shadowing the builtin `file`.

    SECURITY NOTE: pickle.load on an untrusted file can execute arbitrary
    code; only load files this program wrote itself.
    """
    with open(filename, "rb") as fh:
        return pickle.load(fh)
def save_data():
    """Filter both channels, detect incidents, and serialize everything to disk.

    Takes more disk space but greatly reduces load time on later runs. Reads
    the module-level globals s1, s2, viewLength and zthreshhold, and writes
    the files "filtered1", "filtered2", "incidents1" and "incidents2" via
    serialize_data(). Nothing is returned — the results are only persisted,
    so callers must reload them with load_data().

    Fix vs. original: Python 2-only `print` statements replaced with the
    single-argument parenthesized form, which prints identically on Python 2
    and is valid Python 3.
    """
    f1, delay1 = filter_sound(s1[:viewLength])
    print("f1")  # progress markers: filtering long recordings is slow
    f2, delay2 = filter_sound(s2[:viewLength])
    print("f2")
    serialize_data(f1, "filtered1")
    serialize_data(f2, "filtered2")
    # Volume-gate copies so the serialized filtered sounds stay un-gated.
    i1 = numpy.copy(f1)
    i2 = numpy.copy(f2)
    filter_by_volume(i1, .2)
    print("i1")
    filter_by_volume(i2, .2)
    print("i2")
    incidents1 = find_incidents(i1, zthreshhold, len(i1))
    print("incidents1")
    incidents2 = find_incidents(i2, zthreshhold, len(i2))
    print("incidents2")
    serialize_data(incidents1, "incidents1")
    serialize_data(incidents2, "incidents2")
def load_data():
    """Load the previously serialized filtered channels and incident lists.

    Counterpart to save_data(). Returns the tuple
    (f1, f2, incidents1, incidents2) read from the files "filtered1",
    "filtered2", "incidents1" and "incidents2" in the working directory.
    """
    left = load_serialization("filtered1")
    right = load_serialization("filtered2")
    left_incidents = load_serialization("incidents1")
    right_incidents = load_serialization("incidents2")
    return left, right, left_incidents, right_incidents
def volume_comparisons():
    # Interactively play each incident in stereo and print its per-channel
    # volume ratio (incident RMS divided by a precomputed whole-channel
    # average RMS). Blocks on Enter between playback and printout.
    # Relies on module-level globals: f1, f2, incidents1, incidents2,
    # calculate_volume and scikits.audiolab.
    #
    # Averages calculated previously over the full recordings.
    s1_average = .000067228
    s2_average = .000049858
    for i in range(len(incidents1)):
        # Should probably split incidents up into smaller chunks for volume calculations
        v1 = calculate_volume(f1[incidents1[i][0]:incidents1[i][1]])
        # NOTE(review): v2 is measured over incidents2[i]'s span, but the
        # stereo playback below uses incidents1[i]'s span for BOTH channels —
        # presumably to keep the channels time-aligned; confirm incidents1
        # and incidents2 correspond index-for-index.
        v2 = calculate_volume(f2[incidents2[i][0]:incidents2[i][1]])
        stereo = numpy.array([f1[incidents1[i][0]:incidents1[i][1]], f2[incidents1[i][0]:incidents1[i][1]]])
        scikits.audiolab.play(stereo)
        wait = raw_input("Press enter to continue")
        #scikits.audiolab.play(f1[incidents1[i][0]:incidents1[i][1]])
        #scikits.audiolab.play(f2[incidents2[i][0]:incidents2[i][1]])
        print str(i) + " left: " + str(v1/s1_average) + ", right: " + str(v2/s2_average)
        wait = raw_input("Press enter to continue")
def attempt_to_move_sound(spacer_length = 44100):
    """Play one incident eight times with varying right-channel offset/gain.

    Experiment in making a sound appear to come from different directions by
    adjusting the volume and phase of the right channel relative to the left.
    (Original observation: apparent location seems driven more by relative
    volume than by phase offsets.) Uses module-level globals f1, f2 and
    incidents1; `spacer_length` sets the gap inserted between variants.
    """
    spacer = numpy.array([range(spacer_length), range(spacer_length)])
    lo, hi = incidents1[0][0], incidents1[0][1]
    left = f1[lo:hi]
    # (right-channel sample offset, right-channel gain) for each variant,
    # in the same order as the original first..eighth segments.
    variants = [(-476, .7), (-276, .9), (276, 1.1), (476, 1.7),
                (-476, 1.7), (-276, 1.3), (276, .8), (476, .5)]
    pieces = []
    for offset, gain in variants:
        if pieces:
            pieces.append(spacer)
        pieces.append(numpy.array([left, f2[lo + offset:hi + offset] * gain]))
    combine = numpy.hstack(pieces)
    print("Playing manually moved sounds")
    scikits.audiolab.play(combine)
    print("Done playing manually moved sounds")
#######################
#INITIALIZE
#Read in file and setup appropriate variables
#######################
# wavread returns (samples, sample_rate, encoding); for a stereo file the
# samples array has one column per channel.
(snd, sampFreq, nBits) = scikits.audiolab.wavread('hughrobin.wav')
s1 = snd[:,0]  # left channel
s2 = snd[:,1]  # right channel
slength = snd.shape[0]*1.0  # total frame count (as a float)
viewTime = 25  # seconds of audio to analyze
viewLength = 44100*viewTime  # samples to analyze — assumes a 44.1 kHz rate
timeArray = arange(0, viewLength*1.0, 1)
timeArray = timeArray/sampFreq  # sample index -> seconds, for plotting
# Minimum run of zero samples that ends an incident (1 second at 44.1 kHz);
# passed to find_incidents.
zthreshhold = 44100/1
# Precomputed whole-channel average RMS values (also hard-coded inside
# volume_comparisons).
s1_average = .000067228
s2_average = .000049858
#save_data() #creates/saves filtered sounds if that needs to be done.
f1, f2, incidents1, incidents2 = load_data() #Load filtered sounds and incident arrays
########################
##Optional: Spectrogram plots
#specgram(s1[0:viewLength], 256, sampFreq)
#specgram(f2[incidents2[1][0]:incidents2[1][1]], 256, sampFreq)
#figure(2)
#specgram(f2[incidents2[3][0]:incidents2[3][1]], 256, sampFreq)
#specgram(s2[0:viewLength], 256, sampFreq)
########################
##Optional: Plot comparisons of a couple of incidents
#plot_comparisons()
########################
#show()
# Interactive playback/volume report for every detected incident.
volume_comparisons()
########################
##Optional: Experiment with what seems to make sounds seem to come from different directions
#attempt_to_move_sound()
########################
########################
##Optional: Uncomment to plot raw data
#plot(timeArray, s1[0:viewLength], 'ro', timeArray, s2[0:viewLength], 'bs')
#show()
########################
########################
##Optional: Uncomment to output stereo wav file of raw or filtered data to file 'test.wav'
#scikits.audiolab.wavwrite([s1[0:viewLength], s2[0:viewLength]], 'test.wav', 44100)
#scikits.audiolab.wavwrite(numpy.array([s1[0:viewLength], s2[0:viewLength]]).T, 'test.wav', 44100)
#raw_input()
########################