pattern java Minor
Generic Java task-scheduler
generic scheduler task java
Problem
I wrote this generic task scheduler for executing tasks in fixed-delay intervals. Can you find anything wrong with it, or issues that may arise from using it for sending something like queued mails in a database?
```
package system;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* a generic scheduler that may be extended to provide fixed-delay execution to tasks
* @author Willie Scholtz
* @param <T> the type of bean to be scheduled
*/
public abstract class Scheduler<T extends Callable<T>> {
private static final Log LOG = LogFactory.getLog(Scheduler.class);
private static final int MAX_THREADS = Runtime.getRuntime().availableProcessors();
private Timer timer = null;
private final int delay;
/**
* creates a new scheduler
* @param delay the delay in seconds between executions
*/
public Scheduler(final int delay) {
this.delay = delay;
}
/**
* creates a new timer for executing tasks
* @param seconds number of seconds between each execution
* @return a Timer
*/
private Timer getSchedulerTimer(int seconds) {
final String cName = this.getClass().getSimpleName();
final Timer sTimer = new Timer(cName + " Scheduler", false);
final TimerTask sTask = new TimerTask() {
@Override
public void run() {
LOG.debug("before running " + cName + " timer");
Scheduler.this.runScheduler();
LOG.debug("after running " + cName + " timer");
}
};
```
Solution
Your question would do better with a more detailed description. The way I understand your situation is:
- you accumulate a bunch of 'notifications' in the database that need to be mailed
- on a fixed schedule you find all the notifications that are now 'due', and you 'pull' them in using `List<T> tasks = this.getTasksForExecution(now);`
- you then process those notifications in parallel using the `service` pool
Despite your assurances that a ScheduledExecutorService is not up to your needs, I think you are wrong. But, for good reasons, not bad reasons. The java.util.concurrent.* tools often need to be thought of in a back-to-front manner, and, if you reverse the logic of your problem, the solution is actually quite simple.

What you want to do in order to solve the problem is to have two classes. One class is a Runnable that gets scheduled on a ScheduledExecutorService. It runs periodically, and, when it does, it creates Tasks that, instead of being processed immediately, are just dumped on to a LinkedBlockingQueue. It does not do anything more.

Then, you should have another thread pool that does nothing but sit there, pull items off the queue, and dump them on to your parallel-thread pool service.

I would do the second class as a nested class of the first.
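As a rough illustration of that hand-off, here is a minimal sketch. The class `QueueHandOff`, the `String` stand-ins for tasks, and the `findDueTasks` method are all hypothetical placeholders (in the real scheduler the lookup would be `getTasksForExecution`). The scheduled Runnable only enqueues; the consumer side blocks on the queue.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo class; all names here are illustrative only. */
class QueueHandOff {
    final BlockingQueue<String> taskQ = new LinkedBlockingQueue<>();

    /** Placeholder for the database lookup of due tasks (an assumption). */
    List<String> findDueTasks() {
        return Arrays.asList("mail-1", "mail-2");
    }

    /** The producer: it only enqueues and does no processing itself. */
    final Runnable tick = new Runnable() {
        @Override
        public void run() {
            taskQ.addAll(findDueTasks());
        }
    };

    /** The consumer side: blocks until a task is available. */
    String nextTask() throws InterruptedException {
        return taskQ.take();
    }

    public static void main(String[] args) throws InterruptedException {
        QueueHandOff demo = new QueueHandOff();
        ScheduledExecutorService ticker = Executors.newSingleThreadScheduledExecutor();
        // Run the producer once per second, starting immediately.
        ticker.scheduleAtFixedRate(demo.tick, 0, 1, TimeUnit.SECONDS);
        System.out.println(demo.nextTask());
        System.out.println(demo.nextTask());
        ticker.shutdownNow();
    }
}
```

Running `main` should print `mail-1` and then `mail-2`, because the first tick enqueues both items in order before the consumer takes them. The point of the split is that the scheduled thread never blocks on slow work such as sending mail.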
So, in your class, I would have a few things:
```
private final AtomicReference<ScheduledFuture<?>> ticker = new AtomicReference<>();
private final ScheduledExecutorService tickserver;
private final LinkedBlockingQueue<T> taskQ = new LinkedBlockingQueue<>();
private final int delay;

// This runnable can be scheduled repeatedly, and will add tasks to the queue.
private final Runnable tickRunner = new Runnable() {
    @Override
    public void run() {
        taskQ.addAll(getTasksForExecution(new Date()));
    }
};
```
Then, the start method would be:
```
public void start() {
    if (ticker.get() == null) {
        // try not to start multiple scheduled tasks, but, if we do, it's OK.
        ScheduledFuture<?> ntick = tickserver.scheduleAtFixedRate(tickRunner, delay, delay, TimeUnit.SECONDS);
        if (!ticker.compareAndSet(null, ntick)) {
            // some other thread started and raced us, and won.
            ntick.cancel(false);
        }
    }
}
```
The stop method would be:
```
public void stop() {
    ScheduledFuture<?> tick = ticker.getAndSet(null);
    if (tick != null) {
        tick.cancel(false);
    }
}
```
Now, what you have is a system that can add a repeating task to a service, and the task can be removed. All the task does is dump 'ready' tasks on to the queue.
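To make the start/stop behaviour concrete, here is a small self-contained sketch. The class `TickerControl`, its `ticks` counter, and the boolean return values are assumptions added for illustration; the repeating task just increments a counter instead of querying a database.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical demo of an idempotent start/stop pair; names are illustrative. */
class TickerControl {
    private final AtomicReference<ScheduledFuture<?>> ticker = new AtomicReference<>();
    private final ScheduledExecutorService tickserver =
            Executors.newSingleThreadScheduledExecutor();
    final AtomicInteger ticks = new AtomicInteger();

    /** Returns true only if this call actually started the repeating task. */
    boolean start(long periodMillis) {
        if (ticker.get() != null) {
            return false; // already running
        }
        ScheduledFuture<?> ntick = tickserver.scheduleAtFixedRate(
                ticks::incrementAndGet, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        if (!ticker.compareAndSet(null, ntick)) {
            ntick.cancel(false); // another thread won the race, drop our duplicate
            return false;
        }
        return true;
    }

    /** Returns true only if a running task was cancelled by this call. */
    boolean stop() {
        ScheduledFuture<?> tick = ticker.getAndSet(null);
        return tick != null && tick.cancel(false);
    }

    /** Releases the scheduler thread so the JVM can exit. */
    void shutdown() {
        tickserver.shutdownNow();
    }
}
```

Calling `start` twice in a row leaves exactly one repeating task registered, and calling `stop` twice is harmless, which is the property the `AtomicReference` is there to provide.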
The rest of the problem is 'how do you handle the queue'.

That part is fairly easy. The catch is that you need to wait for each Future to complete, so create a Runnable that waits for a Future and logs its outcome:
```
private final class TaskLogger implements Runnable {
    private final Future<T> tolog;

    public TaskLogger(Future<T> tolog) {
        this.tolog = tolog;
    }

    @Override
    public void run() {
        try {
            final T sendTaskOp = tolog.get(1L, TimeUnit.MINUTES);
            LOG.info("task[" + sendTaskOp + "] executed...");
        } catch (InterruptedException ex) {
            // restore the interrupt flag so the owning pool can see it
            Thread.currentThread().interrupt();
            LOG.error("interrupted while executing task - " + ex.getMessage(), ex);
        } catch (ExecutionException ex) {
            LOG.error("error while executing task - " + ex.getMessage(), ex);
        } catch (TimeoutException ex) {
            LOG.error("executing the task timed out! - " + ex.getMessage(), ex);
        }
    }
}
```
Then, we have the 'worker' thread pool that processes the actual jobs, and a logger thread pool that awaits (and logs) the terminations:
```
final ExecutorService workerpool = Executors.newFixedThreadPool(MAX_THREADS);
final ExecutorService loggerpool = Executors.newCachedThreadPool();

private void processTasks() {
    while (true) {
        try {
            // wait for a task by blocking on the taskQ.
            // submit the task to the worker pool,
            // and wait for the result in the log pool
            loggerpool.submit(new TaskLogger(workerpool.submit(taskQ.take())));
        } catch (InterruptedException ie) {
            // one option: treat an interrupt as a request to stop,
            // restore the flag and leave the loop
            Thread.currentThread().interrupt();
            return;
        }
    }
}
```
The bottom line is that you:
- use a schedule to pull jobs to process.
- feed them on to a blocking queue
- take them off the queue and feed them on to a 'worker thread pool'.
- have one thread per task that waits for the task to complete (or time out), and logs the result.
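Putting those steps together, one possible shape is sketched below. It is an illustration under assumptions, not the original scheduler: the `Pipeline` class, the in-memory `log` list, and the use of plain `Callable<String>` tasks are hypothetical, and the scheduled producer is left out so that tasks are added to the queue by hand. It also shows one way to handle the interrupt in the consumer loop, by treating it as the signal to stop.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical end-to-end sketch; class and field names are illustrative. */
class Pipeline {
    final BlockingQueue<Callable<String>> taskQ = new LinkedBlockingQueue<>();
    final List<String> log = new CopyOnWriteArrayList<>();
    private final ExecutorService workerpool = Executors.newFixedThreadPool(2);
    private final ExecutorService loggerpool = Executors.newCachedThreadPool();
    private final Thread consumer = new Thread(this::processTasks, "task-consumer");

    void start() {
        consumer.start();
    }

    /** Takes tasks off the queue until the consumer thread is interrupted. */
    private void processTasks() {
        try {
            while (true) {
                final Future<String> result = workerpool.submit(taskQ.take());
                loggerpool.submit(() -> record(result));
            }
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt(); // interrupt means "stop consuming"
        }
    }

    /** Waits (with a timeout) for one task and records how it ended. */
    private void record(Future<String> result) {
        try {
            log.add("done: " + result.get(1L, TimeUnit.MINUTES));
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            log.add("interrupted");
        } catch (ExecutionException | TimeoutException ex) {
            log.add("failed: " + ex);
        }
    }

    /** Stops the consumer, then lets already submitted work finish. */
    void stop() throws InterruptedException {
        consumer.interrupt();
        consumer.join();
        workerpool.shutdown();
        workerpool.awaitTermination(10, TimeUnit.SECONDS);
        loggerpool.shutdown();
        loggerpool.awaitTermination(10, TimeUnit.SECONDS);
    }
}
```

A caller would invoke `start()`, add callables to `taskQ` (in the real scheduler the tick Runnable does this), and call `stop()` on shutdown. Note that tasks still sitting in the queue when `stop()` is called are not processed in this sketch; whether to drain them first is a design decision for the real system.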
Enjoy, and good luck.
Code Snippets
```
private final AtomicReference<ScheduledFuture<?>> ticker = new AtomicReference<>();
private final ScheduledExecutorService tickserver;
private final LinkedBlockingQueue<T> taskQ = new LinkedBlockingQueue<>();
private final int delay;

// This runnable can be scheduled repeatedly, and will add tasks to the queue.
private final Runnable tickRunner = new Runnable() {
    @Override
    public void run() {
        taskQ.addAll(getTasksForExecution(new Date()));
    }
};
```
```
public void start() {
    if (ticker.get() == null) {
        // try not to start multiple scheduled tasks, but, if we do, it's OK.
        ScheduledFuture<?> ntick = tickserver.scheduleAtFixedRate(tickRunner, delay, delay, TimeUnit.SECONDS);
        if (!ticker.compareAndSet(null, ntick)) {
            // some other thread started and raced us, and won.
            ntick.cancel(false);
        }
    }
}
```
```
public void stop() {
    ScheduledFuture<?> tick = ticker.getAndSet(null);
    if (tick != null) {
        tick.cancel(false);
    }
}
```
```
private final class TaskLogger implements Runnable {
    private final Future<T> tolog;

    public TaskLogger(Future<T> tolog) {
        this.tolog = tolog;
    }

    @Override
    public void run() {
        try {
            final T sendTaskOp = tolog.get(1L, TimeUnit.MINUTES);
            LOG.info("task[" + sendTaskOp + "] executed...");
        } catch (InterruptedException ex) {
            // restore the interrupt flag so the owning pool can see it
            Thread.currentThread().interrupt();
            LOG.error("interrupted while executing task - " + ex.getMessage(), ex);
        } catch (ExecutionException ex) {
            LOG.error("error while executing task - " + ex.getMessage(), ex);
        } catch (TimeoutException ex) {
            LOG.error("executing the task timed out! - " + ex.getMessage(), ex);
        }
    }
}
```
```
final ExecutorService workerpool = Executors.newFixedThreadPool(MAX_THREADS);
final ExecutorService loggerpool = Executors.newCachedThreadPool();

private void processTasks() {
    while (true) {
        try {
            // wait for a task by blocking on the taskQ.
            // submit the task to the worker pool,
            // and wait for the result in the log pool
            loggerpool.submit(new TaskLogger(workerpool.submit(taskQ.take())));
        } catch (InterruptedException ie) {
            // one option: treat an interrupt as a request to stop,
            // restore the flag and leave the loop
            Thread.currentThread().interrupt();
            return;
        }
    }
}
```
Context
StackExchange Code Review Q#67531, answer score: 3