HiveBrain v1.2.0
Get Started
← Back to all entries
patternpythonMinor

Optimize Cython code with np.ndarray contained

Submitted by: @import:stackexchange-codereview··
0
Viewed 0 times
ndarraywithcontainedcythonoptimizecode

Problem

Can someone help me further optimize the following Cython code snippets? Specifically, a and b are np.ndarray with int value (range(256)) in them. They are one dimension arrays with dynamic length. resultHamming is a one-dimension array with float value in it (dynamic length). bits is an int list (size 256).

The function is to compare two dynamic length bit vector, and return a similarity value as the distance of the two, where the length of each vector is a multiple of 2048-bit (256 bytes). I want to find the best match between these two bit vector by comparing each 2048-bit block, where each bit vector is represented as ndarray (read the bit sequence byte by byte, thus each position is range from 0 to 2^8 = 256). Rule for matching is to find global minimum distance between all block pairs and allow one block in A to be matched with more than one block in B if they have smaller distance. Always compare the smaller size vector against the larger one.

The following code assumes b vector is smaller. We can limit resultHamming to be smaller than size of numArrayB and only record numArrayB smallest distance value, but need to track the current size when inserting new value into it. Even with current case (record all the pairwise distance), we actually know the final size of resultHamming at the beginning.

```
def compare(a, b):
cdef double dis, hammingTotal = 0
cdef int numArrayA = int(a.size/256)
cdef int numArrayB = int(b.size/256)
cdef int i, j, k, l, index
bits = list(xrange(256))
# Prepare a bit number table for fast query
for l in xrange(256):
# nnz() counts the number of 1s in value
bits[l] = nnz(l)

resultHamming = []
for i in xrange(numArrayB):
# Count the number of 1-bits in i-th block of B
onesB = sum(bits[b[k+256*i]] for k in xrange(256))
for j in xrange(numArrayA):
# Count the number of 1-bits in j-th block of A
onesA = sum(bi

Solution

As a general rule for optimizing with cython, try to avoid python function calls if possible, i.e. replace pythonic sum with numpy sum or explicit for loop, replace bit number table with numpy array, and try to declare types for all variables.

You can also precalculate most of the variables used in distance calculation and use a separate array to hold minimum distances for each block. I quickly put something together and got the code below.

Just wondering, what is the rationale for normalizing the distance with the number of set bits?

import cython
import numpy as np
cimport numpy as np

cdef bitCount(unsigned char value):
    cdef count = 0
    while(value):
        value &= value - 1
        count += 1
    return(count)

def compare(np.ndarray[np.uint8_t, ndim=1, mode='c'] a, 
            np.ndarray[np.uint8_t, ndim=1, mode='c'] b):

    cdef int nblocksA = int(a.size/256)
    cdef int nblocksB = int(b.size/256)
    cdef int i, j, k

    cdef double partial_hamming_distance

    # Prepare a bit number table for fast query
    cdef np.ndarray[np.uint8_t, ndim=1] bits = np.zeros(256, dtype=np.uint8)
    for i in range(256):
        bits[i] = bitCount(i) 

    cdef np.ndarray[np.float_t, ndim=1] min_distances = np.ones(nblocksB,
                                                                dtype=np.float)
    min_distances *= 1000000

    cdef np.ndarray[np.float_t, ndim=1] set_bitsA = np.zeros(nblocksA, dtype=np.float)
    for i in range(nblocksA):
        for k in range(256):
            set_bitsA[i] += bits[a[k + 256 * i]]

    cdef np.ndarray[np.float_t, ndim=1] set_bitsB = np.zeros(nblocksB, dtype=np.float)
    for i in range(nblocksB):
        for k in range(256):
            set_bitsB[i] += bits[b[k + 256 * i]]

    for i in range(nblocksB):
        for j in range(nblocksA):
            partial_hamming_distance = 0

            for k in range(256):
                partial_hamming_distance += (bits[b[i*256+k] ^ a[j*256+k]] 
                                          / (set_bitsB[i] + set_bitsA[j]))

            if partial_hamming_distance < min_distances[i]:
                min_distances[i] = partial_hamming_distance

    return np.sum(min_distances)

Code Snippets

import cython
import numpy as np
cimport numpy as np

cdef bitCount(unsigned char value):
    cdef count = 0
    while(value):
        value &= value - 1
        count += 1
    return(count)


def compare(np.ndarray[np.uint8_t, ndim=1, mode='c'] a, 
            np.ndarray[np.uint8_t, ndim=1, mode='c'] b):

    cdef int nblocksA = int(a.size/256)
    cdef int nblocksB = int(b.size/256)
    cdef int i, j, k

    cdef double partial_hamming_distance

    # Prepare a bit number table for fast query
    cdef np.ndarray[np.uint8_t, ndim=1] bits = np.zeros(256, dtype=np.uint8)
    for i in range(256):
        bits[i] = bitCount(i) 

    cdef np.ndarray[np.float_t, ndim=1] min_distances = np.ones(nblocksB,
                                                                dtype=np.float)
    min_distances *= 1000000

    cdef np.ndarray[np.float_t, ndim=1] set_bitsA = np.zeros(nblocksA, dtype=np.float)
    for i in range(nblocksA):
        for k in range(256):
            set_bitsA[i] += bits[a[k + 256 * i]]

    cdef np.ndarray[np.float_t, ndim=1] set_bitsB = np.zeros(nblocksB, dtype=np.float)
    for i in range(nblocksB):
        for k in range(256):
            set_bitsB[i] += bits[b[k + 256 * i]]

    for i in range(nblocksB):
        for j in range(nblocksA):
            partial_hamming_distance = 0

            for k in range(256):
                partial_hamming_distance += (bits[b[i*256+k] ^ a[j*256+k]] 
                                          / (set_bitsB[i] + set_bitsA[j]))

            if partial_hamming_distance < min_distances[i]:
                min_distances[i] = partial_hamming_distance

    return np.sum(min_distances)

Context

StackExchange Code Review Q#43413, answer score: 4

Revisions (0)

No revisions yet.