MailQueue implementation with auto start - stop
Problem
The previous question covered only a small portion of the mailQueue. I have now finished the MailQueue, which can start and stop itself. I also added more sending threads for when the load increases, because at some points we send over 15,000 mails in a short time. The last MailQueue needed 2 hours to empty the queue after the mail was added.
```
public enum MailQueue implements Runnable {
    INSTANCE;

    private JavaMailSender sender;
    private boolean run = false;
    private final ConcurrentLinkedQueue<MimeMessage> mailsToSend = new ConcurrentLinkedQueue<>();
    private final ConcurrentLinkedQueue errorRun = new ConcurrentLinkedQueue();
    private final Map mailsWithErrors = new ConcurrentHashMap();
    private static final Logger LOGGER = LoggerFactory.getLogger(MailQueue.class);
    private static final int WAIT_FAILURE_TIME = 120000;
    private static final int MAX_THREADS_SEND_MAIL = 4;
    private static final int MAX_ELEMENTS_BEFORE_NEW_THREAD = 25;
    private static final AtomicInteger CURRENT_THREADS_SEND_MAIL = new AtomicInteger(0);

    @Override
    public void run() {
        run = true;
        CURRENT_THREADS_SEND_MAIL.getAndIncrement();
        while (run) {
            while (mailsToSend.peek() != null) {
                int currentThreads = CURRENT_THREADS_SEND_MAIL.get();
                if (currentThreads < MAX_THREADS_SEND_MAIL && mailsToSend.size() > (MAX_ELEMENTS_BEFORE_NEW_THREAD * currentThreads)) {
                    new Thread(this).start();
                }
                MimeMessage message = mailsToSend.remove();
                sendMessage(message);
            }
        }
        if (CURRENT_THREADS_SEND_MAIL.decrementAndGet() < 1) {
            getErrorThread().start();
        }
        run = false;
    }

    [...] messages) {
        boolean result = mailsToSend.addAll(messages);
        if (!run) {
            new Thread(this).start();
        }
        return result;
    }

    /**
     * Removes a specific mail from the error list.
     * @param message to remove
     * @throws MessagingException When there is a fault with getting recip
```
Solution
    private boolean run = false;

A verb as a variable name seems weird to me, especially for a boolean. Try to make your booleans adjectives, questions or statements instead. In this case, I'd go for `running`. You'd get an aptly named `isRunning` method from this too, rather than `isRun`.

    public enum MailQueue implements Runnable {
        INSTANCE;

What the...
This is clever abuse of language mechanics, and I don't like it for that specific reason.
Do it the proper way. Have one class that keeps track of the tasks and one class that does the tasks. Not this self-forking madness where you keep a reference to the main instance through an enum constant.
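As a rough illustration of that split, here is a minimal sketch that assumes a plain `ExecutorService` is acceptable. The names `MailDispatcher`, `enqueue` and `shutdownAndWait` are made up for this example, and the `Consumer` is only a stand-in for the real `sendMessage` call. The dispatcher keeps track of the work, and the pool's worker threads do it.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical dispatcher: it only tracks work and hands it to a pool.
// The pool's worker threads do the sending; nothing forks itself.
final class MailDispatcher<M> {
    private final ExecutorService pool;
    private final Consumer<M> sender; // stand-in for the real sendMessage(...)

    MailDispatcher(int maxThreads, Consumer<M> sender) {
        // A fixed pool never runs more than maxThreads workers at once.
        this.pool = Executors.newFixedThreadPool(maxThreads);
        this.sender = sender;
    }

    // Queues every message; the pool's internal queue takes the role of mailsToSend.
    void enqueue(Collection<? extends M> messages) {
        for (M message : messages) {
            pool.execute(() -> sender.accept(message));
        }
    }

    // Stops accepting new mail and waits for the queued mail to be sent.
    // Returns true if everything finished within the timeout.
    boolean shutdownAndWait(long timeout, TimeUnit unit) {
        pool.shutdown();
        try {
            return pool.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

final class MailDispatcherDemo {
    public static void main(String[] args) {
        // Strings stand in for MimeMessage so the sketch has no mail dependency.
        List<String> sent = new CopyOnWriteArrayList<>();
        MailDispatcher<String> dispatcher = new MailDispatcher<>(4, sent::add);
        dispatcher.enqueue(Arrays.asList("first", "second", "third"));
        boolean finished = dispatcher.shutdownAndWait(5, TimeUnit.SECONDS);
        System.out.println("finished=" + finished + ", sent=" + sent.size());
    }
}
```

A fixed pool keeps its workers alive until it is shut down. If idle workers should disappear on their own, a `ThreadPoolExecutor` with `allowCoreThreadTimeOut(true)` is one option worth looking at.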
Thread-safety
I was wondering about the thread-safety of your code, so I tested if threads start directly after the `start` method is called.

    for (int threads = 0; threads < 10; threads++) {
        final int thred = threads;
        System.out.println("Creating thread " + thred);
        new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("Thread " + thred);
            }
        }).start();
        System.out.println("Created thread " + thred);
    }

The output?

    Creating thread 0
    Created thread 0
    Creating thread 1
    Created thread 1
    Creating thread 2
    Created thread 2
    Thread 0
    Thread 1
    Creating thread 3
    Thread 2
    Created thread 3
    Creating thread 4
    Created thread 4
    Thread 3
    Creating thread 5
    Thread 4
    Created thread 5
    Creating thread 6
    Thread 5
    Created thread 6
    Creating thread 7
    Created thread 7
    Creating thread 8
    Thread 6
    Created thread 8
    Thread 7
    Creating thread 9
    Thread 8
    Created thread 9
    Thread 9

Oh dear. It seems I can create three threads before any threads have even started running.
It is not required for the JVM to start running your thread when you create it.
Thus, you can exceed your max threads here:

    @Override
    public void run() {
        run = true;
        CURRENT_THREADS_SEND_MAIL.getAndIncrement();
        while (run) {
            while (mailsToSend.peek() != null) {
                int currentThreads = CURRENT_THREADS_SEND_MAIL.get();
                if (currentThreads < MAX_THREADS_SEND_MAIL && mailsToSend.size() > (MAX_ELEMENTS_BEFORE_NEW_THREAD * currentThreads)) {
                    new Thread(this).start();
                }
                MimeMessage message = mailsToSend.remove();
                sendMessage(message);
            }
        }
        if (CURRENT_THREADS_SEND_MAIL.decrementAndGet() < 1) {
            getErrorThread().start();
        }
        run = false;
    }

Say the queue already holds 15,000 mails (remember, threads are allowed to be suspended indefinitely, so I can add 15k mails before the VM starts your thread). The first thread is created and runs. It increments the counter to 1. Suppose the limit is 2. It sees there's mail and only 1 counted thread, so it starts a new thread and sends a message.
The message is sent, but the other thread hasn't started running yet, so the counter still reads 1. So we create another thread.
Repeat until we have ~14975 threads.
That was a single thread breaking your code, so synchronization alone is not going to help.
Whatever you do, though, you'll want synchronization as well.

    while (mailsToSend.peek() != null) {
        int currentThreads = CURRENT_THREADS_SEND_MAIL.get();
        if (currentThreads < MAX_THREADS_SEND_MAIL && mailsToSend.size() > (MAX_ELEMENTS_BEFORE_NEW_THREAD * currentThreads)) {
            new Thread(this).start();
        }
        MimeMessage message = mailsToSend.remove();
        sendMessage(message);
    }

There are 100 mails. The thread cap is 3 threads.
Thread 1 grabs a mail, starts a thread, sends a mail.
Thread 2 grabs a mail, retrieves the thread counter, suspends.
Thread 1 returns from sending mail, grabs a mail, retrieves the thread counter, suspends.
Thread 2 creates a thread, sends a mail.
Thread 1 creates a thread, sends a mail.
You now have 4 threads.
So how do we fix this?
First, we need synchronization for starting a new thread.
At the top of the class:

    private static final Object lockObject = new Object();

And in the run method:

    while (mailsToSend.peek() != null) {
        MimeMessage message = mailsToSend.remove();
        synchronized (lockObject) {
            int currentThreads = CURRENT_THREADS_SEND_MAIL.get();
            if (currentThreads < MAX_THREADS_SEND_MAIL && mailsToSend.size() > (MAX_ELEMENTS_BEFORE_NEW_THREAD * currentThreads)) {
                new Thread(this).start();
            }
        }
        sendMessage(message);
    }

Hurray, synchronization!
Also, I just murdered your throughput (for every mail acquired, a thread must acquire and release a lock). Clearly, this approach doesn't work.
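One possible way to keep the cap without a lock per mail is to claim a thread slot atomically before calling `start()`, so the counter no longer depends on when the JVM actually schedules the new thread. The sketch below only illustrates that idea and is not taken from the original code: `WorkerSlots`, `tryReserve`, `tryStart` and `reservedCount` are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: caps the number of worker threads without a lock.
final class WorkerSlots {
    private final int maxWorkers;
    private final AtomicInteger reserved = new AtomicInteger(0);

    WorkerSlots(int maxWorkers) {
        this.maxWorkers = maxWorkers;
    }

    // Atomically claims a slot; returns false when the cap is reached.
    boolean tryReserve() {
        while (true) {
            int current = reserved.get();
            if (current >= maxWorkers) {
                return false;
            }
            // Only one caller can move the counter from current to current + 1;
            // a caller that loses the race re-reads the counter and tries again.
            if (reserved.compareAndSet(current, current + 1)) {
                return true;
            }
        }
    }

    // Gives a slot back once a worker is done.
    void release() {
        reserved.decrementAndGet();
    }

    int reservedCount() {
        return reserved.get();
    }

    // Claims the slot BEFORE start() is called, so the count is already
    // correct even if the new thread is scheduled much later.
    boolean tryStart(Runnable work) {
        if (!tryReserve()) {
            return false;
        }
        Thread worker = new Thread(() -> {
            try {
                work.run();
            } finally {
                release(); // the slot is freed even if work throws
            }
        });
        try {
            worker.start();
        } catch (RuntimeException | Error e) {
            release(); // the thread never ran, so hand the slot back
            throw e;
        }
        return true;
    }
}
```

In the `run` loop, the check-then-start block would then shrink to a single `tryStart(...)` call, and the increment at the top of `run()` would go away because the slot is already counted.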
... additionally, I just realized that you have this:
```
while (run) {
    while (mailsToSend.peek() != null) {
        int currentThreads = CURRENT_THREADS_SEND_MAIL.get();
        if (currentThreads < MAX_THREADS_SEND_MAIL && mailsToSend.size() > (MAX_ELEMENTS_BEFORE_NEW_THREAD * currentThreads)) {
            new Thread(this).start();
        }
        MimeMessage message = mailsToSend.rem
```
Context
StackExchange Code Review Q#87823, answer score: 6