patternkotlinMinor
Simple thread-safe loading cache based on RxJava
Viewed 0 times
simplerxjavacachethreadloadingsafebased
Problem
I'm sketching a simple thread-safe cache which can load missing values. It is based on RxJava's Observable which also means that it should be possible for a client to join a request for value which is already in flight.
My major concern is how to make it thread-safe whilst keeping an eye at performance and possible deadlocks.
So far I've started with
```
class ObservableCacheDispatcher(val cache: ICache, val dataMapper: IDataMapper): ICacheDispatcher {
/** storage for requests which are being loaded at moment */
val requestsInFlight = ConcurrentHashMap>()
val lock = ReentrantReadWriteLock()
val readLock = lock.readLock()
val writeLock = lock.writeLock()
/**
* Returns data for the given {@code key}. If data is present in {@code cache} and not expired, returns it
* immediately; otherwise loads data using {@code loader}, saves it {@code cache} and returns to caller.
*
* @param key string key which is a unique identifier of data
* @param clazz class of data
* @param loader loader used to load data if it's not present in {@code cache}
* @param expiration date and time in future until cached data remains valid
*
* @return {@link Observable} of data
*/
override fun get(key: String, clazz: Class, loader: () -> T, expiration: Date): Observable {
readLock.lock()
try {
// even if request is removed from requestsInFlight after get() but before if(),
// the returned Observable will still be valid to obtain data from
val observable = requestsInFlight.get(key)
if (observable != null) {
return observable as Observable
}
// If there were no requests in flight, these possibilities exist:
// 1) no request for this key was even made, and
My major concern is how to make it thread-safe whilst keeping an eye at performance and possible deadlocks.
So far I've started with
ReentrantReadWriteLock and ConcurrentHashMap to achieve thread safety. There are comments along code lines used to clarify / justify decisions.```
class ObservableCacheDispatcher(val cache: ICache, val dataMapper: IDataMapper): ICacheDispatcher {
/** storage for requests which are being loaded at moment */
val requestsInFlight = ConcurrentHashMap>()
val lock = ReentrantReadWriteLock()
val readLock = lock.readLock()
val writeLock = lock.writeLock()
/**
* Returns data for the given {@code key}. If data is present in {@code cache} and not expired, returns it
* immediately; otherwise loads data using {@code loader}, saves it {@code cache} and returns to caller.
*
* @param key string key which is a unique identifier of data
* @param clazz class of data
* @param loader loader used to load data if it's not present in {@code cache}
* @param expiration date and time in future until cached data remains valid
*
* @return {@link Observable} of data
*/
override fun get(key: String, clazz: Class, loader: () -> T, expiration: Date): Observable {
readLock.lock()
try {
// even if request is removed from requestsInFlight after get() but before if(),
// the returned Observable will still be valid to obtain data from
val observable = requestsInFlight.get(key)
if (observable != null) {
return observable as Observable
}
// If there were no requests in flight, these possibilities exist:
// 1) no request for this key was even made, and
Solution
I believe you can significantly simplify your code. First of all you can eliminate locks by using
The other thing you can note to simplify you code is the fact that both initial loader and deserializer are actually
You also have to note that you shouldn't use locks because of potential blocking in
And, finally, you can look at RxJava Kotlin bindings here, which is going to be released for M11 soon.
ConcurentHashMap.putIfAbsent or Map.getOrPut. Both of them not guaranteed however in most cases it is acceptable for caches to double load data due to concurrency at get/contains/put of Map.The other thing you can note to simplify you code is the fact that both initial loader and deserializer are actually
Observable that returns the same result. So you can always create observable depends on the case: Observable.from(loader).onComplete(serializer) or Observable.from(deserializer), put it to concurrent map and then return as is to user as loading and deserialization is also potentially blocking operation. You also have to note that you shouldn't use locks because of potential blocking in
cache.get(): Entry and mapper.fromBytes. Also both could take time so other requests couldn't be handled.And, finally, you can look at RxJava Kotlin bindings here, which is going to be released for M11 soon.
Context
StackExchange Code Review Q#84350, answer score: 2
Revisions (0)
No revisions yet.