HiveBrain v1.2.0
Get Started
← Back to all entries
patternpythonMinor

Is this code thread-safe?

Submitted by: @import:stackexchange-codereview··
0
Viewed 0 times
thiscodethreadsafe

Problem

EDIT: The full source code in question can be found here.

Is this code reasonably safe against dead/livelocks, and if not, how can I fix it?

  • I know processes are usually recommended over threads for operations like this, but processing's version of queue doesn't do what I need.



  • The choose function will be called on 100 objects, but in sequence. I think each one should be finished before the next, but I'm not certain.



-
The appraise function that choose calls only gets values from dicts, and does math on them.

---thread_queue.py---

```
class ThreadsnQueues(threading.Thread):
def __init__(self, queue, out_queue=None, func=None, args=None, semaphore=None):
super(ThreadsnQueues, self).__init__()
self.queue = queue
self.out_queue = out_queue
self.semaphore = semaphore
if func is None:
try:
func = self.queue.get(block=False)
except Empty:
self.func = None
return
if func is not None:
if type(func) is list:
self.func = func[0]
self.args = func[1:]
else:
self.func = func
if args != [] and args != None: self.args = args

class ThreadScore(ThreadsnQueues):
def __init__(self, queue, out_queue, func, semaphore):
super(ThreadScore, self).__init__(queue, out_queue, func, None, semaphore)
def run(self):
try:
s = self.queue.get() #should be a stock object
except Empty:
print "Empty queue?!"
return
except:
return
while True:
try:
self.semaphore.acquire()
ret = self.func(s) #should return a score (int)
self.out_queue.put((s, ret)) #tuple with Stock, int
self.queue.task_done()
self.semaphore.release()
return
except KeyboardInterrupt:
raise

Solution


  • "reasonably safe against deadlocks" is a strange expression :)



  • In ThreadsnQueues the if func is not None: can be turned into an else.



  • Your code in ThreadScore indicates that some exceptions might be expected because you handle them and continue. The problem is if that happens while you have the semaphore acquired then it will not release it. You should move the release into finally block.



  • In choose you join on all threads in multiple places and this code [ p.join(5.0) for p in procs ] gets repeated several times. Should be refactored into a method.



-
I'm not sure what the point of the semaphore is. The way I read you code is:

  • You initialize the semaphore with N



  • Then you spawn N threads each calling acquire/release in it's run method.



This means that all threads can simultaneously enter the block protected by the semaphore. So what's the point of having it?

  • In ThreadScore.run() the main part is the while True loop. However it never seems to take anything of the queue (it only seems to do it once at the beginning of the run method) so how does it work?

Context

StackExchange Code Review Q#32890, answer score: 4

Revisions (0)

No revisions yet.