A pythonic way of de-interleaving a list (i.e. data from a generator), into multiple lists
Problem
I've recently discovered the wonders of the Python world, and am quickly learning. Coming from Windows/C#/.NET, I find it refreshing working in Python on Linux. A day you've learned something new is not a day wasted.
I need to unpack data received from a device. Data is received as a string of "bytes", of arbitrary length. Each packet (string) consists of samples for eight channels. The number of samples varies, but will always be a multiple of the number of channels. The channels are interleaved. To make things a bit more complex, samples can be either 8 or 16 bits in length. Check the code, and you'll see.
I've already got a working implementation. However, as I've just stumbled upon generators, iterators, maps and ... numpy, I suspect there might be a more efficient way of doing it. If not more efficient, maybe more "pythonic". I'm curious, and if someone would spend some time giving me a pointer in the right (or any) direction, I would be very grateful. As of now, I am aware of the fact that my Python has a strong smell of C#. But I'm learning ...
This is my working implementation. It is efficient enough, but I suspect it can be improved, especially the de-interleaving part. On my machine it prints:
```
time to create generator: 0:00:00.000040
time to de-interleave data: 0:00:00.004111
length of channel A is 750: True
```
As you can see, creating the generator takes no amount of time. De-interleaving the data is the real issue. Maybe the data generation and de-interleaving can be done simultaneously?
This is not my first implementation, but I never seem to be able to drop below approx 4 ms.
```
from datetime import datetime

def unpack_data(data):
    l = len(data)
    p = 0
    while p < l:
        # convert 'char' or byte to (signed) int8
        i1 = (((ord(data[p]) + 128) % 256) - 128)
        p += 1
        if i1 & 0x01:
            # read next 'char' as an (unsigned) uint8
            #
            # due to the nature of the protocol,
            # we will always have sufficient data
            # available to avoid reading past the end
            i2 = ord(data[p])
            p += 1
            yield (i1 >> 1 << 8) + i2
        else:
            yield i1 >> 1

# generate some test data ...
test_data = ''
for n in range(500 * 12 * 2 - 1):
    test_data += chr(n % 256)

t0 = datetime.utcnow()

# in this example we have 6000 samples, 8 channels, 750 samples/channel
# data received is interleaved: A1, B1, C1, ..., A2, B2, C2, ... F750, G750, H750
channels = ('A', 'B', 'C', 'D', 'E', 'F', 'G', 'H')
samples = { channel : [] for channel in channels}

# call unpack_data(), receive a generator
gen = unpack_data(test_data)

t1 = datetime.utcnow()
print 'time to create generator: %s' % (t1-t0)

try:
    while True:
        for channel in channels:
            samples[channel].append(gen.next())
except StopIteration:
    pass

print 'time to de-interleave data: %s' % (datetime.utcnow()-t1)
print 'length of channel A is 750: %s' % (len(samples['A']) == 750)
```
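For readers on Python 3, where indexing a bytes object already yields integers, the decoding rule described in the question might be sketched as follows. This is an illustrative port, not the asker's code: the name unpack_samples and the four-byte sample packet are assumptions made for the example.

```python
def unpack_samples(data: bytes):
    """Yield decoded samples from a packet (hypothetical Python 3 port)."""
    position = 0
    while position < len(data):
        # reinterpret the unsigned byte as a signed int8
        first = (data[position] + 128) % 256 - 128
        position += 1
        if first & 0x01:
            # low bit set: the next byte is the unsigned low half
            second = data[position]
            position += 1
            yield ((first >> 1) << 8) + second
        else:
            # low bit clear: a single-byte sample
            yield first >> 1


# made-up packet: one 8-bit sample, one 16-bit sample, one negative 8-bit sample
print(list(unpack_samples(bytes([2, 3, 7, 0xFE]))))  # [1, 263, -1]
```

The low bit of the first byte acts as the length flag, which is why every decoded value is shifted right by one before use.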
Solution
```
from datetime import datetime

def unpack_data(data):
    l = len(data)
    p = 0
```
I'd avoid such short variable names; they make your code harder to follow.
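```
    while p < l:
        # convert 'char' or byte to (signed) int8
        i1 = (((ord(data[p]) + 128) % 256) - 128)
        p += 1
        if i1 & 0x01:
            # read next 'char' as an (unsigned) uint8
            #
            # due to the nature of the protocol,
            # we will always have sufficient data
            # available to avoid reading past the end
            i2 = ord(data[p])
            p += 1
            yield (i1 >> 1 << 8) + i2
        else:
            yield i1 >> 1

# generate some test data ...
test_data = ''
for n in range(500 * 12 * 2 - 1):
    test_data += chr(n % 256)
```
It's usually better to put all the pieces of a string in a list and then join them. Python doesn't have good performance for repeated string concatenation.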
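A minimal sketch of the join approach, written for Python 3 (unlike the Python 2 code under review). The names pieces and test_bytes are made up for the example; the bytes variant is an assumption about what one would want on Python 3, where binary data is normally kept in bytes rather than str.

```python
# build 11999 one-character strings and join them once at the end
pieces = [chr(n % 256) for n in range(500 * 12 * 2 - 1)]
test_data = ''.join(pieces)

# in Python 3, binary test data can be built directly as bytes
test_bytes = bytes(n % 256 for n in range(500 * 12 * 2 - 1))

print(len(test_data), len(test_bytes))  # 11999 11999
```

Joining once avoids creating a new, slightly longer string on every pass through the loop.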
```
t0 = datetime.utcnow()

# in this example we have 6000 samples, 8 channels, 750 samples/channel
# data received is interleaved: A1, B1, C1, ..., A2, B2, C2, ... F750, G750, H750
channels = ('A', 'B', 'C', 'D', 'E', 'F', 'G', 'H')
samples = { channel : [] for channel in channels}

# call unpack_data(), receive a generator
gen = unpack_data(test_data)

t1 = datetime.utcnow()
print 'time to create generator: %s' % (t1-t0)
```
All you've done is create the generator, and that doesn't do any actual work, so you aren't measuring much of anything here. Most of the time is still spent inside the function you've defined, and that happens after this point.
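The point about generator creation can be checked with a small Python 3 sketch. The function noisy_generator and the calls list are hypothetical names used only to record when the generator body actually starts running.

```python
calls = []

def noisy_generator():
    # nothing in this body runs until the first value is requested
    calls.append('started')
    yield 1
    yield 2

gen = noisy_generator()       # only creates the generator object
created_only = list(calls)    # snapshot: the body has not run yet
first = next(gen)             # now the body runs up to the first yield
after_first = list(calls)

print(created_only, first, after_first)  # [] 1 ['started']
```

Timing the call that creates the generator therefore measures object creation only, not the decoding work.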
```
try:
    while True:
        for channel in channels:
            samples[channel].append(gen.next())
except StopIteration:
    pass
```
It's best to avoid dealing with StopIteration directly if you can. In this case you can do the following (add import itertools at the top of the script):
```
for sample, channel in zip(gen, itertools.cycle(channels)):
    samples[channel].append(sample)
```
itertools.cycle() will give you an iterator that goes repeatedly through all the channels in order.
```
print 'time to de-interleave data: %s' % (datetime.utcnow()-t1)
```
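The zip/itertools.cycle suggestion can be tried in isolation with a short Python 3 sketch. The three-channel tuple and the interleaved list are made-up demo data, and the slicing variant is an alternative added here for comparison, not something the answer proposes. In Python 3 zip is lazy; in Python 2 the lazy counterpart is itertools.izip.

```python
import itertools

channels = ('A', 'B', 'C')  # three channels keep the demo short
interleaved = [1, 10, 100, 2, 20, 200, 3, 30, 300]

# variant 1: pair every sample with the next channel name
samples = {channel: [] for channel in channels}
for sample, channel in zip(iter(interleaved), itertools.cycle(channels)):
    samples[channel].append(sample)

# variant 2: extended slices, usable when all samples are already in a list
sliced = {channel: interleaved[index::len(channels)]
          for index, channel in enumerate(channels)}

print(samples == sliced)  # True
print(samples['B'])       # [10, 20, 30]
```

The finite iterator goes first in zip, so the loop ends as soon as the samples run out even though cycle never stops.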
```
print 'length of channel A is 750: %s' % (len(samples['A']) == 750)
```
You can use numpy; I've done that for you. Basically, numpy lets you do operations over a whole array, and that's faster than doing them in your own loops. See below:
```
from datetime import datetime
import numpy

def unpack_data(data):
    # reads the string in as a sequence of uint8
    data = numpy.frombuffer(data, numpy.uint8)

    # figure out if the least significant bit is clear
    # for everything (True marks a single-byte sample)
    odds = numpy.logical_not(data & 0x01)

    # calculate the interpretation of each number
    # both possible ways; widen to int16 first so the
    # shift by 8 below cannot overflow
    singles = data.astype(numpy.int8).astype(numpy.int16) >> 1
    # the parentheses are needed: + binds tighter than <<
    doubles = (singles << 8) + numpy.roll(data, -1)

    # I couldn't vectorize this, it fills up the
    # result array with True for every actual starting value
    result = numpy.empty(data.shape, bool)
    current = True
    for index, byte in enumerate(odds):
        result[index] = current
        # the next byte is a starting byte if this one
        # isn't a starting byte, or its 1 bit wasn't set
        current = not current or byte

    # where chooses from the singles and doubles
    # based on the lsb, and result filters those we actually want
    return numpy.where(odds, singles, doubles)[result]

# generate some test data ...
test_data = ''
for n in range(500 * 12 * 2 - 1):
    test_data += chr(n % 256)

t0 = datetime.utcnow()

# in this example we have 6000 samples, 8 channels, 750 samples/channel
# data received is interleaved: A1, B1, C1, ..., A2, B2, C2, ... F750, G750, H750
channels = ('A', 'B', 'C', 'D', 'E', 'F', 'G', 'H')
samples = { channel : [] for channel in channels}

# call unpack_data(), receive an array
data = unpack_data(test_data)

t1 = datetime.utcnow()
print 'time to create generator: %s' % (t1-t0)

# reshape converts 1 dimensional array
# into two dimensional array
data = data.reshape(-1, len(channels))
for index, channel in enumerate(channels):
    samples[channel] = data[:,index]

print 'time to de-interleave data: %s' % (datetime.utcnow()-t1)
print 'length of channel A is 750: %s' % (len(samples['A']) == 750)
```
Code Snippets
```
from datetime import datetime

def unpack_data(data):
    l = len(data)
    p = 0
```
```
    while p < l:
        # convert 'char' or byte to (signed) int8
        i1 = (((ord(data[p]) + 128) % 256) - 128)
        p += 1
        if i1 & 0x01:
            # read next 'char' as an (unsigned) uint8
            #
            # due to the nature of the protocol,
            # we will always have sufficient data
            # available to avoid reading past the end
            i2 = ord(data[p])
            p += 1
            yield (i1 >> 1 << 8) + i2
        else:
            yield i1 >> 1

# generate some test data ...
test_data = ''
for n in range(500 * 12 * 2 - 1):
    test_data += chr(n % 256)
```
```
t0 = datetime.utcnow()

# in this example we have 6000 samples, 8 channels, 750 samples/channel
# data received is interleaved: A1, B1, C1, ..., A2, B2, C2, ... F750, G750, H750
channels = ('A', 'B', 'C', 'D', 'E', 'F', 'G', 'H')
samples = { channel : [] for channel in channels}

# call unpack_data(), receive a generator
gen = unpack_data(test_data)

t1 = datetime.utcnow()
print 'time to create generator: %s' % (t1-t0)
```
```
try:
    while True:
        for channel in channels:
            samples[channel].append(gen.next())
except StopIteration:
    pass
```
```
for sample, channel in zip(gen, itertools.cycle(channels)):
    samples[channel].append(sample)
```
Context
StackExchange Code Review Q#20895, answer score: 4