patternpythonMinor
Is this code thread-safe?
Viewed 0 times
thiscodethreadsafe
Problem
EDIT: The full source code in question can be found here.
Is this code reasonably safe against dead/livelocks, and if not, how can I fix it?
-
The appraise function that choose calls only gets values from dicts, and does math on them.
---thread_queue.py---
```
class ThreadsnQueues(threading.Thread):
def __init__(self, queue, out_queue=None, func=None, args=None, semaphore=None):
super(ThreadsnQueues, self).__init__()
self.queue = queue
self.out_queue = out_queue
self.semaphore = semaphore
if func is None:
try:
func = self.queue.get(block=False)
except Empty:
self.func = None
return
if func is not None:
if type(func) is list:
self.func = func[0]
self.args = func[1:]
else:
self.func = func
if args != [] and args != None: self.args = args
class ThreadScore(ThreadsnQueues):
def __init__(self, queue, out_queue, func, semaphore):
super(ThreadScore, self).__init__(queue, out_queue, func, None, semaphore)
def run(self):
try:
s = self.queue.get() #should be a stock object
except Empty:
print "Empty queue?!"
return
except:
return
while True:
try:
self.semaphore.acquire()
ret = self.func(s) #should return a score (int)
self.out_queue.put((s, ret)) #tuple with Stock, int
self.queue.task_done()
self.semaphore.release()
return
except KeyboardInterrupt:
raise
Is this code reasonably safe against dead/livelocks, and if not, how can I fix it?
- I know processes are usually recommended over threads for operations like this, but processing's version of queue doesn't do what I need.
- The choose function will be called on 100 objects, but in sequence. I think each one should be finished before the next, but I'm not certain.
-
The appraise function that choose calls only gets values from dicts, and does math on them.
---thread_queue.py---
```
class ThreadsnQueues(threading.Thread):
def __init__(self, queue, out_queue=None, func=None, args=None, semaphore=None):
super(ThreadsnQueues, self).__init__()
self.queue = queue
self.out_queue = out_queue
self.semaphore = semaphore
if func is None:
try:
func = self.queue.get(block=False)
except Empty:
self.func = None
return
if func is not None:
if type(func) is list:
self.func = func[0]
self.args = func[1:]
else:
self.func = func
if args != [] and args != None: self.args = args
class ThreadScore(ThreadsnQueues):
def __init__(self, queue, out_queue, func, semaphore):
super(ThreadScore, self).__init__(queue, out_queue, func, None, semaphore)
def run(self):
try:
s = self.queue.get() #should be a stock object
except Empty:
print "Empty queue?!"
return
except:
return
while True:
try:
self.semaphore.acquire()
ret = self.func(s) #should return a score (int)
self.out_queue.put((s, ret)) #tuple with Stock, int
self.queue.task_done()
self.semaphore.release()
return
except KeyboardInterrupt:
raise
Solution
- "reasonably safe against deadlocks" is a strange expression :)
- In
ThreadsnQueuestheif func is not None:can be turned into anelse.
- Your code in
ThreadScoreindicates that some exceptions might be expected because you handle them and continue. The problem is if that happens while you have the semaphore acquired then it will not release it. You should move the release intofinallyblock.
- In
chooseyou join on all threads in multiple places and this code[ p.join(5.0) for p in procs ]gets repeated several times. Should be refactored into a method.
-
I'm not sure what the point of the semaphore is. The way I read you code is:
- You initialize the semaphore with N
- Then you spawn N threads each calling acquire/release in it's run method.
This means that all threads can simultaneously enter the block protected by the semaphore. So what's the point of having it?
- In
ThreadScore.run()the main part is thewhile Trueloop. However it never seems to take anything of the queue (it only seems to do it once at the beginning of therunmethod) so how does it work?
Context
StackExchange Code Review Q#32890, answer score: 4
Revisions (0)
No revisions yet.