patternMinor
Debounce in Scala
debounce, scala, stackoverflow
Problem
This is my attempt at implementing debounce in Scala without any third-party library. How can I improve the code or make it more idiomatic, and are there any bugs I might have missed?
import scala.compat.Platform.{currentTime => now}
import scala.concurrent.duration._
import scala.util.control.NonFatal

/**
 * Returns a function that invokes f only if f is not already running
 * and at least the wait duration has passed since it last stopped running.
 * Usage:
 *   def plus1(x: Int) = println(x + 1)
 *   val f = debounce(1.second)(plus1)
 *   f(1)
 *   f(2)
 *   Thread.sleep(2000)
 *   f(3)
 *
 * @return a function that returns Some(original output) if f was invoked,
 *         or None if the call was rejected by the rules above
 */
def debounce[A, B](wait: Duration)(f: A => B): A => Option[B] = {
  var (isRunning, lastStopTime) = (false, Long.MinValue)
  (input: A) => {
    if (!isRunning && lastStopTime + wait.toMillis <= now) {
      try {
        isRunning = true
        Some(f(input))
      } catch {
        case NonFatal(e) => throw e
      } finally {
        isRunning = false
        lastStopTime = now
      }
    } else {
      None
    }
  }
}
Solution
The most noticeable problem is the race condition on the access of isRunning within the returned anonymous function. Two different threads can call the function at the same time, so both can see that isRunning == false before either of them flips it to true.
For example:
def test(i: Int): Int = { println(i); i }
val db = debounce(1.seconds)(test)
When making many concurrent calls to db, it occasionally leaks an extra Some, as shown by the extra printed number (which occurred less than 1 ms after the original call).
scala> (0 to 10000000).par.map(db)
0
5001723
The easiest way to fix this is to synchronize access to isRunning (and really, the whole function), which can be done by wrapping the body of the returned function in this.synchronized:
def debounce[A, B](wait: Duration)(f: A => B): A => Option[B] = {
  var (isRunning, lastStopTime) = (false, Long.MinValue)
  (input: A) => this.synchronized {
    if (!isRunning && lastStopTime + wait.toMillis <= now) {
      try {
        isRunning = true
        Some(f(input))
      } finally {
        isRunning = false
        lastStopTime = now
      }
    } else {
      None
    }
  }
}
This will have some performance implications, particularly if f is slow, since the lock is held while f is evaluated. That is, concurrent callers will block until the first f finishes evaluating.
I also removed the catch block, as it was superfluous: it only rethrew the exception it caught.
I should note there is also a fundamental difference between the way your method behaves and the linked JavaScript function. Yours intends to prevent f from being evaluated more than once during a specific duration after an evaluation (rate limiting). Underscore's debounce method will not evaluate the function at all until one wait period has elapsed since the last call to it. That is, if wait were one second and my function db were called once every 0.5 seconds repeatedly, the contained function would never be evaluated (with the Underscore behavior). The author of that post does not seem to understand that Underscore's debounce is not for rate limiting, but for calling a function after a stream of input has paused for a set amount of time.
For fun, here is a version that uses AtomicBoolean, similarly to a ReentrantLock. It should be better than synchronized in that it won't block calling threads while f is evaluating. It uses compareAndSet to atomically check that isRunning is false and, if so, set it to true, but only when the wait has already expired (otherwise it writes false back, leaving the flag unchanged). This ensures that f isn't currently running, and that the flag isn't re-locked before the time has elapsed.
import java.util.concurrent.atomic.AtomicBoolean
def debounce[A, B](wait: Duration)(f: A => B): A => Option[B] = {
  var (isRunning, lastStopTime) = (new AtomicBoolean(false), Long.MinValue)
  (input: A) => {
    val doneWaiting = lastStopTime + wait.toMillis <= now
    if (isRunning.compareAndSet(false, doneWaiting) && doneWaiting) {
      try {
        Some(f(input))
      } finally {
        lastStopTime = now
        isRunning.set(false)
      }
    } else {
      None
    }
  }
}
Similarly to a ReentrantLock though, this suffers from contention when checking the status of isRunning. If there are many concurrent calls to compareAndSet, they can delay the call to set. This shouldn't really be a problem unless a ridiculous number of calls are being made, though.
Code Snippets
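For comparison with the rate-limiting versions, here is a minimal sketch of the Underscore-style (trailing-edge) behavior described in the Solution: f runs only once a full wait period has passed with no further calls. The names Debounce and trailing, and the caller-supplied scheduler parameter, are assumptions for illustration and do not appear in the original post. It returns Unit rather than Option[B], because the result is not available at call time.

```scala
import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture, TimeUnit}
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.duration._

object Debounce {
  // Hypothetical helper, not part of the original post: a trailing-edge
  // debounce. `scheduler` is an assumed, caller-supplied executor.
  def trailing[A](wait: FiniteDuration, scheduler: ScheduledExecutorService)(f: A => Unit): A => Unit = {
    // The most recently scheduled (possibly already finished) invocation.
    val pending = new AtomicReference[ScheduledFuture[_]]()
    (input: A) => {
      val task = new Runnable { def run(): Unit = f(input) }
      val next = scheduler.schedule(task, wait.toMillis, TimeUnit.MILLISECONDS)
      // Publish the new task and cancel whichever one it replaced, so only
      // the latest call in a burst survives.
      Option(pending.getAndSet(next)).foreach(_.cancel(false))
    }
  }
}
```

With a scheduler from Executors.newSingleThreadScheduledExecutor(), calling the returned function several times in quick succession should evaluate f once, with the last input, after the wait elapses. The caller is responsible for shutting the scheduler down, and f runs on the scheduler's thread rather than the caller's.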
def test(i: Int): Int = { println(i); i }
val db = debounce(1.seconds)(test)

scala> (0 to 10000000).par.map(db)
0
5001723

def debounce[A, B](wait: Duration)(f: A => B): A => Option[B] = {
  var (isRunning, lastStopTime) = (false, Long.MinValue)
  (input: A) => this.synchronized {
    if (!isRunning && lastStopTime + wait.toMillis <= now) {
      try {
        isRunning = true
        Some(f(input))
      } finally {
        isRunning = false
        lastStopTime = now
      }
    } else {
      None
    }
  }
}

import java.util.concurrent.atomic.AtomicBoolean
def debounce[A, B](wait: Duration)(f: A => B): A => Option[B] = {
  var (isRunning, lastStopTime) = (new AtomicBoolean(false), Long.MinValue)
  (input: A) => {
    val doneWaiting = lastStopTime + wait.toMillis <= now
    if (isRunning.compareAndSet(false, doneWaiting) && doneWaiting) {
      try {
        Some(f(input))
      } finally {
        lastStopTime = now
        isRunning.set(false)
      }
    } else {
      None
    }
  }
}
Context
StackExchange Code Review Q#98407, answer score: 3