Sending data to a database in size-limited chunks

Submitted by: @import:stackexchange-codereview··

Problem

I have a method that takes a Partition enum as its parameter. Multiple background threads call it at around the same time, each passing a different partition value. Here dataHoldersByPartition is a map from Partition to a ConcurrentLinkedQueue of DataHolder.

private void validateAndSend(final Partition partition) {  
    ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition);
    Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder = new HashMap<>();
    int totalSize = 0;      
    DataHolder dataHolder;
    while ((dataHolder = dataHolders.poll())  != null) {      
      byte[] clientKeyBytes = dataHolder.getClientKey().getBytes(StandardCharsets.UTF_8);
      if (clientKeyBytes.length > 255)
        continue;
      byte[] processBytes = dataHolder.getProcessBytes();

      int clientKeyLength = clientKeyBytes.length;
      int processBytesLength = processBytes.length;

      int additionalLength = clientKeyLength + processBytesLength;
      if (totalSize + additionalLength > 50000) {
        Message message = new Message(partition, clientKeyBytesAndProcessBytesHolder);
        sendToDatabase(message.getAddress(), message.getLocation());
        clientKeyBytesAndProcessBytesHolder.clear(); // watch out for gc
        totalSize = 0;
      }
      clientKeyBytesAndProcessBytesHolder.put(clientKeyBytes, processBytes);
      totalSize += additionalLength;
    }
    // calling again with remaining values only if clientKeyBytesAndProcessBytesHolder is not empty
    if(!clientKeyBytesAndProcessBytesHolder.isEmpty()) {
        Message message = new Message(partition, clientKeyBytesAndProcessBytesHolder);
        sendToDatabase(message.getAddress(), message.getLocation());        
    }
  }


In the method, I iterate over the dataHolders CLQ and extract clientKeyBytes and processBytes from each entry. Here is the validation that I am supposed to do:

  • If the clientKeyBytes length is greater than 255, I skip the entry and continue iterating

Solution

I waited a few days for other answers to your question. Now I feel I can add my 2 cents.

First off, the obvious: This is one single method. It does:

  • Polling a queue
  • Serialising a message into a byte[]
  • Validating the messages
  • Packaging in 50 KB packages
  • Sending chunks to a database


The obvious thing to do is to split those actions into individual methods and move each method into the object it belongs to, which allows a nicer syntax. Right now too much lives in this single method of the current object, so your hands are tied.
Our goal is to make your code more readable, like this:

public void batchSend(final Partition partition) {
    Queue<DataHolder> dataHolderQueue = dataHoldersByPartition.get(partition);
    DataHolder data = dataHolderQueue.poll();
    Message message = new Message();
    while (data != null) {
        if (data.isValid()) {
            if (!message.hasRoomFor(data)) {
                // Message is at maximum capacity: send it and start a new one
                database.send(message);
                message = new Message();
            }
            // Append after the capacity check so the current item is never dropped
            message.append(data);
        }
        data = dataHolderQueue.poll();
    }
    if (!message.isEmpty()) {
        database.send(message); // Queue is empty, sending remaining message
    }
}


I simply modified the Message class, which now holds the serialized data of a batch of DataHolders and tells you when it reaches its size limit. I don't really understand how you concatenate the messages together (that is all hidden in the Message class, which was not included), so most of the code below is a guess:

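public class Message {
    public static final int MAX_BYTES_PER_MESSAGE = 50000;
    private final Map<byte[], byte[]> data = new HashMap<>();
    private int size = 0;
    // ... other fields I don't know about ...

    // getSizeIncludingHeader() is assumed to exist on DataHolder
    public boolean hasRoomFor(DataHolder holder) {
        return size + holder.getSizeIncludingHeader() <= MAX_BYTES_PER_MESSAGE;
    }

    public void append(DataHolder holder) {
        byte[] clientKeyBytes = holder.getClientKey().getBytes(StandardCharsets.UTF_8);
        byte[] processBytes = holder.getProcessBytes();
        size += clientKeyBytes.length + processBytes.length;
        // Maybe throw an exception (e.g. IllegalStateException) if size > MAX_BYTES_PER_MESSAGE?
        data.put(clientKeyBytes, processBytes);
    }

    public boolean isEmpty() {
        return data.isEmpty();
    }
    // provide getters on the data field for the sendToDatabase() method
    // ... other methods I don't know about ...
}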
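
To make the chunking logic concrete, here is a minimal, self-contained sketch of the same idea. Item, Batch and BatchingSketch are hypothetical stand-ins (not the original DataHolder, Message or database classes), the byte limit is passed in as a parameter, and "sending" is simulated by collecting the finished batches in a list. Skipping an item that is larger than the limit on its own is an assumption; the original post does not say how that case should be handled.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for DataHolder: a key plus an opaque payload.
final class Item {
    final byte[] key;
    final byte[] payload;

    Item(String key, byte[] payload) {
        this.key = key.getBytes(StandardCharsets.UTF_8);
        this.payload = payload;
    }

    int size() {
        return key.length + payload.length;
    }
}

// Hypothetical stand-in for Message: accumulates items up to a byte limit.
final class Batch {
    private final int maxBytes;
    private final List<Item> items = new ArrayList<>();
    private int size = 0;

    Batch(int maxBytes) {
        this.maxBytes = maxBytes;
    }

    boolean hasRoomFor(Item item) {
        return size + item.size() <= maxBytes;
    }

    void append(Item item) {
        if (!hasRoomFor(item)) {
            throw new IllegalStateException("batch would exceed " + maxBytes + " bytes");
        }
        items.add(item);
        size += item.size();
    }

    boolean isEmpty() { return items.isEmpty(); }
    int size() { return size; }
    int count() { return items.size(); }
}

final class BatchingSketch {
    // Drains the queue into batches of at most maxBytes each.
    // Assumption: an item that cannot fit even in an empty batch is skipped.
    static List<Batch> drain(Queue<Item> queue, int maxBytes) {
        List<Batch> sent = new ArrayList<>();
        Batch current = new Batch(maxBytes);
        Item item;
        while ((item = queue.poll()) != null) {
            if (item.size() > maxBytes) {
                continue;
            }
            if (!current.hasRoomFor(item)) {
                sent.add(current);              // "send" the full batch
                current = new Batch(maxBytes);
            }
            current.append(item);               // the current item is never dropped
        }
        if (!current.isEmpty()) {
            sent.add(current);                  // "send" the remainder
        }
        return sent;
    }

    public static void main(String[] args) {
        Queue<Item> queue = new ArrayDeque<>();
        queue.add(new Item("a", new byte[39])); // 40 bytes each
        queue.add(new Item("b", new byte[39]));
        queue.add(new Item("c", new byte[39]));
        List<Batch> batches = drain(queue, 100);
        System.out.println(batches.size());     // prints 2 (80 bytes, then 40 bytes)
    }
}
```

The item that does not fit is appended to the fresh batch right after the full one is sent, which is the step that is easiest to forget when writing this loop by hand.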


I also modified the DataHolder class to add:

public boolean isValid() {
    return getClientKey().getBytes(StandardCharsets.UTF_8).length <= 255;
}


Indeed, validity seems to be an intrinsic property of a DataHolder. By this I mean that a DataHolder either is valid or isn't, independently of external factors. That makes it a perfect candidate for a method on the DataHolder class.
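
As an illustration of validity living on the object itself, here is a small sketch. KeyedRecord is a hypothetical, trimmed-down stand-in for DataHolder, and the 255-byte limit is taken from the question. It also shows why the check is made on the UTF-8 byte length rather than on String.length(): a key with non-ASCII characters can be shorter than 255 characters and still exceed 255 bytes.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical, trimmed-down stand-in for DataHolder.
final class KeyedRecord {
    static final int MAX_KEY_BYTES = 255; // limit taken from the question

    private final String clientKey;

    KeyedRecord(String clientKey) {
        this.clientKey = clientKey;
    }

    // Valid when the UTF-8 encoding of the key fits in MAX_KEY_BYTES.
    boolean isValid() {
        return clientKey.getBytes(StandardCharsets.UTF_8).length <= MAX_KEY_BYTES;
    }

    // Builds a string made of count copies of c (helper for the demo only).
    static String repeat(char c, int count) {
        StringBuilder sb = new StringBuilder(count);
        for (int i = 0; i < count; i++) {
            sb.append(c);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(new KeyedRecord(repeat('a', 255)).isValid());      // true: 255 bytes
        System.out.println(new KeyedRecord(repeat('a', 256)).isValid());      // false: 256 bytes
        System.out.println(new KeyedRecord(repeat('\u00e9', 200)).isValid()); // false: 200 chars, 400 bytes
    }
}
```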

Weird message usage

This is weird:

Message message = new Message(partition, clientKeyBytesAndProcessBytesHolder);
sendToDatabase(message.getAddress(), message.getLocation());


You're creating a Message instance but never using the instance itself, only some of its fields via getters. The point of a Message object is to keep its constituent elements consistent with one another, and that consistency is only guaranteed when the instance is passed around as a whole.

By allowing sendToDatabase to take two separate parameters, you're allowing the caller to pass two inconsistent values:

sendToDatabase(anAddress, aTotallyUnrelatedLocation);


The obvious fix is to pass in the whole Message object and let the method extract the data it needs:

boolean sendToDatabase(Message message) {
    Address address = message.getAddress();
    Location location = message.getLocation();
    // ... do stuff ...
    return successStatus;
}


It is also syntactically more pleasing to send a Message rather than an address and a location.

By the way, why would the message body not be sent as well? This is confusing. I would have expected sendToDatabase(Message message, Address address) at least. Maybe the Location is the Message body? It does not make sense to me.

GC Handling

This should never be written:

clientKeyBytesAndProcessBytesHolder.clear(); // watch out for gc


The GC knows its stuff. Don't 'help' it. It knows whether the collection needs to be collected, or whether the data in it does. This call doesn't even free the map itself: clear() only removes its entries, and the map object stays allocated.
If the collection must be cleared, it must be for business reasons, not because of GC.

Also be careful: if you clear() it while sendToDatabase() has spawned a thread that holds on to the reference a bit longer, you have just deleted all the data before it was sent.

The best option would be to overwrite the reference to the collection with a new one:

clientKeyBytesAndProcessBytesHolder = new HashMap<>();


With this, you signal to the GC that it can get rid of the collection (you no longer keep a handle), so once no one else holds a reference either, it will be reclaimed.
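
The hazard described above can be reproduced without any threads. In this sketch, DeferredSender is a hypothetical sender that keeps the map it was given and would write it out later, standing in for a sendToDatabase() that hands the work to another thread. Clearing the map empties the batch the sender still holds, while replacing the reference leaves that batch intact.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ClearVersusReplace {
    // Hypothetical sender that keeps a reference to each batch for later.
    static final class DeferredSender {
        private final List<Map<String, String>> pending = new ArrayList<>();

        void send(Map<String, String> batch) {
            pending.add(batch); // stores the reference, does not copy
        }

        int entriesStillQueued() {
            int total = 0;
            for (Map<String, String> batch : pending) {
                total += batch.size();
            }
            return total;
        }
    }

    static int withClear() {
        DeferredSender sender = new DeferredSender();
        Map<String, String> batch = new HashMap<>();
        batch.put("k1", "v1");
        sender.send(batch);
        batch.clear();               // also empties the map the sender holds
        return sender.entriesStillQueued();
    }

    static int withReplace() {
        DeferredSender sender = new DeferredSender();
        Map<String, String> batch = new HashMap<>();
        batch.put("k1", "v1");
        sender.send(batch);
        batch = new HashMap<>();     // the sender keeps the old map untouched
        batch.put("k2", "v2");       // goes into the new map only
        return sender.entriesStillQueued();
    }

    public static void main(String[] args) {
        System.out.println(withClear());   // prints 0: the queued data is gone
        System.out.println(withReplace()); // prints 1: the queued data survives
    }
}
```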

Late validation

I wonder why you validate your messages so late, just before sending them. You could do this check earlier, for example when the data is added to the queues.

This is the "Fail Fast" principle: Don't keep corrupt data in yo
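
A sketch of validating at the point of entry, in the spirit of the fail-fast remark above. ValidatingQueue is a hypothetical wrapper, not part of the original code, and it stores plain String keys instead of DataHolder objects to stay self-contained. Invalid entries are rejected when they are offered, so the consumer never has to filter them out.

```java
import java.nio.charset.StandardCharsets;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical wrapper that validates on the producer side.
final class ValidatingQueue {
    static final int MAX_KEY_BYTES = 255; // limit taken from the question

    private final Queue<String> keys = new ConcurrentLinkedQueue<>();

    // Returns false and stores nothing when the key is too long in UTF-8.
    boolean offer(String clientKey) {
        if (clientKey.getBytes(StandardCharsets.UTF_8).length > MAX_KEY_BYTES) {
            return false;
        }
        return keys.offer(clientKey);
    }

    // Returns the next valid key, or null when the queue is empty.
    String poll() {
        return keys.poll();
    }

    int size() {
        return keys.size();
    }

    public static void main(String[] args) {
        ValidatingQueue queue = new ValidatingQueue();
        StringBuilder tooLong = new StringBuilder();
        for (int i = 0; i < 256; i++) {
            tooLong.append('x');
        }
        System.out.println(queue.offer("short-key"));        // true
        System.out.println(queue.offer(tooLong.toString())); // false
        System.out.println(queue.size());                    // 1
    }
}
```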

Context

StackExchange Code Review Q#154446, answer score: 7
