patternjavaMinor
Cache<Long, BlockingDeque<Peer>> combined with Striped<ReadWriteLock>: is it thread-safe
Viewed 0 times
withblockingdequelongpeerstripedcachethreadsafereadwritelockcombined
Problem
I've implemented
Question: Is all those operations thread-safe in my context?
If it isn't thread safe, please, provide an example which demonstrate scenario where I'll get inconsistent data.
```
public class PeersContainer {
private final static int MAX_CACHE_SIZE = 100;
private final static int STRIPES_AMOUNT = 10;
private final static int PEER_ACCESS_TIMEOUT_MIN = 30;
private final static PeersContainer INSTANCE;
static {
INSTANCE = new PeersContainer();
}
public static PeersContainer getInstance() {
return INSTANCE;
}
private Striped stripes = Striped.lazyWeakReadWriteLock(STRIPES_AMOUNT);
private final Cache> peers = CacheBuilder.newBuilder()
.maximumSize(MAX_CACHE_SIZE)
.expireAfterAccess(PEER_ACCESS_TIMEOUT_MIN, TimeUnit.MINUTES)
.removalListener(new RemovalListener>() {
public void onRemoval(RemovalNotification> removal) {
if (removal.getCause() == RemovalCause.EXPIRED) {
//send logOut response to all use'r {@code Peer} objects.
for (Peer peer : removal.getValue()) {
//peer.sendLogOutResponse();
}
}
}
})
.build();
private final Cache authToRestore = CacheBuilder.newBuilder()
.maximumSize(MAX_CACHE_SIZE)
.expireAfterWrite(PEER_ACCESS_TIMEOUT_MIN, TimeUnit.MINUTES)
.build();
public BlockingDeque getPeers(long sessionId) {
Lock lock = stripes.get(sessionId).readLock();
peers.cleanUp();
lock.lock();
try
PeersContainer class with all operations that I need. I need something like multimap in concurrent environment, moreover I need remove entities automatically on timeout. So, I decide to use Cache> combined with Striped. Also I use two caches which can transfer entities between each other.Question: Is all those operations thread-safe in my context?
If it isn't thread safe, please, provide an example which demonstrate scenario where I'll get inconsistent data.
```
public class PeersContainer {
private final static int MAX_CACHE_SIZE = 100;
private final static int STRIPES_AMOUNT = 10;
private final static int PEER_ACCESS_TIMEOUT_MIN = 30;
private final static PeersContainer INSTANCE;
static {
INSTANCE = new PeersContainer();
}
public static PeersContainer getInstance() {
return INSTANCE;
}
private Striped stripes = Striped.lazyWeakReadWriteLock(STRIPES_AMOUNT);
private final Cache> peers = CacheBuilder.newBuilder()
.maximumSize(MAX_CACHE_SIZE)
.expireAfterAccess(PEER_ACCESS_TIMEOUT_MIN, TimeUnit.MINUTES)
.removalListener(new RemovalListener>() {
public void onRemoval(RemovalNotification> removal) {
if (removal.getCause() == RemovalCause.EXPIRED) {
//send logOut response to all use'r {@code Peer} objects.
for (Peer peer : removal.getValue()) {
//peer.sendLogOutResponse();
}
}
}
})
.build();
private final Cache authToRestore = CacheBuilder.newBuilder()
.maximumSize(MAX_CACHE_SIZE)
.expireAfterWrite(PEER_ACCESS_TIMEOUT_MIN, TimeUnit.MINUTES)
.build();
public BlockingDeque getPeers(long sessionId) {
Lock lock = stripes.get(sessionId).readLock();
peers.cleanUp();
lock.lock();
try
Solution
- Thread safety
From a concurrency stand-point, I don't see any issue with the code: reading is guarded by the
readLock, all the updates are guarded by the writeLock, and you're using thread-safe BlockingDeques for the values.However, you're returning the actual
BlockingDeque from the Cache in getPeers(), so you're not mandating going through your PeersContainer to manipulate the values. You depend on how the other objects collaborate.- Other considerations
From a more general stand-point, I'd make a few changes:
- Consistent API usage
public BlockingDeque getPeers(long sessionId) {
Lock lock = stripes.get(sessionId).readLock();
peers.cleanUp();
lock.lock();
try {
return peers.asMap().get(sessionId);
} finally {
lock.unlock();
}
}Why use
asMap()? I'd stay with :return peers.getIfPresent(sessionId);- Intermixing blocks
Especially with locks, I like them close to where I use them. So instead of
Lock lock = stripes.get(sessionId).readLock();
peers.cleanUp();
lock.lock();I'd write
peers.cleanUp();
Lock lock = stripes.get(sessionId).readLock();
lock.lock();- Multiple look-ups
Even if
Cache is fast, no need to abuse it :). In removePeer():UserAuthorities authorities = null;
if (peers.getIfPresent(sessionId) != null) {
authorities = peers.getIfPresent(sessionId).getFirst().getAuthorities();
authToRestore.put(sessionId, authorities);
peers.getIfPresent(sessionId).remove(peer);
}can be replaced by (also fixing the unconditional
getFirst() call):BlockingDeque userPeers = peers.getIfPresent(sessionId);
if (userPeers != null && !userPeers.isEmpty()) {
UserAuthorities authorities = userPeers.getFirst().getAuthorities();
authToRestore.put(sessionId, authorities);
userPeers.remove(peer);
}- Excessive
cleanUp
I understand you don't want to wait for Guava to decide when the removals should actually happen, but forcing it on every access might be overkill (and create more contention), and you still depend on a call to
PeersContainer to trigger the clean-up.You could use some form of scheduler to call a global
PeersContainer.cleanUp() method every minute, instead.Updated to answer the comment.
Unencapsulated modifications
By returning the actual
BlockingDeque from the Cache in getPeers(), you leave the possibility to modify the shared collection outside of the PeersContainer context, which may or not have an impact depending on the "friendliness" of the code using that data. If it's under your control and you never modify (even by accident) the collection, then it should be fine.Otherwise, there are obviously 2 types of modifications:
- removing a
Peer: OK, it's not there anymore, but to thePeersContainerit's not that big a deal, it can also happen if the entry expires in the cache
- adding a
Peer: it won't have itsauthoritiesset, which means it might erase aUserAuthoritiessaved in theauthToRestorecache by the removal of a previousPeerfor the same session. Is it critical? I don't know, it's your application.
Cache expiration
From the
CacheBuilder javadoc:If expireAfterWrite or expireAfterAccess is requested entries may be evicted on each cache modification, on occasional cache accesses, or on calls to Cache.cleanUp(). Expired entries may be counted by Cache.size(), but will never be visible to read or write operations.
Even if your cache doesn't have enough write activity, with enough read activity the expired entries will eventually be removed. But even if they're not removed yet, the
Cache will never return entries once they're expired (it wouldn't be doing a very good job otherwise).The only reason why you may need to call
cleanUp() in your case is because of the nature of your RemovalListener: you want to send some information back to the peer, so you want to do it in a timely manner after the entry has expired. It's up to you to define what "timely" means depending on your requirements, but in my opinion, it could be something between 15 seconds and 5 minutes.Anyway, the way it's implemented now, if you have a single, inactive peer (so no calls at all to the methods of
PeersContainer), it'll never receive a logout, whereas with a scheduled task it would happen no matter what. And on the other hand, if you have enough activity in the application, the clean-up will happen naturally with the writes or numerous reads, and an extra, scheduled cleanUp call won't cost much, whereas calling cleanUp() on each and every call will cause contention because the clean-up needs to sequentially lock all the segments in the Cache to clean them one by one.Another round.
You don't really need a
ReadWriteLock:- the
readLockingetPeers()is useless since theCacheitself is thread-safe
- the one in
restoreSession()is also useless because it's already guarded by the
Code Snippets
public BlockingDeque<Peer> getPeers(long sessionId) {
Lock lock = stripes.get(sessionId).readLock();
peers.cleanUp();
lock.lock();
try {
return peers.asMap().get(sessionId);
} finally {
lock.unlock();
}
}return peers.getIfPresent(sessionId);Lock lock = stripes.get(sessionId).readLock();
peers.cleanUp();
lock.lock();peers.cleanUp();
Lock lock = stripes.get(sessionId).readLock();
lock.lock();UserAuthorities authorities = null;
if (peers.getIfPresent(sessionId) != null) {
authorities = peers.getIfPresent(sessionId).getFirst().getAuthorities();
authToRestore.put(sessionId, authorities);
peers.getIfPresent(sessionId).remove(peer);
}Context
StackExchange Code Review Q#43295, answer score: 8
Revisions (0)
No revisions yet.