My EventBus system

Submitted by: @import:stackexchange-codereview

Problem

I decided to roll my own EventBus system, which is intended to be thread-safe.

A review should therefore pay extra attention to thread safety, in addition to all the regular concerns.

The EventBus can work in two ways:

  • You can register events and listeners directly on the EventBus.

  • You can register the methods of a specific object that are single-argument void methods annotated with @Event.



First the code, then the unit tests below:

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface Event { }


public interface EventBus {
    void registerListenersOfObject(final Object callbackObject);

    <T> void registerListener(final Class<T> eventClass, final Consumer<? extends T> eventListener);

    void executeEvent(final Object event);

    void removeListenersOfObject(final Object callbackObject);

    <T> void removeListener(final Class<T> eventClass, final Consumer<? extends T> eventListener);

    void removeAllListenersOfEvent(final Class eventClass);

    void removeAllListeners();
}


```
public class SimpleEventBus implements EventBus {
    private final static Set<EventHandler> EMPTY_SET = new HashSet<>();

    private final ConcurrentMap<Class<?>, Set<EventHandler>> eventMapping = new ConcurrentHashMap<>();
    private final Class<?> classConstraint;

    public SimpleEventBus() {
        this(Object.class);
    }

    public SimpleEventBus(final Class<?> eventClassConstraint) {
        this.classConstraint = Objects.requireNonNull(eventClassConstraint);
    }

    @Override
    public void registerListenersOfObject(final Object callbackObject) {
        Arrays.stream(callbackObject.getClass().getMethods())
                .filter(method -> (method.getAnnotation(Event.class) != null))
                .filter(method -> method.getReturnType().equals(void.class))
                .filter(method -> method.getParameterCount() == 1)
                .forEach(method -> {
                    Class<?> clazz = method.getParameterTypes()[0];
                    if (!classConstraint.isAssignableFrom(clazz)) {
                        // ... (the listing is cut off here)
```

Solution

The synchronization in the code is in some places overly broad, and in others, it is absent where it is needed.

Synchronizing on eventMapping in your registerListenersOfObject method means that only one thread can be registering listeners at any one time, whichever event class it is registering for. This defeats the point of using a ConcurrentHashMap, where only a small portion of the map is locked and the other portions remain available to other threads. The granularity of this lock is too coarse.

Inside that lock, you add data to (and potentially create) a HashSet instance. This HashSet is then used in other methods without any synchronization, so those methods can read the set while another thread is modifying it.

@Override
public void executeEvent(final Object event) {
    if (classConstraint.isAssignableFrom(event.getClass())) {
        eventMapping.getOrDefault(event.getClass(), EMPTY_SET).forEach(eventHandler -> eventHandler.invoke(event));
    }
}


In the above code, while the forEach is running, any of the following things are possible (and other things as well, I am sure):

  • data could be added to the Set you are iterating over, and that data may or may not be included in the iteration.

  • the iteration could throw a ConcurrentModificationException.

  • the iteration could end early (and some data may not be processed at all).
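The ConcurrentModificationException case can be reproduced deterministically, even on a single thread, which makes the hazard easy to see. The following is only a minimal sketch: the class name `ForEachHazardDemo` and the string elements are made up for illustration and are not part of the EventBus code. With real threads the same check is best-effort, so the failure may or may not show up on a given run.

```java
import java.util.ConcurrentModificationException;
import java.util.HashSet;
import java.util.Set;

// Hypothetical demo class, not part of the reviewed code.
final class ForEachHazardDemo {

    // Returns true if adding to a HashSet while forEach is running over it
    // ends in a ConcurrentModificationException.
    static boolean throwsWhenModifiedDuringForEach() {
        Set<String> handlers = new HashSet<>();
        handlers.add("a");
        handlers.add("b");
        handlers.add("c");
        try {
            // The first visited element triggers a structural change;
            // the fail-fast check in HashSet then notices it.
            handlers.forEach(h -> handlers.add("late"));
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("CME thrown: " + throwsWhenModifiedDuringForEach()); // prints "CME thrown: true"
    }
}
```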



Consider the following code in the SimpleEventBus. It handles adding and using event handlers (removing handlers needs to be fixed as well):

private final void includeEventHandler(final Class<?> clazz, final EventHandler handler) {
    Set<EventHandler> existing = eventMapping.get(clazz);
    if (existing == null) {
        final Set<EventHandler> created = new HashSet<>();
        // optimistically assume that we are the first thread for this particular class.
        existing = eventMapping.putIfAbsent(clazz, created);
        if (existing == null) {
            // we are the first thread to add one for this clazz
            existing = created;
        }
    }
    synchronized (existing) {
        existing.add(handler);
    }
}

private final EventHandler[] getEventHandlers(final Class<?> clazz) {
    Set<EventHandler> handlers = eventMapping.get(clazz);
    if (handlers == null) {
        return new EventHandler[0];
    }
    synchronized(handlers) {
        return handlers.toArray(new EventHandler[handlers.size()]);
    }
}

@Override
public void registerListenersOfObject(final Object callbackObject) {
    Arrays.stream(callbackObject.getClass().getMethods())
            .filter(method -> (method.getAnnotation(Event.class) != null))
            .filter(method -> method.getReturnType().equals(void.class))
            .filter(method -> method.getParameterCount() == 1)
            .forEach(method -> {
                Class<?> clazz = method.getParameterTypes()[0];
                if (!classConstraint.isAssignableFrom(clazz)) {
                    return;
                }
                includeEventHandler(clazz, new MethodEventHandler(method, callbackObject, clazz));
            });
}

@Override
@SuppressWarnings("unchecked")
public <T> void registerListener(final Class<T> eventClass, final Consumer<? extends T> eventListener) {
    Objects.requireNonNull(eventClass);
    Objects.requireNonNull(eventListener);
    if (!classConstraint.isAssignableFrom(eventClass)) {
        return;
    }
    includeEventHandler(eventClass, new ConsumerEventHandler((Consumer<Object>)eventListener));
}

@Override
public void executeEvent(final Object event) {
    if (classConstraint.isAssignableFrom(event.getClass())) {
        Arrays.stream(getEventHandlers(event.getClass())).forEach(eventHandler -> eventHandler.invoke(event));
    }
}


The above code uses the ConcurrentHashMap in a way that is minimally locked. It is optimistic: it creates a new HashSet only when one is likely to be needed (instead of creating one and throwing it away almost every time). It also makes sure that, if another thread created one first and our optimism was proven wrong, we use the one the other threads are using.
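The get-then-putIfAbsent step can be tried out in isolation. This is a sketch only: `GetOrCreateDemo`, `setFor`, and the use of `String` elements are hypothetical stand-ins for the event bus types.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical demo class, not part of the reviewed code.
final class GetOrCreateDemo {
    private final ConcurrentMap<Class<?>, Set<String>> mapping = new ConcurrentHashMap<>();

    // Returns the one Set that all callers share for the given key,
    // creating it only if no Set has been published yet.
    Set<String> setFor(Class<?> key) {
        Set<String> existing = mapping.get(key);
        if (existing == null) {
            Set<String> created = new HashSet<>();
            // putIfAbsent returns null if our set was stored, otherwise
            // the set that another caller stored first.
            existing = mapping.putIfAbsent(key, created);
            if (existing == null) {
                existing = created;
            }
        }
        return existing;
    }

    public static void main(String[] args) {
        GetOrCreateDemo demo = new GetOrCreateDemo();
        Set<String> first = demo.setFor(String.class);
        Set<String> second = demo.setFor(String.class);
        System.out.println(first == second); // prints "true": same instance
    }
}
```

On Java 8 and later, `mapping.computeIfAbsent(key, k -> new HashSet<>())` should express the same get-or-create step in a single call, if that reads better to you.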

Then, for the actual HashSet, it synchronizes on the whole set, so all operations on it are completely isolated from other threads.

This is OK because the only time a thread will block is when two threads access the event handlers for the same Class at the same time, which is likely to be uncommon.

Note that getEventHandlers creates a defensive copy of the Set, so iteration works on a consistent snapshot of the data and no lock needs to be held during the iteration.
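To illustrate what the defensive copy buys you, here is a small sketch in which a handler registers another handler while a dispatch is in progress. Everything in it is assumed for illustration: `SnapshotDispatchDemo`, `register`, and `dispatch` are hypothetical names, plain `Consumer<Object>` stands in for EventHandler, and `computeIfAbsent` stands in for the get/putIfAbsent step.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;

// Hypothetical demo class, not part of the reviewed code.
final class SnapshotDispatchDemo {
    private final ConcurrentMap<Class<?>, Set<Consumer<Object>>> mapping = new ConcurrentHashMap<>();

    void register(Class<?> type, Consumer<Object> handler) {
        // Stand-in for the get/putIfAbsent step in includeEventHandler.
        Set<Consumer<Object>> handlers = mapping.computeIfAbsent(type, k -> new HashSet<>());
        synchronized (handlers) {
            handlers.add(handler);
        }
    }

    // Invokes every handler present when the snapshot was taken and
    // returns how many handlers were invoked.
    int dispatch(Object event) {
        Set<Consumer<Object>> handlers = mapping.get(event.getClass());
        if (handlers == null) {
            return 0;
        }
        List<Consumer<Object>> snapshot;
        synchronized (handlers) {
            snapshot = new ArrayList<>(handlers); // copy made under the lock
        }
        // The lock is released here, so handlers are free to register more handlers.
        snapshot.forEach(h -> h.accept(event));
        return snapshot.size();
    }

    static int[] demo() {
        SnapshotDispatchDemo bus = new SnapshotDispatchDemo();
        Consumer<Object> late = e -> { };
        // This handler registers another one while a dispatch is running.
        bus.register(String.class, e -> bus.register(String.class, late));
        int first = bus.dispatch("one");  // snapshot holds only the original handler
        int second = bus.dispatch("two"); // the late handler is now included
        return new int[] { first, second };
    }

    public static void main(String[] args) {
        int[] counts = demo();
        System.out.println(counts[0] + " then " + counts[1]); // prints "1 then 2"
    }
}
```

The late handler is not invoked during the dispatch that registered it, because that dispatch iterates over its own copy. It only takes part from the next dispatch onward.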

Edit: To remove unnecessary work in the code, I would actually recommend the following:

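```
private final EventHandler[] getEventHandlers(final Class<?> clazz) {
    Set<EventHandler> handlers = eventMapping.get(clazz);
    if (handlers == null) {
        // no handlers registered: skip allocating an empty array
        return null;
    }
    synchronized(handlers) {
        return handlers.toArray(new EventHandler[handlers.size()]);
    }
}

@Override
public void executeEvent(final Object event) {
    if (classConstraint.isAssignableFrom(event.getClass())) {
        EventHandler[] handlers = getEventHandlers(event.getClass());
        if (handlers != null) {
            Arrays.stream(handlers).forEach(eventHandler -> eventHandler.invoke(event));
        }
    }
}
```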

Code Snippets

@Override
public void executeEvent(final Object event) {
    if (classConstraint.isAssignableFrom(event.getClass())) {
        eventMapping.getOrDefault(event.getClass(), EMPTY_SET).forEach(eventHandler -> eventHandler.invoke(event));
    }
}

private final void includeEventHandler(final Class<?> clazz, final EventHandler handler) {
    Set<EventHandler> existing = eventMapping.get(clazz);
    if (existing == null) {
        final Set<EventHandler> created = new HashSet<>();
        // optimistically assume that we are the first thread for this particular class.
        existing = eventMapping.putIfAbsent(clazz, created);
        if (existing == null) {
            // we are the first thread to add one for this clazz
            existing = created;
        }
    }
    synchronized (existing) {
        existing.add(handler);
    }
}

private final EventHandler[] getEventHandlers(final Class<?> clazz) {
    Set<EventHandler> handlers = eventMapping.get(clazz);
    if (handlers == null) {
        return new EventHandler[0];
    }
    synchronized(handlers) {
        return handlers.toArray(new EventHandler[handlers.size()]);
    }
}



@Override
public void registerListenersOfObject(final Object callbackObject) {
    Arrays.stream(callbackObject.getClass().getMethods())
            .filter(method -> (method.getAnnotation(Event.class) != null))
            .filter(method -> method.getReturnType().equals(void.class))
            .filter(method -> method.getParameterCount() == 1)
            .forEach(method -> {
                Class<?> clazz = method.getParameterTypes()[0];
                if (!classConstraint.isAssignableFrom(clazz)) {
                    return;
                }
                includeEventHandler(clazz, new MethodEventHandler(method, callbackObject, clazz));
            });
}

@Override
@SuppressWarnings("unchecked")
public <T> void registerListener(final Class<T> eventClass, final Consumer<? extends T> eventListener) {
    Objects.requireNonNull(eventClass);
    Objects.requireNonNull(eventListener);
    if (!classConstraint.isAssignableFrom(eventClass)) {
        return;
    }
    includeEventHandler(eventClass, new ConsumerEventHandler((Consumer<Object>)eventListener));
}

@Override
public void executeEvent(final Object event) {
    if (classConstraint.isAssignableFrom(event.getClass())) {
        Arrays.stream(getEventHandlers(event.getClass())).forEach(eventHandler -> eventHandler.invoke(event));
    }
}

private final EventHandler[] getEventHandlers(final Class<?> clazz) {
    Set<EventHandler> handlers = eventMapping.get(clazz);
    if (handlers == null) {
        return null;
    }
    synchronized(handlers) {
        return handlers.toArray(new EventHandler[handlers.size()]);
    }
}

@Override
public void executeEvent(final Object event) {
    if (classConstraint.isAssignableFrom(event.getClass())) {
        EventHandler[] handlers = getEventHandlers(event.getClass());
        if (handlers != null) {
            Arrays.stream(handlers).forEach(eventHandler -> eventHandler.invoke(event));
        }
    }
}

Context

StackExchange Code Review Q#48869, answer score: 5
