1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
|
import sys, audioop
class PureMixer:
#
# An audio mixer in Python based on audioop
#
# Note that opening the audio device itself is outside the scope of
# this module. Anything else could also be done with the mixed data,
# e.g. stored on disk, for all this module knows.
def __init__(self, freq=44100, bits=8, signed=0,
channels=1, byteorder=None):
"""Open the mixer and set its parameters."""
self.freq = freq
self.bytes = bits/8
self.signed = signed
self.channels = channels
self.byteorder = byteorder or sys.byteorder
self.parameters = (freq, self.bytes, signed, channels, self.byteorder)
self.bytespersample = channels*self.bytes
self.queue = '\x00' * self.bytes
def resample(self, data, freq=44100, bits=8, signed=0,
channels=1, byteorder=None):
"Convert a sample to the mixer's own format."
bytes = bits/8
byteorder = byteorder or sys.byteorder
if (freq, bytes, signed, channels, byteorder) == self.parameters:
return data
# convert to native endianness
if byteorder != sys.byteorder:
data = byteswap(data, bytes)
byteorder = sys.byteorder
# convert unsigned -> signed for the next operations
if not signed:
data = audioop.bias(data, bytes, -(1<<(bytes*8-1)))
signed = 1
# convert stereo -> mono
while channels > self.channels:
assert channels % 2 == 0
data = audioop.tomono(data, bytes, 0.5, 0.5)
channels /= 2
# resample to self.freq
if freq != self.freq:
data, ignored = audioop.ratecv(data, bytes, channels,
freq, self.freq, None)
freq = self.freq
# convert between 8bits and 16bits
if bytes != self.bytes:
data = audioop.lin2lin(data, bytes, self.bytes)
bytes = self.bytes
# convert mono -> stereo
while channels < self.channels:
data = audioop.tostereo(data, bytes, 1.0, 1.0)
channels *= 2
# convert signed -> unsigned
if not self.signed:
data = audioop.bias(data, bytes, 1<<(bytes*8-1))
signed = 0
# convert to mixer endianness
if byteorder != self.byteorder:
data = byteswap(data, bytes)
byteorder = self.byteorder
# done
if (freq, bytes, signed, channels, byteorder) != self.parameters:
raise ValueError('sound sample conversion failed')
return data
def wavesample(self, file):
"Read a sample from a .wav file (or file-like object)."
import wave
w = wave.open(file, 'r')
return self.resample(w.readframes(w.getnframes()),
freq = w.getframerate(),
bits = w.getsampwidth() * 8,
signed = w.getsampwidth() > 1,
channels = w.getnchannels(),
byteorder = 'little')
def mix(self, mixer_channels, bufsize):
"""Mix the next batch buffer.
Each object in the mixer_channels list must be a file-like object
with a 'read(size)' method."""
data = ''
already_seen = {}
channels = mixer_channels[:]
channels.reverse()
for c in channels:
if c in already_seen:
data1 = ''
else:
data1 = c.read(bufsize)
already_seen[c] = 1
if data1:
l = min(len(data), len(data1))
data = (audioop.add(data[:l], data1[:l], 1) +
(data1[l:] or data[l:]))
else:
try:
mixer_channels.remove(c)
except ValueError:
pass
data += self.queue * ((bufsize - len(data)) / self.bytes)
self.queue = data[-self.bytes:]
return data
def byteswap(data, byte):
    """Return 'data' with the byte order of every sample reversed.

    data -- buffer of packed samples.
    byte -- width of one sample in bytes; 1, 2 and 4 are supported.

    Raises ValueError if the width is not supported, or if the array item
    chosen for that width does not have that size on this platform.
    """
    if byte == 1:
        # Single-byte samples have no byte order.  Return the data
        # unchanged rather than None: callers assign the result.
        return data
    typecode = {2: 'h', 4: 'i'}.get(byte)
    if typecode is None:
        raise ValueError('cannot convert endianness for samples of %d bytes' % byte)
    import array
    samples = array.array(typecode, data)
    if samples.itemsize != byte:
        raise ValueError('endianness convertion failed')
    samples.byteswap()
    # array.tostring() was removed in Python 3.9; use tobytes() where it
    # exists and fall back to tostring() on older interpreters.
    to_bytes = getattr(samples, 'tobytes', None) or samples.tostring
    return to_bytes()
|