Producer - multiple consumer implementation using futures
Problem
I've tried to implement a producer-multiple consumer pattern using futures, and I was wondering whether it should ever deadlock and, if so, why.
I have this method, `runNTimes`, which I implemented in order to run the implementation repeatedly, e.g. 1000 times with 500 consumers. The implementation shouldn't deadlock as far as I know, and it usually doesn't (I've run it multiple times, telling `runNTimes` to repeat 1000 times). However, every once in a while it blocks at the first or second run (I'm running it from sbt). For the sake of simplicity, ignore the `runNTimes` method; I've included it only to explain why the occasional deadlock seems strange to me.
```
package comparison_examples.futuresimplementation

import common.MeasurementHelpers._
import common._
import scala.collection.mutable
import scala.concurrent.blocking
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future, Promise}

case class Item(value: Int)

object ProducerConsumer {
  private val sharedQueue = mutable.Queue[Item]()

  def main(args: Array[String]): Unit = {
    val n: Double = if (args.nonEmpty) args(0).toDouble else Configuration.runTimes.toDouble
    val numConsumers = if (args.length == 2) args(1).toInt else Configuration.numberOfConsumers
    val funRunNTimes = MeasurementHelpers.runNTimes(n.toInt) _
    val results = funRunNTimes {
      val p = Promise[Boolean]()
      val producer = new Producer(p, sharedQueue)
      val cs = startConsumers(numConsumers, List[Consumer](), sharedQueue, p)
      val fp = producer.start()
      val fs = cs.map(x => x.start())
      val f = Future.sequence(fs)
      Await.result(fp, Duration.Inf)
      Await.result(f, Duration.Inf)
      val allElementsObtained = cs.flatMap(_.getObtainedItems)
      println(allElementsObtained.length)
    }
    println(s"Run times: $n")
```
Solution
I notice several things.
You have a data race on the shared queue. The consumer has unsynchronized access to the `nonEmpty` property.
The `notifyAll` after you removed an element from the queue is unnecessary. Consumers shouldn't care about other consumers removing items.
But the main problem, which I suspect is the source of your deadlocks, is the race condition between the producer and the consumers. Here's what I think happens:
- The producer produces the last item. It notifies all threads, releases the lock, but is preempted before setting the promise.
- Consumer C1 wakes up, sees the queue is non-empty and grabs the element. It notifies all threads, releases the lock, and goes out to add the element to its list.
- The other consumers wake up, see that the queue is empty but the promise is not set, and go back to sleep.
- The producer sets the promise.
- Consumer C1 sees that the promise is set and terminates.
- The other consumers continue sleeping for all eternity.
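One way to close that window is sketched below. Since the original `Producer` and `Consumer` classes are not shown, everything here (`HandoffSketch`, `run`, `consume`, the dedicated thread pool) is a hypothetical stand-in, not the asker's code. The sketch assumes the consumers wait on the queue's monitor. It checks both the queue and the promise while holding the lock, completes the promise under that same lock, and notifies all waiters afterwards, so no consumer can go to sleep after the final wake-up has already been sent.

```scala
import java.util.concurrent.Executors
import scala.collection.mutable
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future, Promise}

// Hypothetical stand-in for the producer/consumer classes not shown in the question.
object HandoffSketch {
  final case class Item(value: Int)

  // Runs one producer and `numConsumers` consumers; returns how many
  // items each consumer obtained.
  def run(numItems: Int, numConsumers: Int): Seq[Int] = {
    val queue = mutable.Queue[Item]() // only touched while holding queue's monitor
    val done  = Promise[Boolean]()    // completed once the producer has finished

    // Assumption: one thread per participant, so blocked consumers
    // cannot starve the producer of a thread.
    val pool = Executors.newFixedThreadPool(numConsumers + 1)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)

    def consume(): Int = {
      var obtained = 0
      var finished = false
      while (!finished) {
        val next = queue.synchronized {
          // Both conditions are re-checked under the lock after every wake-up.
          while (queue.isEmpty && !done.isCompleted) queue.wait()
          if (queue.nonEmpty) Some(queue.dequeue()) else None
        }
        next match {
          case Some(_) => obtained += 1    // no notification needed after a dequeue
          case None    => finished = true  // queue drained and producer done
        }
      }
      obtained
    }

    val consumers = Seq.fill(numConsumers)(Future(consume()))

    val producer = Future {
      (1 to numItems).foreach { i =>
        queue.synchronized {
          queue.enqueue(Item(i))
          queue.notify() // one new item, so waking one waiter is enough
        }
      }
      queue.synchronized {
        done.success(true) // set the flag first...
        queue.notifyAll()  // ...then wake every consumer that is still waiting
      }
    }

    try {
      Await.result(producer, 30.seconds)
      Await.result(Future.sequence(consumers), 30.seconds)
    } finally pool.shutdown()
  }
}
```

Calling `HandoffSketch.run(1000, 8)` should return eight counts that add up to 1000. The timeouts are there only so that a regression shows up as an exception rather than a hang.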
To solve this, you need to call `notifyAll` after you've set the promise. You can also probably optimize the whole thing by only calling `notify` (which wakes a single waiting thread) after adding an element.
Context
StackExchange Code Review Q#163239, answer score: 4