HiveBrain v1.2.0
pattern · kotlin · Minor

Simple thread-safe loading cache based on RxJava

Submitted by: @import:stackexchange-codereview
simple · rxjava · cache · thread · loading · safe · based

Problem

I'm sketching a simple thread-safe cache that can load missing values. It is based on RxJava's Observable, which also means that a client should be able to join a request for a value that is already in flight.

My major concern is how to make it thread-safe while keeping an eye on performance and possible deadlocks.

So far I've used ReentrantReadWriteLock and ConcurrentHashMap to achieve thread safety. Comments in the code clarify and justify the decisions.

```
import java.util.Date
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.locks.ReentrantReadWriteLock
import rx.Observable

class ObservableCacheDispatcher(val cache: ICache, val dataMapper: IDataMapper) : ICacheDispatcher {

    /** storage for requests which are being loaded at the moment */
    val requestsInFlight = ConcurrentHashMap<String, Observable<*>>()

    val lock = ReentrantReadWriteLock()

    val readLock = lock.readLock()

    val writeLock = lock.writeLock()

    /**
     * Returns data for the given {@code key}. If data is present in {@code cache} and not expired, returns it
     * immediately; otherwise loads data using {@code loader}, saves it to {@code cache} and returns it to the caller.
     *
     * @param key string key which is a unique identifier of data
     * @param clazz class of data
     * @param loader loader used to load data if it's not present in {@code cache}
     * @param expiration date and time in the future until which cached data remains valid
     *
     * @return {@link Observable} of data
     */
    override fun <T> get(key: String, clazz: Class<T>, loader: () -> T, expiration: Date): Observable<T> {
        readLock.lock()
        try {
            // even if request is removed from requestsInFlight after get() but before if(),
            // the returned Observable will still be valid to obtain data from
            val observable = requestsInFlight.get(key)
            if (observable != null) {
                @Suppress("UNCHECKED_CAST")
                return observable as Observable<T>
            }

            // If there were no requests in flight, these possibilities exist:
            // 1) no request for this key was even made, and
```

Solution

I believe you can significantly simplify your code. First of all, you can eliminate the locks by using `ConcurrentHashMap.putIfAbsent` or `Map.getOrPut`. Neither guarantees that a value is computed only once: two threads that miss at the same time may both build a value. In most cases, however, it is acceptable for a cache to occasionally load data twice because of concurrent get/contains/put calls on the map.
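A minimal sketch of the lock-free idea, under stated assumptions: it swaps in the JDK's `CompletableFuture` for RxJava's `Observable` so it runs without extra dependencies, and `InFlightLoader` is a hypothetical name. Because the placeholder future is cheap to create, losing the `putIfAbsent` race costs nothing; the loader runs only in the thread that won, and every other caller joins the request already in flight.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap

// Hypothetical lock-free loader. A shared CompletableFuture per key stands in
// for the shared Observable kept in requestsInFlight in the question.
class InFlightLoader<T>(private val loader: (String) -> T) {
    private val inFlight = ConcurrentHashMap<String, CompletableFuture<T>>()

    fun get(key: String): CompletableFuture<T> {
        val existing = inFlight[key]
        if (existing != null) return existing // join the request already in flight

        // Creating the placeholder is cheap, so losing the race below wastes nothing.
        val candidate = CompletableFuture<T>()
        // putIfAbsent is atomic: exactly one thread installs its placeholder.
        val winner = inFlight.putIfAbsent(key, candidate)
        if (winner != null) return winner // another thread won; share its future

        // This thread won the race: run the loader once and complete the shared future.
        try {
            candidate.complete(loader(key))
        } catch (t: Throwable) {
            // Complete exceptionally so that joined callers never wait forever.
            candidate.completeExceptionally(t)
        } finally {
            // Remove only our own entry; the request is no longer in flight.
            inFlight.remove(key, candidate)
        }
        return candidate
    }
}
```

In a full cache the winner would also write the result to `cache` before removing the key, so a caller arriving just after completion reads the stored value instead of triggering another load.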
Another thing that simplifies your code: both the initial loader and the deserializer are effectively Observables that produce the same result. So you can always create the Observable that fits the case, schematically `Observable.from(loader).onComplete(serializer)` or `Observable.from(deserializer)`, put it into the concurrent map, and return it to the user as-is, since loading and deserialization are both potentially blocking operations.
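The observation that both paths yield the same result can be sketched in the same style. Again `CompletableFuture` stands in for `Observable`, and `ByteStore`, `serialize`, `deserialize` and `source` are hypothetical stand-ins for `ICache`, `IDataMapper` and the dispatcher logic; expiration is ignored. Whichever branch is taken, the caller gets one future of the same type, which could then be put into the in-flight map unchanged.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for ICache: serialized entries keyed by string.
class ByteStore {
    val entries = ConcurrentHashMap<String, ByteArray>()
}

// Hypothetical stand-ins for IDataMapper (UTF-8 strings only, for brevity).
fun serialize(value: String): ByteArray = value.toByteArray(Charsets.UTF_8)
fun deserialize(bytes: ByteArray): String = String(bytes, Charsets.UTF_8)

// Returns one asynchronous source for `key`. Both branches have the same type,
// so the caller never needs to know which path produced the value.
fun source(store: ByteStore, key: String, loader: () -> String): CompletableFuture<String> {
    val bytes = store.entries[key]
    return if (bytes != null) {
        // Cache hit: the deserializer is the source (it may block, so run it asynchronously).
        CompletableFuture.supplyAsync { deserialize(bytes) }
    } else {
        // Cache miss: the loader is the source, and its result is written back as bytes.
        CompletableFuture.supplyAsync { loader() }.thenApply { value ->
            store.entries[key] = serialize(value)
            value
        }
    }
}
```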

Note also that you shouldn't hold locks around `cache.get(): Entry` and `mapper.fromBytes`, because both can block and may take a while; requests that need the lock have to wait in the meantime.

And finally, have a look at the RxJava Kotlin bindings, which are going to be released for Kotlin M11 soon.

Context

StackExchange Code Review Q#84350, answer score: 2
