
Synchronization algorithm for deterministic context switching between threads

Submitted by: @import:stackexchange-codereview

Problem

Curious how sqlite3 would handle different race conditions in a multithreaded environment, I created a simple module called deterministic for serializing the execution of multiple threads that lets the programmer explicitly program in the spots where thread-switching happens. Hopefully this example usage will explain the idea better:

import deterministic

def main():
    deterministic.run(consumer0, consumer1, consumer2)

def consumer0():
    print('consumer 0 yields to consumer 2')
    yield 2
    print('consumer 0 exits')

def consumer1():
    print('consumer 1 yields to next consumer')
    yield
    print('consumer 1 exits')

def consumer2():
    print('consumer 2 yields to consumer 1')
    yield 1
    print('consumer 2 yields to next consumer')
    yield
    print('consumer 2 exits')

if __name__ == '__main__':
    main()


Which outputs:

consumer 0 yields to consumer 2
consumer 2 yields to consumer 1
consumer 1 yields to next consumer
consumer 2 yields to next consumer
consumer 0 exits
consumer 1 exits
consumer 2 exits


Again, the idea is to experiment with how real multi-threaded code behaves, but in a highly controlled environment.
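
To make the switching rule concrete, here is a minimal single-threaded sketch that reproduces the order shown in the output above. It is not the module's implementation: `simulate`, `next_after` and the "move on to the next live consumer when one exits" rule are assumptions made for illustration, and the consumers are stripped down to their yields.

```python
def simulate(*consumers):
    """Drive generator-based consumers in one thread; return the run order."""
    gens = {i: consumer() for i, consumer in enumerate(consumers)}
    alive = sorted(gens)   # indices of consumers that have not finished yet
    current = 0
    trace = []

    def next_after(idx):
        # Round-robin: first live index greater than idx, else wrap around.
        for n in alive:
            if n > idx:
                return n
        return alive[0]

    while alive:
        trace.append(current)
        try:
            target = next(gens[current])
        except StopIteration:
            # Assumed rule: a finished consumer hands over to the next one.
            alive.remove(current)
            if alive:
                current = next_after(current)
        else:
            if target is None:          # bare `yield`: go to the next consumer
                current = next_after(current)
            elif target in alive:       # `yield n`: go to consumer n
                current = target
            else:
                raise ValueError('cannot yield to instance %r' % target)
    return trace


def consumer0():
    yield 2

def consumer1():
    yield

def consumer2():
    yield 1
    yield
```

Calling `simulate(consumer0, consumer1, consumer2)` returns the sequence of consumer indices in the order they were resumed, which can be compared line by line with the printed output.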

The actual code follows below. Some specific feedback I'm hoping for:

  • Is there a synchronization primitive that better models what I'm trying to do than simple locking?
  • Even if not, is there a better way to structure the code to make it easier to reason about?
  • Finally, general coding and style advice is more than welcome.


```
import threading, time, types

def run(*consumers):
    sync = Synchronizer(len(consumers))
    _run_threads(
        threading.Thread(target=DeterministicRunner(sync, i, c).run)
        for i, c in enumerate(consumers)
    )

class Synchronizer:
    def __init__(self, num_instances):
        self._instances = list(range(num_instances))

        self._lock = threading.Lock()
        self._current = 0
        self._want_abort = False

    def lock(self, idx):
        while True:
            ...  # the listing is cut off here in the source
```

Solution

Threading

Combining time.sleep with a threading.Lock usually means you'd be better off using a threading.Condition instead. It lets you wait passively instead of polling in an active while True: ... time.sleep(0.001) loop.
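
As a rough illustration of the Condition approach, the sketch below lets each thread sleep until it is its turn. The names `TurnTaker`, `wait_for_turn`, `pass_turn` and `demo` are hypothetical and not part of the reviewed module; only `threading.Condition` and its `wait_for` and `notify_all` methods come from the standard library.

```python
import threading


class TurnTaker:
    """Hypothetical helper: lets exactly one thread index run at a time."""

    def __init__(self):
        self._cond = threading.Condition()
        self._current = 0

    def wait_for_turn(self, idx):
        with self._cond:
            # wait_for sleeps until notified, then re-checks the predicate,
            # so no polling loop with time.sleep is needed.
            self._cond.wait_for(lambda: self._current == idx)

    def pass_turn(self, idx):
        with self._cond:
            self._current = idx
            self._cond.notify_all()   # wake every waiter; only idx proceeds


def demo():
    turns = TurnTaker()
    order = []

    def worker(idx, next_idx):
        turns.wait_for_turn(idx)
        order.append(idx)
        turns.pass_turn(next_idx)

    # Hand-over chain: 0 -> 2 -> 1 -> 3 (nobody waits for 3).
    threads = [
        threading.Thread(target=worker, args=(0, 2)),
        threading.Thread(target=worker, args=(1, 3)),
        threading.Thread(target=worker, args=(2, 1)),
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return order
```

Whatever order the operating system schedules the threads in, the hand-over chain forces the workers to record their indices in the order 0, 2, 1.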

However, in your case, you could simplify further using a threading.Event:

class Synchronizer:
    def __init__(self, num_instances):
        ...
        self._lock = threading.Event()

    def lock(self, idx):
        while self._current != idx:
            self._lock.wait()
            if self._want_abort:
                raise SynchronizerAbort()
        self._lock.clear()

    def die(self):
        self._want_abort = True
        self._lock.set()

    def yield_to(self, idx=None):
        ...
        self._current = idx
        self._lock.set()


Every thread except thread 0 enters the while self._current != idx loop and waits for the Event to be set. Once thread 0 sets it on leaving yield_to, only the thread with the matching ID exits the loop; all the others go back to waiting for the Event to be set again.

Now what do we have here? Every thread needs to wait at a synchronization point, loop until a condition holds, and check a flag that indicates whether another thread hit an error. Time to switch to a threading.Barrier (Python 3.2 and up).

There is a drawback to using barriers: all threads must reach the barrier before they are released, all at once. This means we can't let DeterministicRunner.run exit before all threads are done, so Synchronizer.lock must return a boolean indicating whether any instances are left to run.

class Synchronizer:
    def __init__(self, num_instances):
        ...
        self._barrier = threading.Barrier(num_instances)

    def lock(self, idx):
        while True: # Force any thread to enter, independently of its idx
            self._barrier.wait()
            if not self._instances:
                # All threads exit at once when everything is done
                return False
            if self._current == idx:
                # Only the active thread can exit otherwise
                return True

    # No need of `die` anymore
    def yield_to(self, idx=None):
        if idx is None:
            idx = self._get_next_instance()

        if idx not in self._instances:
            self._barrier.abort() # Make all other thread abort with BrokenBarrierError
            raise Exception('Can not yield to non-existent instance "%s"' % idx)

        self._current = idx
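
The reason `die` is no longer needed is that aborting a barrier wakes every waiting thread with an exception. The sketch below isolates that behavior; `barrier_abort_demo` and `waiter` are hypothetical names used only for this illustration.

```python
import threading


def barrier_abort_demo(num_threads=3):
    barrier = threading.Barrier(num_threads)
    results = [None] * (num_threads - 1)

    def waiter(i):
        try:
            barrier.wait()
            results[i] = 'released'
        except threading.BrokenBarrierError:
            # Raised in every thread waiting on (or arriving at) a broken barrier.
            results[i] = 'broken'

    # Start one thread fewer than the barrier expects, so nobody gets released.
    threads = [threading.Thread(target=waiter, args=(i,))
               for i in range(num_threads - 1)]
    for thread in threads:
        thread.start()

    # The remaining party aborts instead of waiting, as yield_to does on error.
    barrier.abort()

    for thread in threads:
        thread.join()
    return results, barrier.broken
```

Each waiting thread ends up in the `except` branch, whether it reached the barrier before or after the abort, so a runner would only need to catch `BrokenBarrierError` to shut down cleanly.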


Coding Style

PEP 8 recommends that

import threading, time, types


be written as

import threading
import time
import types


However, since you make very little use of time and types, I'd rather recommend the from ... import ... form.

Other than that, there is little to talk about. It is mostly a matter of taste:

- Converting the generator parameter of _run_threads into a list could be done using the variable-length argument syntax:

def _run_threads(*threads):
    for thread in threads:
        thread.start()

    for thread in threads:
        thread.join()


and call it by unpacking the generator expression with *, as in _run_threads(*(...)).

- DeterministicRunner.run might be better written with the try inside the while loop to reduce the amount of protected code:

def run(self):
    while True:
        self._syncer.lock(self.idx)
        try:
            n = next(self._g)
        except StopIteration:
            self._syncer.remove(self.idx)
            break
        else:
            self._syncer.yield_to(n)


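Note that when using barriers, the loop has to be written as while self._syncer.lock(self.idx): and must not break, since every thread has to keep reaching the barrier until all of them are done.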

- Synchronizer._get_next_instance might be more readable using plain list iteration:

def _get_next_instance(self):
    for n in self._instances:
        if n > self._current:
            return n
    return self._instances[0]


- I'd check that the consumers return an object that has a __next__ method rather than one of type GeneratorType. This gives more flexibility to anyone using your API who wants to provide a custom class rather than generator functions.
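
The last point can be illustrated with a small sketch. `CountDown` and `is_steppable` are hypothetical names: the first is a hand-written iterator that a type check against `types.GeneratorType` would reject, the second is one possible duck-typed check.

```python
import types


class CountDown:
    """Hypothetical consumer written as a class instead of a generator."""

    def __init__(self, steps):
        self._steps = steps

    def __iter__(self):
        return self

    def __next__(self):
        if self._steps <= 0:
            raise StopIteration
        self._steps -= 1
        return None   # behaves like a bare `yield`: switch to the next consumer


def generator_consumer():
    yield


def is_steppable(obj):
    # Duck typing: anything that next() can advance is acceptable.
    return hasattr(obj, '__next__')
```

Both `generator_consumer()` and `CountDown(2)` pass `is_steppable`, while only the former is an instance of `types.GeneratorType`.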

You are also using a custom-made exception for cleanup purposes while using the generic Exception for errors in using your API. I'd go the other way around: custom exceptions, which are easier to handle, for real errors and generic ones for cleanup.
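
A minimal sketch of what a dedicated error type buys the caller follows. The exception name `SynchronizeError` matches the one used in this answer's proposed code; `check_target` and `describe` are hypothetical helpers added only for illustration.

```python
class SynchronizeError(Exception):
    """A consumer used the API incorrectly (for example, a bad yield target)."""


def check_target(idx, instances):
    # Hypothetical helper: validate a yield target before switching to it.
    if idx not in instances:
        raise SynchronizeError(
            'Cannot yield to non-existent instance "%s"' % idx)
    return idx


def describe(idx, instances):
    # Callers can catch API misuse specifically, without swallowing
    # unrelated exceptions such as KeyError or TypeError.
    try:
        return 'switching to %d' % check_target(idx, instances)
    except SynchronizeError as exc:
        return 'API misuse: %s' % exc
```

With a bare `Exception`, the `except` clause in `describe` would have to catch everything, including genuine bugs.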

Proposed Improvements

```
import threading

class SynchronizeError(Exception):
    """Abort execution because a thread does not respect the API"""

class Synchronizer:
    def __init__(self, num_instances):
        self._instances = list(range(num_instances))
        self._current = 0

        self._barrier = threading.Barrier(num_instances)

    def lock(self, idx):
        while True:
            self._barrier.wait()
            if not self._instances:
                return False
            if self._current == idx:
                return True
```

Context

StackExchange Code Review Q#111072, answer score: 2
