pattern, java, Moderate

Extending ThreadPoolExecutor

Submitted by: @import:stackexchange-codereview
threadpoolexecutor, extending, stackoverflow

Problem

I have implemented a ThreadPoolExecutor that will run a Consumer only on elements not already consumed. This code uses Java 8.

The background is that I scan a directory every x time units to see which files are present. I must maintain 100% accuracy in finding files, and other mechanisms such as JNotify or a plain WatchService do not achieve that.

```
public class SingleExecutionThreadPoolExecutor<E> extends ThreadPoolExecutor {
    private final Consumer<E> consumer;
    private final List<E> elementsInProcess = new ArrayList<>();

    public SingleExecutionThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, Consumer<E> consumer) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.consumer = consumer;
    }

    public SingleExecutionThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, Consumer<E> consumer, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        this.consumer = consumer;
    }

    public SingleExecutionThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, Consumer<E> consumer, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        this.consumer = consumer;
    }

    public SingleExecutionThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, Consumer<E> consumer, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        this.consumer = consumer;
    }

    public void execute(E element) {
        if (!elementsInProcess.contains(element)) {
            super.execute(() -> consumer.acc
```

Solution

.equals

The last part of your .equals implementation can be simplified to:

return Objects.equals(this.path, other.path) && Objects.equals(this.fileTime, other.fileTime);


HashCode

int hash = 3;
return hash;


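Do I even have to comment on what I think about this implementation? A constant hash code is technically legal, but it puts every element in the same hash bucket, so a HashSet or HashMap degrades to a linear search. You use Objects.equals above; here you can use return Objects.hash(this.path, this.fileTime);. hashCode and equals should use the same fields, so that objects that are equal always have the same hash code.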
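As a minimal sketch of the two methods together: the class name FileSnapshot and the field types (java.nio.file.Path and FileTime) are assumptions for illustration, since the question's excerpt only shows the field names path and fileTime.

```java
import java.nio.file.Path;
import java.nio.file.attribute.FileTime;
import java.util.Objects;

// Hypothetical element type; only the field names come from the answer above.
final class FileSnapshot {
    private final Path path;
    private final FileTime fileTime;

    FileSnapshot(Path path, FileTime fileTime) {
        this.path = path;
        this.fileTime = fileTime;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof FileSnapshot)) {
            return false;
        }
        FileSnapshot other = (FileSnapshot) obj;
        // Null-safe comparison of both fields.
        return Objects.equals(this.path, other.path)
                && Objects.equals(this.fileTime, other.fileTime);
    }

    @Override
    public int hashCode() {
        // Same two fields as equals, so equal objects always hash alike.
        return Objects.hash(this.path, this.fileTime);
    }
}
```

Two snapshots with the same path and timestamp then compare equal and land in the same bucket, which is what a hash-based collection relies on.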

Set or List?

Whenever you use a List in a program, ask yourself the question: Does the order of the elements matter?

In this case, it doesn't. Use the Set interface and create a HashSet for your elementsInProcess variable.

private final Set<E> elementsInProcess = new HashSet<>();


When using a Set, it is extra important that the elements in the set have properly implemented hashCode and equals! (Or use the default implementations inherited from the Object class.)

Cleanup

I suggest you override the shutdown methods of a ThreadPoolExecutor to clear the elementsInProcess collection.
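A rough sketch of that cleanup, under some assumptions: the class name CleanupOnShutdownExecutor, the single-argument constructor, and the track/trackedCount helpers are hypothetical and exist only to keep the example self-contained. shutdown() and shutdownNow() are the real ThreadPoolExecutor methods being overridden.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical subclass showing only the shutdown-time cleanup.
class CleanupOnShutdownExecutor<E> extends ThreadPoolExecutor {
    private final Set<E> elementsInProcess = Collections.synchronizedSet(new HashSet<>());

    CleanupOnShutdownExecutor(int poolSize) {
        super(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    // Illustration-only helpers standing in for the real bookkeeping.
    void track(E element) {
        elementsInProcess.add(element);
    }

    int trackedCount() {
        return elementsInProcess.size();
    }

    @Override
    public void shutdown() {
        super.shutdown();
        elementsInProcess.clear(); // drop references once no new work is accepted
    }

    @Override
    public List<Runnable> shutdownNow() {
        List<Runnable> pending = super.shutdownNow();
        elementsInProcess.clear();
        return pending; // preserve the superclass contract
    }
}
```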

execute(Runnable command)

Please throw an UnsupportedOperationException or similar in this method:

public void execute(Runnable command) { }


Edit: Scratch that. I think palacsint is absolutely right. By doing nothing here, you're breaking the contract of the Executor interface (which you implement, whether you know it or not, by extending an ExecutorService implementation). The Javadoc for the execute(Runnable command) method says:


Executes the given command at some time in the future.

As it currently stands, you are not doing that, and you have no real way of doing so in that method. This indicates that you should not extend ThreadPoolExecutor but instead hold a private ThreadPoolExecutor executor; field, that is, prefer composition over inheritance.
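One way the composition variant might look, as a sketch rather than a drop-in replacement. The class name SingleExecutionService and the submit method are hypothetical, and the delegate is typed as ExecutorService instead of ThreadPoolExecutor so that any pool can be passed in. The sketch never removes elements from the set; whether the original code does so is not visible in the excerpt.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;

// Hypothetical wrapper: it owns an executor instead of being one,
// so it never has to honour execute(Runnable).
final class SingleExecutionService<E> {
    private final ExecutorService executor;
    private final Consumer<E> consumer;
    private final Set<E> elementsInProcess = Collections.synchronizedSet(new HashSet<>());

    SingleExecutionService(ExecutorService executor, Consumer<E> consumer) {
        this.executor = executor;
        this.consumer = consumer;
    }

    /** Returns true if the element was scheduled, false if it was already known. */
    boolean submit(E element) {
        if (!elementsInProcess.add(element)) {
            return false; // already seen, do not schedule again
        }
        executor.execute(() -> consumer.accept(element));
        return true;
    }

    void shutdown() {
        executor.shutdown();
        elementsInProcess.clear();
    }
}
```

Callers only see submit and shutdown, so the Runnable-based API of the underlying pool cannot be used to bypass the duplicate check.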

Concurrency

If two threads call execute at the same time, there is a race on elementsInProcess. You need to synchronize on something there; for minimal blocking I suggest you do something like this:

synchronized (lock) {
    if (elementsInProcess.contains(element)) {
        return;
    }
    elementsInProcess.add(element);
}
super.execute(() -> consumer.accept(element));


Where lock is a private final Object lock = new Object();

Or, you could utilize the fact that the .add method of a set actually returns a boolean indicating if the add was successful and use a Collections.synchronizedSet.

if (!elementsInProcess.add(element)) {
    return;
}
super.execute(() -> consumer.accept(element));


It is important when using a synchronized Set that you only make one method call here. Each call is synchronized on its own, so a separate .contains followed by .add leaves a gap in which another thread can add the same element.
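To illustrate the single-call approach, the following sketch lets several threads compete to add the same element to a Collections.synchronizedSet and counts how many of them see add return true. The class AtomicAddDemo, the method countWinners, and the element "a.txt" are made up for this demonstration.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the reviewed code.
final class AtomicAddDemo {
    /** Starts the given number of threads that all try to claim one element; returns how many succeeded. */
    static int countWinners(int threads) {
        Set<String> elementsInProcess = Collections.synchronizedSet(new HashSet<>());
        AtomicInteger winners = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        Thread[] workers = new Thread[threads];

        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    start.await(); // release all threads at roughly the same moment
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                // Single synchronized call: check and insert happen together.
                if (elementsInProcess.add("a.txt")) {
                    winners.incrementAndGet();
                }
            });
            workers[i].start();
        }

        start.countDown();
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }
        return winners.get();
    }
}
```

Because add performs the membership check and the insertion under one lock, only one thread can win, regardless of how the threads are scheduled.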

Context

StackExchange Code Review Q#46159, answer score: 10
