A pythonic way of de-interleaving a list (i.e. data from a generator), into multiple lists

Submitted by: @import:stackexchange-codereview

Problem

I've recently discovered the wonders of the Python world, and am quickly learning. Coming from Windows/C#/.NET, I find it refreshing working in Python on Linux. A day you've learned something new is not a day wasted.

I need to unpack data received from a device. Data is received as a string of "bytes", of arbitrary length. Each packet (string) consists of samples, for eight channels. The number of samples varies, but will always be a multiple of the number of channels. The channels are interleaved. To make things a bit more complex, samples can be either 8 or 16 bits in length. Check the code, and you'll see.

I've already got a working implementation. However, as I've just stumbled upon generators, iterators, maps and ... numpy, I suspect there might be a more efficient way of doing it. If not efficient, maybe more "pythonic". I'm curious, and if someone would spend some time giving me a pointer in the right (or any) direction, I would be very grateful. As of now, I am aware of the fact that my Python has a strong smell of C#. But I'm learning ...

This is my working implementation. It is efficient enough, but I suspect it can be improved. Especially the de-interleaving part. On my machine it prints:

time to create generator: 0:00:00.000040
time to de-interleave data: 0:00:00.004111
length of channel A is 750: True


As you can see, creating the generator takes almost no time. De-interleaving the data is the real issue. Maybe the data generation and de-interleaving can be done simultaneously?

This is not my first implementation, but I never seem to be able to drop below approx 4 ms.

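```
from datetime import datetime

def unpack_data(data):
    l = len(data)
    p = 0

    while p < l:
        # convert 'char' or byte to (signed) int8
        i1 = (((ord(data[p]) + 128) % 256) - 128)
        p += 1
        if i1 & 0x01:
            # read next 'char' as an (unsigned) uint8
            #
            # due to the nature of the protocol,
            # we will always have sufficient data
            # available to avoid reading past the end
            i2 = ord(data[p])
            p += 1
            yield (i1 >> 1 << 8) + i2
        else:
            yield i1 >> 1

# generate some test data ...
test_data = ''
for n in range(500 * 12 * 2 - 1):
    test_data += chr(n % 256)

t0 = datetime.utcnow()

# in this example we have 6000 samples, 8 channels, 750 samples/channel
# data received is interleaved: A1, B1, C1, ..., A2, B2, C2, ... F750, G750, H750
```
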
Solution

from datetime import datetime

def unpack_data(data):
    l = len(data)
    p = 0


I'd avoid such short variable names; they make your code harder to follow.

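    while p < l:
        # convert 'char' or byte to (signed) int8
        i1 = (((ord(data[p]) + 128) % 256) - 128)
        p += 1
        if i1 & 0x01:
            # read next 'char' as an (unsigned) uint8
            #
            # due to the nature of the protocol,
            # we will always have sufficient data
            # available to avoid reading past the end
            i2 = ord(data[p])
            p += 1
            yield (i1 >> 1 << 8) + i2
        else:
            yield i1 >> 1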
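
To illustrate the naming point, here is a rough sketch of the same decoding loop with more descriptive names. It is written for Python 3 and assumes the packet arrives as a `bytes` object; `unpack_samples`, `payload`, `first` and `second` are names made up for this example, not part of the original code.

```python
def unpack_samples(payload):
    """Yield decoded samples from a bytes payload (hypothetical helper)."""
    position = 0
    length = len(payload)
    while position < length:
        # reinterpret the unsigned byte as a signed 8-bit value
        first = (payload[position] + 128) % 256 - 128
        position += 1
        if first & 0x01:
            # low bit set: 16-bit sample, the next byte is the unsigned low half
            second = payload[position]
            position += 1
            yield ((first >> 1) << 8) + second
        else:
            # low bit clear: 8-bit sample
            yield first >> 1


print(list(unpack_samples(bytes([0, 1, 2, 3, 4, 254, 255, 7]))))
# prints [0, 2, 260, -1, -249]
```

Indexing a `bytes` object in Python 3 already gives an integer, so the `ord()` calls from the Python 2 version are not needed here.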

# generate some test data ...
test_data = ''
for n in range(500 * 12 * 2 - 1):
    test_data += chr(n % 256)


It's usually better to collect all the pieces of a string in a list and then join them. Repeated string concatenation performs poorly in Python.
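
A minimal sketch of that advice, assuming Python 3. The names `size`, `test_text` and `test_bytes` are only for illustration; the `bytes` variant is what I would reach for if the device data is binary anyway.

```python
# number of test bytes, same as in the question
size = 500 * 12 * 2 - 1

# text version: generate the pieces and join them once
test_text = ''.join(chr(n % 256) for n in range(size))

# byte version: bytes() accepts an iterable of ints in range(256)
test_bytes = bytes(n % 256 for n in range(size))

print(len(test_text), len(test_bytes))
# prints 11999 11999
```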

t0 = datetime.utcnow()

# in this example we have 6000 samples, 8 channels, 750 samples/channel
# data received is interleaved: A1, B1, C1, ..., A2, B2, C2, ... F750, G750, H750
channels = ('A', 'B', 'C', 'D', 'E', 'F', 'G', 'H')

samples = { channel : [] for channel in channels}

# call unpack_data(), receive a generator
gen = unpack_data(test_data)

t1 = datetime.utcnow()

print 'time to create generator: %s' % (t1-t0)


All you've done here is create the generator, which doesn't do any actual work yet, so you aren't measuring much of anything. The time is still spent inside the function you've defined, but only after this point, when the generator is consumed.
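
A small sketch that makes the laziness visible (Python 3). `traced_unpack` and `events` are hypothetical stand-ins for `unpack_data()` and are only there to record when the function body actually starts running.

```python
events = []

def traced_unpack(data):
    """Hypothetical stand-in for unpack_data() that records when its body runs."""
    events.append('body started')
    for value in data:
        yield value

gen = traced_unpack([10, 20, 30])   # only creates the generator object
print(events)                       # prints []

first = next(gen)                   # the body runs up to the first yield
print(events, first)                # prints ['body started'] 10
```

To time the decoding itself, the measurement has to wrap the code that consumes the generator, not the call that creates it.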

try:
    while True:
        for channel in channels:
            samples[channel].append(gen.next())
except StopIteration:
    pass


It's best to avoid dealing with StopIteration directly if you can. In this case you can do:

import itertools

for sample, channel in zip(gen, itertools.cycle(channels)):
    samples[channel].append(sample)


itertools.cycle() gives you an iterator that goes through all the channels in order, over and over. zip() stops as soon as gen is exhausted, so the endless cycle is not a problem.
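
Here is a toy run of that idea with three channels, as a Python 3 sketch. The sample values are made up, and the slicing variant at the end is my own addition for the case where all samples are already in a list.

```python
import itertools

channels = ('A', 'B', 'C')
decoded = [1, 2, 3, 4, 5, 6]        # A1, B1, C1, A2, B2, C2

# pair every sample with its channel; zip stops when the samples run out
samples = {channel: [] for channel in channels}
for sample, channel in zip(iter(decoded), itertools.cycle(channels)):
    samples[channel].append(sample)

print(samples)
# prints {'A': [1, 4], 'B': [2, 5], 'C': [3, 6]}

# alternative when everything is already in a list: take every n-th item
by_slice = {channel: decoded[offset::len(channels)]
            for offset, channel in enumerate(channels)}
print(by_slice == samples)
# prints True
```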

print 'time to de-interleave data: %s' % (datetime.utcnow()-t1)

print 'length of channel A is 750: %s' % (len(samples['A']) == 750)


You can use numpy; I've done that for you. Basically, numpy lets you do operations over a whole array at once, and that's faster than doing them in your own loops. See below:

from datetime import datetime
import numpy

def unpack_data(data):
    # reads the string in as a sequence of uint8
    # (newer NumPy versions prefer numpy.frombuffer)
    data = numpy.fromstring(data, numpy.uint8)
    # True where the least significant bit is clear,
    # i.e. where a byte would start an 8-bit sample
    is_single = numpy.logical_not(data & 0x01)

    # calculate the interpretation of each number
    # both possible ways; widen to int16 before shifting,
    # and parenthesize because + binds tighter than <<
    singles = data.astype(numpy.int8) >> 1
    doubles = (singles.astype(numpy.int16) << 8) + numpy.roll(data, -1)

    # I couldn't vectorize this, it fills up the
    # result array with True for every actual starting value
    result = numpy.empty(data.shape, bool)
    current = True
    for index, single in enumerate(is_single):
        result[index] = current
        # the next byte is a starting byte if this one
        # isn't a starting byte, or its lsb wasn't set
        current = not current or single

    # where chooses from the singles and doubles
    # based on the lsb, and result filters those we actually want
    return numpy.where(is_single, singles, doubles)[result]

# generate some test data ...
test_data = ''
for n in range(500 * 12 * 2 - 1):
    test_data += chr(n % 256)

t0 = datetime.utcnow()

# in this example we have 6000 samples, 8 channels, 750 samples/channel
# data received is interleaved: A1, B1, C1, ..., A2, B2, C2, ... F750, G750, H750
channels = ('A', 'B', 'C', 'D', 'E', 'F', 'G', 'H')

samples = { channel : [] for channel in channels}

# call unpack_data(), receive a numpy array of decoded samples
data = unpack_data(test_data)

t1 = datetime.utcnow()
print 'time to create generator: %s' % (t1-t0)

# reshape converts 1 dimensional array
# into two dimensional array
data = data.reshape(-1, len(channels))
for index, channel in enumerate(channels):
    samples[channel] = data[:,index]

print 'time to de-interleave data: %s' % (datetime.utcnow()-t1)

print 'length of channel A is 750: %s' % (len(samples['A']) == 750)
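
For what it's worth, the start-of-sample mask can probably be computed without a Python loop. The following is only a sketch under my own assumptions (Python 3, a recent NumPy, input as `bytes`), and `unpack_data_vectorized` is a made-up name. The idea: a byte is certainly a sample start at index 0 or right after a byte whose low bit is clear, and between two such points the start flag simply alternates.

```python
import numpy

def unpack_data_vectorized(payload):
    """Hypothetical loop-free variant of the decoder (Python 3, bytes in)."""
    data = numpy.frombuffer(payload, dtype=numpy.uint8)
    index = numpy.arange(data.size)

    # True where the byte, read as a first byte, is an 8-bit sample (low bit clear)
    is_single = (data & 0x01) == 0

    # certain starts: index 0 and every byte that follows a "single" byte
    reset = numpy.ones(data.size, dtype=bool)
    reset[1:] = is_single[:-1]

    # index of the most recent certain start at or before each position
    last_reset = numpy.maximum.accumulate(numpy.where(reset, index, 0))

    # between certain starts every byte has its low bit set, so starts alternate
    is_start = (index - last_reset) % 2 == 0

    # decode every position both ways, then keep the right one at each start
    singles = data.astype(numpy.int8).astype(numpy.int16) >> 1
    doubles = (singles << 8) + numpy.roll(data, -1)
    return numpy.where(is_single, singles, doubles)[is_start]


payload = bytes(n % 256 for n in range(500 * 12 * 2 - 1))
decoded = unpack_data_vectorized(payload)

# one row per sample period, one column per channel
per_channel = decoded.reshape(-1, 8)
channel_a = per_channel[:, 0]
print(decoded.size, channel_a.size)
# prints 6000 750
```

The `reshape` step only works when the number of decoded samples is a multiple of the channel count, which the question says is always the case.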

Code Snippets

from datetime import datetime

def unpack_data(data):
    l = len(data)
    p = 0

    while p < l:
        # convert 'char' or byte to (signed) int8
        i1 = (((ord(data[p]) + 128) % 256) - 128)
        p += 1
        if i1 & 0x01:
            # read next 'char' as an (unsigned) uint8
            #
            # due to the nature of the protocol,
            # we will always have sufficient data
            # available to avoid reading past the end
            i2 = ord(data[p])
            p += 1
            yield (i1 >> 1 << 8) + i2
        else:
            yield i1 >> 1


# generate some test data ...
test_data = ''
for n in range(500 * 12 * 2 - 1):
    test_data += chr(n % 256)

t0 = datetime.utcnow()

# in this example we have 6000 samples, 8 channels, 750 samples/channel
# data received is interleaved: A1, B1, C1, ..., A2, B2, C2, ... F750, G750, H750
channels = ('A', 'B', 'C', 'D', 'E', 'F', 'G', 'H')

samples = { channel : [] for channel in channels}

# call unpack_data(), receive a generator
gen = unpack_data(test_data)

t1 = datetime.utcnow()

print 'time to create generator: %s' % (t1-t0)

try:
    while True:
        for channel in channels:
            samples[channel].append(gen.next())
except StopIteration:
    pass

# alternative to the try/except loop above (needs "import itertools")
for sample, channel in zip(gen, itertools.cycle(channels)):
    samples[channel].append(sample)

Context

StackExchange Code Review Q#20895, answer score: 4
