Storing messages for the application's runtime

Submitted by: @import:stackexchange-codereview

Problem

I am currently writing a, well, manual testing site for a chat-based bot. The full code can be found on GitHub.

For that purpose I had to keep track of the messages currently in the "system". The stored message format is that of a simple and immutable class with non-complex fields. The code for this format is coming from the bot's implementation.

I wanted to get into thread safety anyway, since I personally perceive it as a shortcoming of mine, and this seemed like a good opportunity, since the collection will be accessed from multiple threads.

For simplicity I decided to implement the class as a singleton. As of now, there are no contributors aside from myself, so I have to admit the project overall is underdocumented. Any comments about that are "breaking through open doors", as we Germans put it.

MessageTracker.java:

```
package de.vogel612.testclient_javabot.core;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import com.gmail.inverseconduit.datatype.ChatMessage;

public final class MessageTracker {

    private static final int LIMIT = 200;

    private static final MessageTracker INSTANCE = new MessageTracker();

    private final ChatMessage[] messages = new ChatMessage[LIMIT];

    private final AtomicInteger lastQueryTime = new AtomicInteger(0);

    private final AtomicInteger currentItem = new AtomicInteger(0);

    private MessageTracker() {}

    private long getLimit(final int from) {
        return (LIMIT - from + currentItem.get()) % LIMIT;
    }

    public List<ChatMessage> newMessages() {
        return newMessages(lastQueryTime.get());
    }

    public List<ChatMessage> newMessages(final int since) {
        final List<ChatMessage> newMessages;
        final long limit = getLimit(since);
        synchronized (messages) {
            newMessages =
```

Solution

General Notes

I had to look at your code quite hard before I figured out what it does. Despite the descriptive variable and method names, I still found it hard to 'get it'. Comments would have helped, for example describing that the messages array is essentially a ring buffer and that the Counter is only used in the Stream.

Threading and concurrency - don't mix systems

Your class uses two distinct strategies for thread safety: synchronization and atomics.

In general, a mix of strategies in a single class is a warning that something is not right. The memory handling Java requires for thread-safe concurrency is relatively complicated (though not as complicated as in some other systems). The two strategies affect the memory model in ways that may not be obvious, and mixing them makes it even harder to reason about.

Additionally, the two strategies use different lock points, so when you mix them you seldom get the exclusive access you need.

Use just one system in any one class.
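
To make the two strategies concrete, here is a minimal sketch with each one used consistently in its own class. The names `OneStrategyPerClass`, `HitCounter`, `SlotWriter` and `runConcurrently` are made up for illustration and are not part of the reviewed project. `HitCounter` has a single-step update, so an atomic is enough; `SlotWriter` has two related steps, so one lock guards both.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class OneStrategyPerClass {

    /** Single-step update: an atomic is enough, no lock is needed. */
    static final class HitCounter {
        private final AtomicInteger hits = new AtomicInteger();

        int hit() {
            return hits.incrementAndGet();
        }

        int total() {
            return hits.get();
        }
    }

    /** Two related steps (store the value, advance the index): one lock guards both. */
    static final class SlotWriter {
        private final Object lock = new Object();
        private final String[] slots;
        private int next = 0;

        SlotWriter(int capacity) {
            slots = new String[capacity];
        }

        void add(String value) {
            synchronized (lock) {
                slots[next] = value;
                next = (next + 1) % slots.length;
            }
        }

        /** Counts non-null slots under the same lock the writers use. */
        int filled() {
            synchronized (lock) {
                int count = 0;
                for (String slot : slots) {
                    if (slot != null) {
                        count++;
                    }
                }
                return count;
            }
        }
    }

    /** Hypothetical helper: runs the same task on several threads and waits for all. */
    static void runConcurrently(int threads, Runnable task) {
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(task);
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
    }

    public static void main(String[] args) {
        HitCounter counter = new HitCounter();
        SlotWriter writer = new SlotWriter(200);
        runConcurrently(4, () -> {
            for (int i = 0; i < 25; i++) {
                counter.hit();
                writer.add("message");
            }
        });
        System.out.println(counter.total() + " hits, " + writer.filled() + " slots filled");
    }
}
```

With four threads adding 25 entries each to 200 slots, every entry lands in its own slot because no thread can advance the index between another thread's store and advance.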

In this case, synchronization is the right choice. The atomic classes are useful when you do not have multi-stage operations that need to be thread-safe. Your code needs to increment a counter and add a message as one unit, which means at least two related operations in a single locked sequence, so synchronization is your friend here.

Concurrency Bugs

Your code is not thread safe. Consider this method:

public boolean newUserMessage(String message) {
    ChatMessage userMessage;
    userMessage = ChatMessageUtils.createFromString(message, "You");
    incrementAndWrap();
    messages[currentItem.get()] = userMessage;
    return true;
}


That method does three things:

  1. creates a message
  2. increments (and perhaps wraps) the index
  3. inserts the message into the array

Steps 2 and 3 must happen as one indivisible operation, because we want each message to go into its own index.

Suppose two messages are created at the same time, and that just before they are created the current index is 2.

One possible order of execution is:

Thread 1            Thread 2

                    create message

create message     
increment/wrap to 3
add to array at 3

                    increment/wrap to 4
                    add to array at 4


That is a successful insert: even though thread 2 started first, it added its message second.

But, what if the order was:

Thread 1            Thread 2

                    create message

create message     
increment/wrap to 3

                    increment/wrap to 4

add to array at 4

                    add to array at 4


Notice how the two increments can happen before the add-to-the-array.

Your add-to-array code is:

messages[currentItem.get()] = userMessage;


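and that get() is a separate atomic step, so it may not return the value this thread's own incrementAndWrap() just set: another thread may have moved the index in between.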

So, you have a potential race condition in your atomics. That's why atomics are not the best choice here.
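
The index half of that race can be seen in isolation. The sketch below is an illustration only, with hypothetical names (`ClaimedIndexDemo`, `claimNextIndex`, `claimFromThreads`); `LIMIT` and `currentItem` mirror the fields in the question. Instead of incrementing and then calling get() separately, it uses the value returned by the atomic update itself, so each caller sees the index its own update produced.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class ClaimedIndexDemo {

    private static final int LIMIT = 200;

    private final AtomicInteger currentItem = new AtomicInteger(0);

    /**
     * Advances the index (wrapping at LIMIT) and returns the value that
     * this very call produced, rather than re-reading it afterwards.
     */
    int claimNextIndex() {
        return currentItem.updateAndGet(i -> (i + 1) % LIMIT);
    }

    /** Hypothetical helper: collects every index claimed by several threads. */
    static Set<Integer> claimFromThreads(int threads, int perThread) {
        ClaimedIndexDemo demo = new ClaimedIndexDemo();
        Set<Integer> claimed = ConcurrentHashMap.newKeySet();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    claimed.add(demo.claimNextIndex());
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        return claimed;
    }

    public static void main(String[] args) {
        // 4 threads x 25 claims stay below LIMIT, so no index repeats.
        System.out.println(claimFromThreads(4, 25).size() + " distinct indexes");
    }
}
```

This only addresses the duplicate-index problem. The store into the plain array is still unprotected, which is one more reason the review recommends a single synchronized block for both steps.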

There is a second bug here. If your computer has two CPUs (or cores), the threads doing the work may run on different cores with different memory caches. Suppose one thread does the add at position 4 and the other does it at position 3.

The write to position 4 happens in one thread, and that assignment to the messages array is not inside a memory-controlled block, so nothing forces the other CPU to pick up the updated 'copy' of the array, and the value stored at position 4 may never become visible there. In general, a variable or array element written in one thread is only guaranteed to be seen by another thread if both accesses go through the same memory control (lock or monitor).

On your 'read' side (getMessages()) you synchronize on the messages array, but since the writes are not synchronized, there is no reason to expect the reads to see the written values.

In other words, what you put into the messages array may not be there when you read from it.

Other Issues

  • When you reset the Tracker, you should null out the values in the messages array to allow for garbage collection.
  • When you return all messages, you are leaking encapsulation. The Arrays.asList(messages) call returns a list backed by the array, so whoever calls your code can change the values in the array. You should return an unrelated copy of the data.
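
Both points can be tried out in a small standalone sketch. `DefensiveCopyDemo` and its methods are hypothetical names, and `String` stands in for `ChatMessage`; locking is left out here to keep the focus on the copy.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DefensiveCopyDemo {

    private final String[] messages = {"first", "second", "third"};

    /** Leaky: the returned list is a view backed by the internal array. */
    List<String> leakyView() {
        return Arrays.asList(messages);
    }

    /** Safer: the returned list has its own storage. */
    List<String> copy() {
        return new ArrayList<>(Arrays.asList(messages));
    }

    /** Clears every slot so the old messages become eligible for garbage collection. */
    void reset() {
        Arrays.fill(messages, null);
    }

    /** Exposes the first internal slot, only so the effect can be observed. */
    String internalFirst() {
        return messages[0];
    }

    public static void main(String[] args) {
        DefensiveCopyDemo demo = new DefensiveCopyDemo();

        demo.copy().set(0, "changed");
        System.out.println(demo.internalFirst()); // prints "first"

        demo.leakyView().set(0, "changed");
        System.out.println(demo.internalFirst()); // prints "changed"

        demo.reset();
        System.out.println(demo.internalFirst()); // prints "null"
    }
}
```

Writing to the copy leaves the internal array alone, while writing to the `Arrays.asList` view changes it.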



Consider this...

I put this code together to illustrate what I would consider doing. Use it as a guide. I am not thrilled with the int-stream generator, I think there's a better way, but this review really focuses on the concurrency and other bugs:

```
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

//import com.gmail.inverseconduit.datatype.ChatMessage;

public final class MessageTracker {

    public static MessageTracker getInstance() {
        return INSTANCE;
    }

    priva
```

Code Snippets

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

//import com.gmail.inverseconduit.datatype.ChatMessage;

public final class MessageTracker {

    public static MessageTracker getInstance() {
        return INSTANCE;
    }

    private static final MessageTracker INSTANCE = new MessageTracker();

    private static final int CAPACITY = 200;
    private final ChatMessage[] messages = new ChatMessage[CAPACITY];

    private long messageId = 0;
    private long lastMessageReported = 0;
    private int size = 0;
    

    private MessageTracker() {
    }

    public List<ChatMessage> getMessages() {
        synchronized (messages) {
            // let the system figure it out.
            return newMessages(messageId - CAPACITY);
        }
    }
    
    public List<ChatMessage> newMessages() {
        synchronized (messages) {
            return newMessages(lastMessageReported);
        }
    }

    public List<ChatMessage> newMessages(final long since) {
        if (since < 0) {
            // correct broken input with cheeky recursion
            return newMessages(0);
        }
        synchronized (messages) {
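            // Clamp at zero: a 'since' beyond the newest id would otherwise
            // give a negative count, which ArrayList rejects.
            // The cast to int is safe because the value never exceeds size.
            final int reportCount = (int) Math.max(0, Math.min(size, messageId - since));
            final List<ChatMessage> result = new ArrayList<>(reportCount);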
            
            int offset = (int)((messageId - reportCount + 1) % CAPACITY);
            for (int i = 0; i < reportCount; i++) {
                result.add(messages[offset]);
                offset = (offset + 1) % CAPACITY;
            }
            
            lastMessageReported = messageId;
            return result;
        }
    }
    
    private boolean addMessage(ChatMessage message) {
        synchronized(messages) {
            messageId++;
            messages[(int)(messageId % CAPACITY)] = message;
            if (size < CAPACITY) {
                size++;
            }
            return true;
        }
    }

    public boolean newBotMessage(String message) {
        return addMessage(ChatMessageUtils.createFromString(message, "Junior"));
    }

    public boolean newUserMessage(String message) {
        return addMessage(ChatMessageUtils.createFromString(message, "You"));
    }

    /**
     * Allows resetting the TestingChatClient. All currently stored messages
     * will be lost.
     */
    public void reset() {
        synchronized (messages) {
            messageId = 0;
            lastMessageReported = 0;
            size = 0;
            Arrays.fill(messages, null);
        }
    }
}
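
The index arithmetic in the class above is easier to follow with small numbers. The following stripped-down variant is a sketch only: `RingArithmeticDemo` and its `add`/`since` methods are hypothetical names, `String` replaces `ChatMessage` so it runs without the bot's library, it locks on `this` through synchronized methods, and it does not track the last reported message.

```java
import java.util.ArrayList;
import java.util.List;

public class RingArithmeticDemo {

    private final String[] slots;
    private long messageId = 0; // id of the newest message, 0 when empty
    private int size = 0;       // number of occupied slots, at most capacity

    RingArithmeticDemo(int capacity) {
        slots = new String[capacity];
    }

    /** Stores the message with id k in slot k % capacity. */
    synchronized void add(String message) {
        messageId++;
        slots[(int) (messageId % slots.length)] = message;
        if (size < slots.length) {
            size++;
        }
    }

    /** Returns the retained messages with an id greater than 'since', oldest first. */
    synchronized List<String> since(long since) {
        long from = Math.max(0, since);
        int count = (int) Math.max(0, Math.min(size, messageId - from));
        List<String> result = new ArrayList<>(count);
        // The oldest message to report has id (messageId - count + 1).
        int offset = (int) ((messageId - count + 1) % slots.length);
        for (int i = 0; i < count; i++) {
            result.add(slots[offset]);
            offset = (offset + 1) % slots.length;
        }
        return result;
    }

    public static void main(String[] args) {
        RingArithmeticDemo ring = new RingArithmeticDemo(4);
        for (int i = 1; i <= 6; i++) {
            ring.add("m" + i);
        }
        System.out.println(ring.since(0)); // prints [m3, m4, m5, m6]
        System.out.println(ring.since(5)); // prints [m6]
        System.out.println(ring.since(9)); // prints []
    }
}
```

With a capacity of 4 and six messages added, m1 and m2 have been overwritten, and the read starts at slot (6 - 4 + 1) % 4 = 3, where m3 lives.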

Context

StackExchange Code Review Q#77127, answer score: 7
