patternjavaMinor
Synchronization Event with awaitAny
Viewed 0 times
withsynchronizationawaitanyevent
Problem
In Java, there is no way I know of to wait for multiple events at the same time (see Stack Overflow). Since I would like to use that feature (similar to
For some more specific questions:
```
public class Event
{
private boolean bSignaled = false;
private static final Object anySignaler = new Object();
/**
* Wait for the event to signal. If the event has been signaled before,
* return immediately.
*/
public void await() throws InterruptedException
{
if (!bSignaled) {
synchronized (this) {
while (!bSignaled) {
this.wait();
}
}
}
}
/**
* Signal the event. This notifies all threads that are waiting for this
* event to occur, as well as all threads that are waiting for any event.
*/
public void signal()
{
bSignaled = true;
sendNotify(this);
sendNotify(anySignaler);
}
/**
* Helper function that calls notifiyAll() inside a locked block.
*/
private static final void sendNotify(Object lock)
{
synchronized (lock) {
lock.notifyAll();
}
}
/**
* Return the signaled state of this event.
*/
public boolean isSignaled()
{
return bSignaled;
}
/**
* @see Event#awaitAll(Event[])
*/
public static
WaitForMultipleObjects() in the Win32 API), I decided to write my own event class that includes this feature. While the class is pretty straight-forward, thread synchronization always has some hidden pitfalls. I would like to uncover those, if any are present.For some more specific questions:
- Are there any possible deadlocks, race conditions, data races or other bugs present in the code?
- Is the synchronization sufficient, or should
bSignaledbe made volatile?
- Are there redundant checks or other things reducing the performance of the code?
- Is there a library class that I have missed, offering the same functionality?
```
public class Event
{
private boolean bSignaled = false;
private static final Object anySignaler = new Object();
/**
* Wait for the event to signal. If the event has been signaled before,
* return immediately.
*/
public void await() throws InterruptedException
{
if (!bSignaled) {
synchronized (this) {
while (!bSignaled) {
this.wait();
}
}
}
}
/**
* Signal the event. This notifies all threads that are waiting for this
* event to occur, as well as all threads that are waiting for any event.
*/
public void signal()
{
bSignaled = true;
sendNotify(this);
sendNotify(anySignaler);
}
/**
* Helper function that calls notifiyAll() inside a locked block.
*/
private static final void sendNotify(Object lock)
{
synchronized (lock) {
lock.notifyAll();
}
}
/**
* Return the signaled state of this event.
*/
public boolean isSignaled()
{
return bSignaled;
}
/**
* @see Event#awaitAll(Event[])
*/
public static
Solution
In general, you should use one, and only one synchronization strategy in any class. You have synchronization, and are contemplating volatiles too.
The best-practice mechanism for now in Java is using the classes from
Your locking at the moment relies on a 'global' lock that all events use, thus if you have many collections of events running, each collection uses the same lock. Thus, if some other Events are happening in some other code block somewhere, we will still be notified or each of those 'remote' event completions, and our completions will notify their awaits....
Additionally, I did not like the way you manage the await-any concept. You literally scan the Events for changes, and check for one to complete.
In essence, I dislike:
Finally, I did not like the method you chose as the core await container... Instead of the
Still, having thrown all that criticism at you, here's how I would probably do it. Note, the logic change for the await....
You have a single static lock. What I suggest is to create a 'completion listener'. In the case of awaitAll, each Event has it's own listener. In the case of awaitAny, they all share a single listener, so there's just one notification.
The system below makes each
This is untested, so go through and understand it well ;-)
```
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Event {
private static final class EventCompletion {
private final Lock plock = new ReentrantLock();
private final Condition pcond = plock.newCondition();
private boolean complete = false;
void completed() {
plock.lock();
try {
if (!complete) {
complete = true;
pcond.signalAll();
}
} finally {
plock.unlock();
}
}
void awaitCompletion() throws InterruptedException {
plock.lock();
try {
while (!complete) {
pcond.await();
}
} finally {
plock.unlock();
}
}
}
private final Lock lock = new ReentrantLock();
private boolean complete = false;
private final List listeners = new ArrayList<>();
private void addListener(EventCompletion ec) {
boolean notifyNow = false;
lock.lock();
try {
if (!complete) {
listeners.add(ec);
} else {
notifyNow = true;
}
} finally {
lock.unlock();
}
if (notifyNow) {
// immediate notification of completion...
ec.completed();
}
}
/**
* Return the signaled state of this Event.
*/
public boolean isSignaled() {
lock.lock();
try {
return complete;
} finally {
lock.unlock();
}
}
/**
* Return the signaled state of this Event.
*/
public void signal() {
EventCompletion[] toNotify = null;
lock.lock();
try {
if (complete) {
return;
}
toNotify = listeners.toArray(new EventCompletion[listeners.size()]);
listeners.clear(); // no need to remember previous notifications....
} finally {
lock.unlock();
}
for (EventCompletion ec : toNotify) {
ec.completed();
}
}
public static void awaitAll(Event ev0, Event... events) throws InterruptedException {
List toWait = new ArrayList<>(1 + events.length);
toWait.add(ev0);
for (Event e : events) {
toWait.add(e);
}
awaitAll(toWait);
}
public static void awaitAll(Event[] events) throws InterruptedException {
awaitAll(Arrays.asList(events));
}
public static void awaitAll(List events) throws InterruptedException {
EventCompletion[] toComplete = new EventCompletion[events.size()];
int pos = 0;
for (Event e : events) {
EventCompletion ec = new EventCompletion();
e.addListener(ec);
toComplete[pos++] = ec;
}
for (EventCompletion ec : toComplete) {
ec.awaitCompletion();
The best-practice mechanism for now in Java is using the classes from
java.util.concurrent.* to manage locking, and memory synchronization. The benefit from there that will help you most, is the Condition concept, as well as the fine-grained management of locks.Your locking at the moment relies on a 'global' lock that all events use, thus if you have many collections of events running, each collection uses the same lock. Thus, if some other Events are happening in some other code block somewhere, we will still be notified or each of those 'remote' event completions, and our completions will notify their awaits....
Additionally, I did not like the way you manage the await-any concept. You literally scan the Events for changes, and check for one to complete.
In essence, I dislike:
- using a static
lock
- using synchronization
- using messy awaitAny logic
Finally, I did not like the method you chose as the core await container... Instead of the
awaitAll(Event[]) I prefer the awaitAll(List...). I cannot defend that preference right now though. Not sure why I prefer it.Still, having thrown all that criticism at you, here's how I would probably do it. Note, the logic change for the await....
You have a single static lock. What I suggest is to create a 'completion listener'. In the case of awaitAll, each Event has it's own listener. In the case of awaitAny, they all share a single listener, so there's just one notification.
The system below makes each
await*(...) an independent operation. The only signals are what's happening that's relevant to the completion service.This is untested, so go through and understand it well ;-)
```
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Event {
private static final class EventCompletion {
private final Lock plock = new ReentrantLock();
private final Condition pcond = plock.newCondition();
private boolean complete = false;
void completed() {
plock.lock();
try {
if (!complete) {
complete = true;
pcond.signalAll();
}
} finally {
plock.unlock();
}
}
void awaitCompletion() throws InterruptedException {
plock.lock();
try {
while (!complete) {
pcond.await();
}
} finally {
plock.unlock();
}
}
}
private final Lock lock = new ReentrantLock();
private boolean complete = false;
private final List listeners = new ArrayList<>();
private void addListener(EventCompletion ec) {
boolean notifyNow = false;
lock.lock();
try {
if (!complete) {
listeners.add(ec);
} else {
notifyNow = true;
}
} finally {
lock.unlock();
}
if (notifyNow) {
// immediate notification of completion...
ec.completed();
}
}
/**
* Return the signaled state of this Event.
*/
public boolean isSignaled() {
lock.lock();
try {
return complete;
} finally {
lock.unlock();
}
}
/**
* Return the signaled state of this Event.
*/
public void signal() {
EventCompletion[] toNotify = null;
lock.lock();
try {
if (complete) {
return;
}
toNotify = listeners.toArray(new EventCompletion[listeners.size()]);
listeners.clear(); // no need to remember previous notifications....
} finally {
lock.unlock();
}
for (EventCompletion ec : toNotify) {
ec.completed();
}
}
public static void awaitAll(Event ev0, Event... events) throws InterruptedException {
List toWait = new ArrayList<>(1 + events.length);
toWait.add(ev0);
for (Event e : events) {
toWait.add(e);
}
awaitAll(toWait);
}
public static void awaitAll(Event[] events) throws InterruptedException {
awaitAll(Arrays.asList(events));
}
public static void awaitAll(List events) throws InterruptedException {
EventCompletion[] toComplete = new EventCompletion[events.size()];
int pos = 0;
for (Event e : events) {
EventCompletion ec = new EventCompletion();
e.addListener(ec);
toComplete[pos++] = ec;
}
for (EventCompletion ec : toComplete) {
ec.awaitCompletion();
Code Snippets
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Event {
private static final class EventCompletion {
private final Lock plock = new ReentrantLock();
private final Condition pcond = plock.newCondition();
private boolean complete = false;
void completed() {
plock.lock();
try {
if (!complete) {
complete = true;
pcond.signalAll();
}
} finally {
plock.unlock();
}
}
void awaitCompletion() throws InterruptedException {
plock.lock();
try {
while (!complete) {
pcond.await();
}
} finally {
plock.unlock();
}
}
}
private final Lock lock = new ReentrantLock();
private boolean complete = false;
private final List<EventCompletion> listeners = new ArrayList<>();
private void addListener(EventCompletion ec) {
boolean notifyNow = false;
lock.lock();
try {
if (!complete) {
listeners.add(ec);
} else {
notifyNow = true;
}
} finally {
lock.unlock();
}
if (notifyNow) {
// immediate notification of completion...
ec.completed();
}
}
/**
* Return the signaled state of this Event.
*/
public boolean isSignaled() {
lock.lock();
try {
return complete;
} finally {
lock.unlock();
}
}
/**
* Return the signaled state of this Event.
*/
public void signal() {
EventCompletion[] toNotify = null;
lock.lock();
try {
if (complete) {
return;
}
toNotify = listeners.toArray(new EventCompletion[listeners.size()]);
listeners.clear(); // no need to remember previous notifications....
} finally {
lock.unlock();
}
for (EventCompletion ec : toNotify) {
ec.completed();
}
}
public static void awaitAll(Event ev0, Event... events) throws InterruptedException {
List<Event> toWait = new ArrayList<>(1 + events.length);
toWait.add(ev0);
for (Event e : events) {
toWait.add(e);
}
awaitAll(toWait);
}
public static void awaitAll(Event[] events) throws InterruptedException {
awaitAll(Arrays.asList(events));
}
public static void awaitAll(List<Event> events) throws InterruptedException {
EventCompletion[] toComplete = new EventCompletion[events.size()];
int pos = 0;
for (Event e : events) {
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public class Event {
/* Private class can be used as a synchronization lock too */
private static final class EventCompletion {
private boolean complete = false;
synchronized void completed() {
if (!complete) {
complete = true;
this.notifyAll();
}
}
synchronized void awaitCompletion() throws InterruptedException {
while (!complete) {
this.wait();
}
}
}
private boolean complete = false;
// use private-final listeners class as private lock on public class.
private final List<EventCompletion> listeners = new ArrayList<>();
private void addListener(EventCompletion ec) {
boolean notifyNow = false;
synchronized (listeners) {
if (!complete) {
listeners.add(ec);
} else {
notifyNow = true;
}
}
if (notifyNow) {
// immediate notification of completion...
ec.completed();
}
}
/**
* Return the signaled state of this Event.
*/
public boolean isSignaled() {
synchronized(listeners) {
return complete;
}
}
/**
* Return the signaled state of this Event.
*/
public void signal() {
EventCompletion[] toNotify = null;
synchronized(listeners) {
if (complete) {
return;
}
toNotify = listeners.toArray(new EventCompletion[listeners.size()]);
listeners.clear(); // no need to remember previous notifications....
}
for (EventCompletion ec : toNotify) {
ec.completed();
}
}
public static void awaitAll(Event ev0, Event... events) throws InterruptedException {
List<Event> toWait = new ArrayList<>(1 + events.length);
toWait.add(ev0);
for (Event e : events) {
toWait.add(e);
}
awaitAll(toWait);
}
public static void awaitAll(Event[] events) throws InterruptedException {
awaitAll(Arrays.asList(events));
}
public static void awaitAll(List<Event> events) throws InterruptedException {
EventCompletion[] toComplete = new EventCompletion[events.size()];
int pos = 0;
for (Event e : events) {
EventCompletion ec = new EventCompletion();
e.addListener(ec);
toComplete[pos++] = ec;
}
for (EventCompletion ec : toComplete) {
ec.awaitCompletion();
}
}
public static int awaitAny(Event ev0, Event... events) throws InterruptedException {
List<Event> toWait = new ArrayList<>(1 + events.length);
toWait.add(ev0);
for (Event e : events) {
toWait.add(e);
}
return awaitAny(toWait);
}
public static int aContext
StackExchange Code Review Q#72156, answer score: 5
Revisions (0)
No revisions yet.