HiveBrain v1.2.0
Get Started
← Back to all entries
patternpythonMinor

Thread-safe memoizer

Submitted by: @import:stackexchange-codereview··
0
Viewed 0 times
memoizerthreadsafe

Problem

I have searched around but I was not able to find a complete implementation of the memoize pattern in a multithreaded context. It's the first time playing with thread synchronization. The code seems to works, but I am not able to test it extensively, in particular to find if there are race conditions. Do you think there is a better implementation, maybe using more advanced synchronization mechanism?

from threading import Thread, Lock, current_thread
import thread

def memoize(obj):
    import functools
    cache = obj.cache = {}
    lock_cache = obj.lock_cache = Lock()

    @functools.wraps(obj)
    def memoizer(*args, **kwargs):
        key = str(args) + str(kwargs)
        cached_result = None
        item_lock = None
        with lock_cache:
            # do the minimal needed things here: just look inside the cache,
            # return if present, otherwise create and acquire lock  specific to a
            # particular computation. Postpone the actual computation, do  not
            # block other computation.
             cached_result = cache.get(key, None)
            if cached_result is not None:
                 if type(cached_result) is not thread.LockType:
                     return cached_result
            else:
                item_lock = Lock()
                item_lock.acquire()
                cache[key] = item_lock

        if item_lock is not None:
            # if we are here it means we have created the lock: do the
            # computation, return and release.
            result = obj(*args, **kwargs)
            with lock_cache:
                cache[key] = result
            item_lock.release()
            return result

        if type(cached_result) is thread.LockType:
            # if we are here it means that other thread have created the
            # locks and are doing the computation, just wait
            with cached_result:
                return cache[key]
    return memoizer


Here a little test:

```
import time

@mem

Solution


  1. Review



-
There's no docstring. What does memoize do, and how do I use it?

-
The code uses str(args) + str(kwargs) as the cache key. But this means that, for example, the number 1 and the string "1" will collide.

-
The code is not portable to Python 3, where there's no thread module.

-
It's not clear to me why functools is only imported inside memoize, but threading is imported at top level.

-
It's not clear to me why the code assigns to obj.cache and obj.lock_cache. What if obj already has a cache property?

-
The locking logic is hard to follow and needs comments. As far as I can see, lock_cache protects the cache dictionary; and item_cache protects the cache slot that is currently being computed. Its necessary to have two locks because the call to obj might be recursive and so need to re-enter memoizer.

-
The code uses three states for each key: (i) not present in the cache; (ii) currently being computed; (iii) present in the cache. The way it distinguishes between (ii) and (iii) is by checking to see if the type is thread.LockType. But this means that you can't use memoize on a function that might return a lock. It would be better to have a more robust mechanism for distinguishing these cases.

-
The naming is inconsistent: lock_cache versus item_lock.

-
key is looked up in cache multiple times on some code paths.

  1. Revised code



from functools import wraps
from threading import Lock

class CacheEntry: pass

def memoized(f):
    """Decorator that caches a function's result each time it is called.
    If called later with the same arguments, the cached value is
    returned, and not re-evaluated. Access to the cache is
    thread-safe.

    """
    cache = {}                  # Map from key to CacheEntry
    cache_lock = Lock()         # Guards cache

    @wraps(f)
    def memoizer(*args, **kwargs):
        key = args, tuple(kwargs.items())
        result_lock = None

        with cache_lock:
            try:
                entry = cache[key]
            except KeyError:
                entry = cache[key] = CacheEntry()
                result_lock = entry.lock = Lock() # Guards entry.result
                result_lock.acquire()

        if result_lock:
            # Compute the new entry without holding cache_lock, to avoid
            # deadlock if the call to f is recursive (in which case
            # memoizer is re-entered).
            result = entry.result = f(*args, **kwargs)
            result_lock.release()
            return result
        else:
            # Wait on entry.lock without holding cache_lock, to avoid
            # blocking other threads that need to use the cache.
            with entry.lock:
                return entry.result

    return memoizer


Notes:

-
This is untested.

-
This doesn't delete result_lock after the result is computed. This simplifies the logic (for example, there's no need to take cache_lock again and no need for a type check) at the cost of 40 bytes to keep the Lock object around.

Code Snippets

from functools import wraps
from threading import Lock

class CacheEntry: pass

def memoized(f):
    """Decorator that caches a function's result each time it is called.
    If called later with the same arguments, the cached value is
    returned, and not re-evaluated. Access to the cache is
    thread-safe.

    """
    cache = {}                  # Map from key to CacheEntry
    cache_lock = Lock()         # Guards cache

    @wraps(f)
    def memoizer(*args, **kwargs):
        key = args, tuple(kwargs.items())
        result_lock = None

        with cache_lock:
            try:
                entry = cache[key]
            except KeyError:
                entry = cache[key] = CacheEntry()
                result_lock = entry.lock = Lock() # Guards entry.result
                result_lock.acquire()

        if result_lock:
            # Compute the new entry without holding cache_lock, to avoid
            # deadlock if the call to f is recursive (in which case
            # memoizer is re-entered).
            result = entry.result = f(*args, **kwargs)
            result_lock.release()
            return result
        else:
            # Wait on entry.lock without holding cache_lock, to avoid
            # blocking other threads that need to use the cache.
            with entry.lock:
                return entry.result

    return memoizer

Context

StackExchange Code Review Q#91656, answer score: 5

Revisions (0)

No revisions yet.