Synchronization algorithm for deterministic context switching between threads
Problem
Curious how sqlite3 would handle different race conditions in a multithreaded environment, I created a simple module called `deterministic` for serializing the execution of multiple threads. It lets the programmer state explicitly where thread switches happen. Hopefully this example usage will explain the idea better:

```
import deterministic

def main():
    deterministic.run(consumer0, consumer1, consumer2)

def consumer0():
    print('consumer 0 yields to consumer 2')
    yield 2
    print('consumer 0 exits')

def consumer1():
    print('consumer 1 yields to next consumer')
    yield
    print('consumer 1 exits')

def consumer2():
    print('consumer 2 yields to consumer 1')
    yield 1
    print('consumer 2 yields to next consumer')
    yield
    print('consumer 2 exits')

if __name__ == '__main__':
    main()
```

Which outputs:

```
consumer 0 yields to consumer 2
consumer 2 yields to consumer 1
consumer 1 yields to next consumer
consumer 2 yields to next consumer
consumer 0 exits
consumer 1 exits
consumer 2 exits
```

Again, the idea is to experiment with how real multi-threaded code behaves, but in a highly controlled environment.

The actual code follows below. Some specific feedback I'm hoping for:

- Is there a synchronization primitive that better models what I'm trying to do than simple locking?
- Even if not, is there a better way to structure the code to make it easier to reason about?
- Finally, general coding and style advice is more than welcome.
```
import threading, time, types

def run(*consumers):
    sync = Syncrhonizer(len(consumers))
    _run_threads(
        threading.Thread(target=DeterministicRunner(sync, i, c).run)
        for i, c in enumerate(consumers)
    )

class Syncrhonizer:
    def __init__(self, num_instances):
        self._instances = list(range(num_instances))
        self._lock = threading.Lock()
        self._current = 0
        self._want_abort = False

    def lock(self, idx):
        while True:
            ...  # the listing is cut off here
```
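Since the listing is cut off, the switching order the example relies on can be modelled on a single thread. The sketch below is only an illustration: `simulate` is a hypothetical helper, and it assumes that a bare `yield` hands over to the next higher live index (wrapping around to the lowest) and that a finished consumer hands over the same way. Those assumptions agree with the sample output above, but they are not taken from the module itself.

```python
def consumer0():
    print('consumer 0 yields to consumer 2')
    yield 2
    print('consumer 0 exits')

def consumer1():
    print('consumer 1 yields to next consumer')
    yield
    print('consumer 1 exits')

def consumer2():
    print('consumer 2 yields to consumer 1')
    yield 1
    print('consumer 2 yields to next consumer')
    yield
    print('consumer 2 exits')


def simulate(*consumers):
    """Return the order in which consumers get to run, using no threads."""
    generators = {i: c() for i, c in enumerate(consumers)}
    order = []
    current = 0
    while generators:
        order.append(current)
        try:
            target = next(generators[current])
        except StopIteration:
            del generators[current]  # finished consumers leave the rotation
            target = None
        if not generators:
            break
        if target is None:
            # Assumed rule: next higher live index, wrapping to the lowest.
            later = [i for i in sorted(generators) if i > current]
            target = later[0] if later else min(generators)
        elif target not in generators:
            raise ValueError('cannot yield to non-existent instance %r' % target)
        current = target
    return order


order = simulate(consumer0, consumer1, consumer2)
```

Comparing `order` with the run order of the threaded version is a cheap way to check that a synchronizer really serializes the consumers as intended.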
Solution
Threading
Trying to use `time.sleep` with `threading.Lock`s usually means you'd be better off using a `threading.Condition` instead. It lets you wait passively instead of polling with an active `while True: ... time.sleep(0.001)`.

However, in your case, you could simplify further using a `threading.Event`:

```
class Synchronizer:
    def __init__(self, num_instances):
        ...
        self._lock = threading.Event()

    def lock(self, idx):
        while self._current != idx:
            self._lock.wait()
            if self._want_abort:
                raise SynchronizerAbort()
        self._lock.clear()

    def die(self):
        self._want_abort = True
        self._lock.set()

    def yield_to(self, idx=None):
        ...
        self._current = idx
        self._lock.set()
```

Every thread except the 0th one will enter the `while self._current != idx` loop and wait for the `Event` to be set. Once it is set, after thread 0 exits `yield_to`, only the thread with the right ID will exit the loop, and all the others will wait for the `Event` to be set again.

Now what have we here? Every thread needs to wait at a synchronization point, loop until a condition holds, and check a flag that indicates whether another thread hit an error… Time to switch to a `threading.Barrier` (Python 3.2 and up).

There is a drawback to using barriers: they must be reached by all threads before the threads are all released at once. This means we can't let `DeterministicRunner.run` exit before all threads are done. So `Synchronizer.lock` must return a boolean indicating whether any instances are left:

```
class Synchronizer:
    def __init__(self, num_instances):
        ...
        self._barrier = threading.Barrier(num_instances)

    def lock(self, idx):
        while True:  # Force any thread to enter, independently of its idx
            self._barrier.wait()
            if not self._instances:
                # All threads exit at once when everything is done
                return False
            if self._current == idx:
                # Only the active thread can exit otherwise
                return True

    # No need of `die` anymore

    def yield_to(self, idx=None):
        if idx is None:
            idx = self._get_next_instance()
        if idx not in self._instances:
            self._barrier.abort()  # Make all other threads abort with BrokenBarrierError
            raise Exception('Can not yield to non-existent instance "%s"' % idx)
        self._current = idx
```

Coding Style

It is recommended that

```
import threading, time, types
```

is written

```
import threading
import time
import types
```

However, since you're making very little use of `time` and `types`, I'd rather recommend the `from .. import ..` form.

Other than that, there is little to talk about. It is mostly a matter of taste:

- converting the generator parameter of `_run_threads` into a list could be done using the variable-length argument syntax:

  ```
  def _run_threads(*threads):
      for thread in threads:
          thread.start()
      for thread in threads:
          thread.join()
  ```

  and call it with `_run_threads(*(...))`, where `...` stands for the generator expression.

- `DeterministicRunner.run` might be better written with the `try` inside the `while` to reduce the amount of protected code:

  ```
  def run(self):
      while True:
          self._syncer.lock(self.idx)
          try:
              n = next(self._g)
          except StopIteration:
              self._syncer.remove(self.idx)
              break
          else:
              self._syncer.yield_to(n)
  ```

  Note that when using barriers, you have to loop with `while self._syncer.lock(self.idx):` and must not `break`.

- `Synchronizer._get_next_instance` might be more readable using plain list iteration:

  ```
  def _get_next_instance(self):
      for n in self._instances:
          if n > self._current:
              return n
      return self._instances[0]
  ```

- I'd check that the methods return an object that has a `__next__` method rather than being of type `GeneratorType`. It allows more flexibility for anyone using your API who wants to provide a custom class rather than generator functions.

You are also using a custom-made exception for cleanup purposes, but you use generic `Exception`s for errors in using your API. I'd go the other way around: use custom exceptions, which are easier to handle, for real errors, and generic ones for cleanup.

Proposed Improvements
```
import threading


class SynchronizeError(Exception):
    """Abort execution because a thread does not respect the API"""


class Synchronizer:
    def __init__(self, num_instances):
        self._instances = list(range(num_instances))
        self._current = 0
        self._barrier = threading.Barrier(num_instances)

    def lock(self, idx):
        while True:
            self._barrier.wait()
            if not self._instances:
                return False
            if self._current == idx:
                return True
```
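The listing stops after `lock`. As a rough sketch of how the remaining pieces might fit together, the code below adds `yield_to`, `remove`, a `DeterministicRunner` and `run`. It is not the answer's actual code. The `_snapshot` barrier action, the `_round` attribute, the `abort` method, the body of `remove` and the `log`-based demo consumers are all assumptions made for illustration. The snapshot is there so that every thread decides from the same state, even if the active thread has already moved on by the time a slower thread wakes up from the barrier.

```python
import threading


class SynchronizeError(Exception):
    """Abort execution because a thread does not respect the API."""


class Synchronizer:
    def __init__(self, num_instances):
        self._instances = list(range(num_instances))
        self._current = 0
        self._round = (False, 0)  # assumed helper: (finished?, active idx)
        # `action` runs in one thread just before the barrier releases them all.
        self._barrier = threading.Barrier(num_instances, action=self._snapshot)

    def _snapshot(self):
        # Freeze the state that every thread inspects during this round.
        self._round = (not self._instances, self._current)

    def lock(self, idx):
        while True:
            self._barrier.wait()
            finished, current = self._round
            if finished:
                return False
            if current == idx:
                return True

    def _get_next_instance(self):
        for n in self._instances:
            if n > self._current:
                return n
        return self._instances[0]

    def yield_to(self, idx=None):
        if idx is None:
            idx = self._get_next_instance()
        if idx not in self._instances:
            raise SynchronizeError('Cannot yield to non-existent instance "%s"' % idx)
        self._current = idx

    def remove(self, idx):
        # Assumed behaviour: a finished consumer hands over to the next one.
        self._instances.remove(idx)
        if self._instances:
            self._current = self._get_next_instance()

    def abort(self):
        self._barrier.abort()  # waiting threads get BrokenBarrierError


class DeterministicRunner:
    def __init__(self, syncer, idx, consumer):
        self._syncer = syncer
        self.idx = idx
        self._g = consumer()
        if not hasattr(self._g, '__next__'):
            raise SynchronizeError('Consumer %d did not return an iterator' % idx)

    def run(self):
        try:
            while self._syncer.lock(self.idx):
                try:
                    n = next(self._g)
                except StopIteration:
                    self._syncer.remove(self.idx)
                else:
                    self._syncer.yield_to(n)
        except threading.BrokenBarrierError:
            pass  # another thread aborted, so just stop
        except BaseException:
            self._syncer.abort()  # do not leave the others stuck at the barrier
            raise


def run(*consumers):
    if not consumers:
        return
    sync = Synchronizer(len(consumers))
    threads = [threading.Thread(target=DeterministicRunner(sync, i, c).run)
               for i, c in enumerate(consumers)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()


log = []  # demo consumers record messages here instead of printing

def consumer0():
    log.append('consumer 0 yields to consumer 2')
    yield 2
    log.append('consumer 0 exits')

def consumer1():
    log.append('consumer 1 yields to next consumer')
    yield
    log.append('consumer 1 exits')

def consumer2():
    log.append('consumer 2 yields to consumer 1')
    yield 1
    log.append('consumer 2 yields to next consumer')
    yield
    log.append('consumer 2 exits')

run(consumer0, consumer1, consumer2)
print('\n'.join(log))
```

Finished runners keep calling `lock` until every consumer is done, because the barrier only trips when all of its parties arrive. Any error in the active thread aborts the barrier so the remaining threads stop instead of waiting forever.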
Context
StackExchange Code Review Q#111072, answer score: 2