HiveBrain v1.2.0
Get Started
← Back to all entries
patternpythonModerate

Calculating the joint probability of n events from a sample sequence of occurrences

Submitted by: @import:stackexchange-codereview··
0
Viewed 0 times
thesampleeventsjointoccurrencessequencecalculatingprobabilityfrom

Problem

I'm writing an algorithm to take in a sample list of sequences of events, calculate 1-step transitional probabilities from the sequences, forward or in reverse, then calculate the joint probability of n events occurring from those conditional probabilities.

Eg. if the sample list of sequences is:


['TTAGCAGTCAGT', 'TCAGCGTTCGACTGA']

cond_probs() produces a dictionary {'AT': n1, 'AG': n2, 'AA': n3, 'AC': n4, ... , 'TC': n16}, where, for one, n1 + n2 + n3 + n4 = 1, from those sequences. In reverse, the transitional probabilities are calculated with the sequences in reverse.

With that dictionary, exp_joint_probs() produces another dictionary which, when n = 3, looks like {'TTA': m1, 'TTC': m2, ..., 'GAC': m64}, where 'TTA' is calculated by P('T') P('TT') P('TA'). In reverse, it is P('A') P('AT') P('TT'), but the dictionary entry should still be 'TTA'.

I'm wondering if my algorithm could be cleaner. I feel that perhaps using Numpy would make it much nicer, so a word from a Numpy expert would be great. I'm also looking for help double-checking the math.

```
from collections import Counter
from itertools import product

def exp_joint_probs(seqList, n, reverse = 0):
probs = {}
counts = Counter(''.join(seqList))
states = counts.keys()
sumv = sum(counts.values())
initProbs = {c: counts[c] / float(sumv) for c in counts}
condProbs = cond_probs(seqList, reverse)
paths = product(counts.keys(), repeat = n)
for p in paths:
p = ''.join(p)
if reverse == 0:
probs[p] = initProbs[p[0]]
for i in range(n - 1):
probs[p] = probs[p] * condProbs.get((p[i], p[i + 1]), 0)
else:
probs[p] = initProbs[p[2]]
for i in range(n)[::-1][:-1]:
probs[p] = probs[p] * condProbs.get((p[i], p[i - 1]), 0)
return probs

def cond_probs(seqList, reverse = 0):
counts = {}
probs = {}
for seq in seqList:
if reverse:

Solution

I'm just going to look at cond_probs here. You'll see that there's plenty in this function for one answer.

  1. Comments on your code



-
The line return probs has the wrong indentation. Copy/paste error?

-
There are no docstrings. What do these functions do and how do I call them? (The text from your question would be a good starting-point for docstrings.)

-
There are no test cases. This is the kind of code that would benefit from some unit tests and/or doctests.

-
You write, "cond_probs() produces a dictionary {'AT': n1, 'AG': n2, 'AA': n3, 'AC': n4, ... , 'TC': n16}" but when I run it the dictionary has tuples for keys, not strings:

>>> cond_probs(['AAC'])
{('A', 'C'): 0.5, ('A', 'A'): 0.5}


-
I don't understand the behaviour when reverse=True. You write, "In reverse, the transitional probabilities are calculated with the sequences in reverse" but this doesn't seem to be the case:

>>> cond_probs(['CAA'], reverse=True)
{('A', 'A'): 1.0}


What has happened to C? This looks wrong to me.

If I were writing this, I'd implement cond_probs to run only in the forward direction; to do the reverse direction I would pass in reversed sequences.

-
Prefer using True and False for Booleans, not 1 and 0. So the second parameter to cond_probs should be reverse=False.

-
Instead of:

counts = {}
# ...
counts[X] = counts.get(X, 0) + 1


use collections.Counter and write:

counts = Counter()
# ...
counts[X] += 1


(But see below for how to use collections.Counter.update.)

-
Instead of:

range(len(seq))[::-1][:-1]


write:

range(len(seq) - 1, 0, -1)


(But see below for how to avoid this.)

-
Prefer iteration over elements of sequences instead of iteration over their indexes. So instead of:

for i in range(len(seq) - 1):
    counts[seq[i],seq[i + 1]] = counts.get((seq[i], seq[i + 1]), 0) + 1


use zip:

for t in zip(seq, seq[1:]):
    counts[t] += 1


or, even more simply, using collections.Counter.update:

counts.update(zip(seq, seq[1:]))


(If you are concerned about memory use, then you could use itertools.islice and write islice(seq, 1, None) instead of seq[1:] to avoid the copy.)

-
In this code:

for s in states:
    sCounts = dict((k,v) for (k,v) in counts.items() if k[0] == s)


you iterate over the whole of counts for every state, collecting the ones you want. It would be more efficient to iterate only over the transitions from s. See below for how this would look.

  1. Revised code



def cond_probs_2(sequences):
    """Calculate 1-step transitional probabilities from sequences.
    Return dictionary mapping transitions to probabilities.

    >>> sequences = ['ACABC', 'ABCAB']
    >>> sorted(cond_probs_2(sequences).items())
    ... # doctest: +NORMALIZE_WHITESPACE
    [(('A', 'B'), 0.75),
     (('A', 'C'), 0.25),
     (('B', 'C'), 1.0),
     (('C', 'A'), 1.0)]

    """
    counts = Counter()
    entries = set()
    for seq in sequences:
        entries.update(seq)
        for a, b in zip(seq, islice(seq, 1, None)):
            counts[a, b] += 1
    probs = {}
    for a in entries:
        suma = float(sum(counts[a, b] for b in entries))
        if suma != 0:
            probs.update(((a, b), counts[a, b] / suma) for b in entries
                         if counts[a, b])
    return probs


This is about 30% faster than your code:

>>> from timeit import timeit
>>> from random import choice
>>> data = ''.join(choice('ACGT') for _ in range(1000000))
>>> cond_probs([data]) == cond_probs_2([data])
True
>>> timeit(lambda:cond_probs([data]), number=1) # yours
1.4706195190083236
>>> timeit(lambda:cond_probs_2([data]), number=1) # mine
0.9721288200234994


  1. Rewriting in NumPy



Here's how I'd go about this kind of task in NumPy. NumPy works best with fixed-size numeric data, so I'd encode the input as small integers:

>>> import numpy as np
>>> data = 'TTAGCAGTCAGTTCAGCGTTCGACTGA'
>>> distinct = set(data)
>>> coding = {j:i for i, j in enumerate(distinct)}
>>> coding
{'A': 0, 'C': 1, 'T': 2, 'G': 3}
>>> coded_data = np.fromiter((coding[i] for i in data), dtype=np.uint8)
>>> coded_data
array([2, 2, 0, 3, 1, 0, 3, 2, 1, 0, 3, 2, 2, 1, 0, 3, 1, 3, 2, 2, 1, 3, 0,
       1, 2, 3, 0], dtype=uint8)


(I've chosen the type np.uint8 here because there are only four different characters in the input strings. You might need to choose a different type.)

Then you can encode adjacent pairs of numbers from 0–3 as a single number from 0–15, count the number of occurrences of each pair using numpy.bincount, and decode the result using numpy.reshape:

>>> n = len(distinct)
>>> pairs = coded_data[:-1] + n * coded_data[1:]
>>> counts = np.bincount(pairs, minlength=n*n).reshape(n, n)
>>> counts
array([[0, 3, 1, 2],
       [1, 0, 3, 2],
       [0, 1, 3, 3],
       [4, 2, 1, 0]])


This array represents the transition table:

```
source
│ A C T G
──┼───────────
A

Code Snippets

>>> cond_probs(['AAC'])
{('A', 'C'): 0.5, ('A', 'A'): 0.5}
>>> cond_probs(['CAA'], reverse=True)
{('A', 'A'): 1.0}
counts = {}
# ...
counts[X] = counts.get(X, 0) + 1
counts = Counter()
# ...
counts[X] += 1
range(len(seq))[::-1][:-1]

Context

StackExchange Code Review Q#43568, answer score: 10

Revisions (0)

No revisions yet.