
Find the top 10 IPs in more than 5GB of data


Problem

I have a few files whose total size is more than 5 GB. Each line of the files starts with an IP address and looks like:

127.0.0.1 reset success

...

127.0.0.2 reset success

How can I find the 10 most frequent IPs within 25 seconds, using less than 500 MB of memory?
Here is my code, but it takes about 64 seconds and uses more than 1 GB of memory.

```
package main;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.charset.Charset;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class TopK {
    public TreeMap tMap = new TreeMap();

    public static int MaxLength = 1024 * 1024 * 10;

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(12);
        FileHelper fileHelper = new FileHelper(args[0]);
        fileHelper.getFileList(args[0]);
        System.out.println("begin submit: " + LocalTime.now());
        TopK testCountWords = new TopK();
        fileHelper.allFiles.parallelStream().forEach(file -> {
            int count = (int) (file.length() / MaxLength + 1);
            for (int i = 0; i result = topK(testCountWords.tMap, 5);
        result.stream().forEach(System.out::println);
    }

    public static List topK(Map ips, int k) {
        List ipAndCounts = new ArrayList<>();

        for (Map.Entry entry : ips.entrySet()) {
            ipAndCounts.add(new IPAndCount(entry.getKey(), entry.getValue()));
        }

        return ipAndCounts.parallelStream().sorted((ic1, ic2) -> {
            return ic2.getCount() - ic1.getCount();
        }).colle
```

Solution

Yours is an interesting problem, for sure, and problems like this can often be solved better by using more primitive data structures. Additionally, concurrency is often useful, but in IO-based input operations the bottleneck is usually the IO itself, not the processing.

So, multiple input files may seem like a good thing to parallelize, but the reality is that IO is often sequential. Admittedly, in the current age of SSDs and so on, random access is less of a concern, but I doubt that having parallel IO streams is helping you at all.

So, three major items for you to consider:

  • Remove all concurrency for this problem: your inputs are sequential, and they are probably also your bottleneck. A decent HDD reading sequentially at 100MB/s gets through 5GB in about 50 seconds, and an SSD at around 500MB/s can do it in about 10 seconds.

  • Do the work in two operations: one to count each record, and a second to sort the results.

  • Use a better data structure. I would consider using a sparse, trie-like primitive int[][][] array. See https://en.wikipedia.org/wiki/Trie
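
The first two points can be illustrated with a single-threaded, two-phase pass: count everything first, then sort once at the end. This is a minimal sketch, not the answer's full solution. The names SequentialTopK, countLines and topK are hypothetical, the IP is assumed to be the text before the first space on each line, and a HashMap<String, Integer> is only a simple stand-in for the more compact structure of the third point.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SequentialTopK {

    // Phase 1: read one input sequentially, counting the leading IP token of each line.
    // The HashMap is only a stand-in for a more compact counting structure.
    static void countLines(BufferedReader reader, Map<String, Integer> counts) throws IOException {
        String line;
        while ((line = reader.readLine()) != null) {
            int space = line.indexOf(' ');
            String ip = space < 0 ? line : line.substring(0, space);
            if (!ip.isEmpty()) {
                counts.merge(ip, 1, Integer::sum);
            }
        }
    }

    // Phase 2: after all counting is finished, sort once by descending count and keep k entries.
    static List<String> topK(Map<String, Integer> counts, int k) {
        List<Map.Entry<String, Integer>> entries = new ArrayList<>(counts.entrySet());
        entries.sort((a, b) -> Integer.compare(b.getValue(), a.getValue()));
        List<String> result = new ArrayList<>();
        for (int i = 0; i < Math.min(k, entries.size()); i++) {
            result.add(entries.get(i).getKey());
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        String data = "127.0.0.1 reset success\n"
                + "127.0.0.2 reset success\n"
                + "127.0.0.1 reset success\n";
        Map<String, Integer> counts = new HashMap<>();
        // For real input, open the files one after another with
        // java.nio.file.Files.newBufferedReader(path) instead of a StringReader.
        try (BufferedReader reader = new BufferedReader(new StringReader(data))) {
            countLines(reader, counts);
        }
        System.out.println(topK(counts, 1)); // prints [127.0.0.1]
    }
}
```

Because nothing is shared between threads, neither phase needs locks or concurrent collections.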



The nature of most network-based operations is that almost all traffic comes from a small set of subnets, so your data will be grouped into a small number of high-frequency clusters. The array I would count in would be something like:

```java
private int[][][] counters = new int[1 << 16][][];
```


That creates an array of 65,536 references to int[][] arrays. Now, take an IP address like 127.0.0.1: its first two bytes, 127.0, form the index into that array. In hex, 127.0 is 0x7f00, so make sure that there is an array populated at that location:

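```java
import java.net.InetAddress;
import java.net.UnknownHostException;

/**
 * Locate the counter for the given IPv4 address (creating it if necessary) and increment it.
 * Assumes ipString is an IPv4 literal, so getAddress() returns exactly 4 bytes.
 * @return the newly incremented count for the given IP.
 */
private static int incrementCount(int[][][] counters, String ipString) throws UnknownHostException {
    // .... ipString = "127.0.0.1"
    byte[] ip = InetAddress.getByName(ipString).getAddress();
    // Java bytes are signed, so mask each one with 0xff to get 0..255.
    // The parentheses matter: '+' binds tighter than '<<'.
    int majorIndex = ((ip[0] & 0xff) << 8) | (ip[1] & 0xff);
    int minorIndex = ip[2] & 0xff;
    int leafIndex = ip[3] & 0xff;
    int[][] midTrie = counters[majorIndex];
    if (midTrie == null) {
        midTrie = new int[256][];
        counters[majorIndex] = midTrie;
    }
    if (midTrie[minorIndex] == null) {
        midTrie[minorIndex] = new int[256];
    }
    return ++midTrie[minorIndex][leafIndex];
}
```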
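
The index arithmetic is easy to get wrong in Java: byte is signed, and + binds tighter than <<, so an expression like ip[0] << 8 + ip[1] means ip[0] << (8 + ip[1]). For 127.0.0.1 that unparenthesized form happens to give the right answer, because the second byte is 0, which can hide the bug. The small demo below checks both cases; IndexDemo and majorIndex are illustrative names.

```java
public class IndexDemo {

    // Combine the first two octets into a 16-bit index (0..65535).
    // Masking with 0xff undoes Java's sign extension of byte values >= 128.
    static int majorIndex(byte b0, byte b1) {
        return ((b0 & 0xff) << 8) | (b1 & 0xff);
    }

    public static void main(String[] args) {
        byte[] loopback = {127, 0, 0, 1};
        byte[] lan = {(byte) 192, (byte) 168, 1, 10};

        System.out.println(Integer.toHexString(majorIndex(loopback[0], loopback[1]))); // 7f00
        System.out.println(majorIndex(lan[0], lan[1]));                                // 49320

        // Without masks or parentheses this is lan[0] << (8 + lan[1]),
        // where lan[0] == -64 and lan[1] == -88 because bytes are signed.
        int broken = lan[0] << 8 + lan[1];
        System.out.println(broken < 0); // true: not a valid array index
    }
}
```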


OK, so the above function will maintain the counts for each IP address in a primitive data structure with little wasted memory. In a worst-case scenario, where you have traffic from every subnet on the planet, you will run out of memory... but you will have other problems at that point.
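
A rough sizing shows why the clustering matters. It assumes one 4-byte int per counter and ignores JVM array headers, the top-level array (65,536 references, roughly 256 to 512 KiB depending on reference size), and the 256-reference middle arrays:

$$
\begin{aligned}
\text{every IPv4 address:}\quad & 2^{32} \times 4\,\text{B} = 2^{34}\,\text{B} = 16\,\text{GiB} \\
\text{one populated /24 leaf:}\quad & 2^{8} \times 4\,\text{B} = 1\,\text{KiB} \\
\text{leaves in a 500 MB budget:}\quad & \frac{500 \times 10^{6}\,\text{B}}{1024\,\text{B}} \approx 4.9 \times 10^{5} \approx 2.9\% \text{ of the } 2^{24} \text{ possible /24 blocks}
\end{aligned}
$$

So as long as the traffic comes from fewer than a few hundred thousand distinct /24 blocks, the counters should fit within the 500 MB limit from the question.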

So, with the above, you can build the full set of counters for your data, and after that, post-process it to extract the top X counts and their IPs.
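
A minimal, self-contained sketch of that counting pass follows, assuming IPv4-only input where each line starts with a dotted-quad address followed by a space. Unlike incrementCount, it parses the octets by hand instead of calling InetAddress.getByName, which avoids creating an InetAddress object for every line. TrieCounter and countLine are hypothetical names, and malformed lines return -1 rather than throwing.

```java
public class TrieCounter {

    // Top level: one slot per /16 prefix (first two octets); lower levels are allocated lazily.
    private final int[][][] counters = new int[1 << 16][][];

    /**
     * Parses the dotted-quad IPv4 address at the start of a line such as
     * "127.0.0.1 reset success" and increments its counter.
     *
     * @return the new count, or -1 if the line does not start with a valid IPv4 address
     */
    public int countLine(String line) {
        int[] octets = new int[4];
        int part = 0;   // index of the octet being parsed
        int value = 0;  // numeric value of the current octet
        int digits = 0; // digits seen in the current octet
        for (int i = 0; i < line.length(); i++) {
            char c = line.charAt(i);
            if (c >= '0' && c <= '9') {
                value = value * 10 + (c - '0');
                if (++digits > 3 || value > 255) {
                    return -1;
                }
            } else if (c == '.') {
                if (digits == 0 || part == 3) {
                    return -1;
                }
                octets[part++] = value;
                value = 0;
                digits = 0;
            } else {
                break; // the first space (or any other character) ends the address
            }
        }
        if (part != 3 || digits == 0) {
            return -1;
        }
        octets[3] = value;
        return increment(octets[0], octets[1], octets[2], octets[3]);
    }

    // Same lazy trie walk as incrementCount, but on already-unsigned octets (0..255).
    private int increment(int a, int b, int c, int d) {
        int major = (a << 8) | b;
        int[][] mid = counters[major];
        if (mid == null) {
            mid = new int[256][];
            counters[major] = mid;
        }
        if (mid[c] == null) {
            mid[c] = new int[256];
        }
        return ++mid[c][d];
    }

    public static void main(String[] args) {
        TrieCounter counter = new TrieCounter();
        counter.countLine("127.0.0.1 reset success");
        counter.countLine("127.0.0.2 reset success");
        System.out.println(counter.countLine("127.0.0.1 reset success")); // 2
    }
}
```

Feeding it every line of every file, one file after another, builds the full counter trie that the post-processing step then scans.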

That top-X extraction uses the usual mechanism of maintaining a list of at most X entries, each holding an IP and its count, for the largest values found so far... something like:

```java
private static class Candidate {
    private String ip;
    private int count;
    // ....
}

// Walk the entire trie of counters, scanning it all.
// i indexes the first two octets, j the third octet, k the fourth.
List<Candidate> topX = new ArrayList<>(x);
for (int i = 0; i < counters.length; i++) {
    if (counters[i] == null) {
        continue; // no traffic from this /16
    }
    for (int j = 0; j < counters[i].length; j++) {
        if (counters[i][j] == null) {
            continue; // no traffic from this /24
        }
        for (int k = 0; k < counters[i][j].length; k++) {
            int count = counters[i][j][k];
            if (count == 0) {
                continue;
            }
            checkCounter(topX, x, i, j, k, count);
        }
    }
}
```


Now, your checkCounter method only needs to check whether the supplied count is larger than the smallest value already in the topX list (or whether the list still holds fewer than x entries). If it is, the method inserts the new value at its correct (sorted) position, and if the list is now larger than x, it discards the smallest value.
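
One possible checkCounter, written as a standalone sketch of the sorted-list approach just described. The enclosing class TopX and the Candidate constructor and toString are hypothetical additions, since the answer elides Candidate's body; the IP string is rebuilt from i (first two octets), j and k.

```java
import java.util.ArrayList;
import java.util.List;

public class TopX {

    // Hypothetical Candidate with a constructor; the answer elides its body.
    static final class Candidate {
        final String ip;
        final int count;

        Candidate(String ip, int count) {
            this.ip = ip;
            this.count = count;
        }

        @Override
        public String toString() {
            return ip + "=" + count;
        }
    }

    /**
     * Keeps topX sorted by descending count and at most x entries long.
     * i is the 16-bit index of the first two octets; j and k are the third and fourth octets.
     */
    static void checkCounter(List<Candidate> topX, int x, int i, int j, int k, int count) {
        if (x <= 0 || (topX.size() == x && count <= topX.get(x - 1).count)) {
            return; // not better than the current smallest value
        }
        String ip = (i >> 8) + "." + (i & 0xff) + "." + j + "." + k;
        // Find the first position holding a smaller count so the list stays sorted.
        int pos = 0;
        while (pos < topX.size() && topX.get(pos).count >= count) {
            pos++;
        }
        topX.add(pos, new Candidate(ip, count));
        if (topX.size() > x) {
            topX.remove(topX.size() - 1); // discard the smallest value
        }
    }

    public static void main(String[] args) {
        List<Candidate> topX = new ArrayList<>();
        int x = 2;
        checkCounter(topX, x, 0x7f00, 0, 1, 5);  // 127.0.0.1
        checkCounter(topX, x, 0x7f00, 0, 2, 9);  // 127.0.0.2
        checkCounter(topX, x, 0xc0a8, 1, 10, 7); // 192.168.1.10
        System.out.println(topX); // [127.0.0.2=9, 192.168.1.10=7]
    }
}
```

For x = 10, the linear scan over at most ten entries is cheap; a PriorityQueue used as a min-heap would be the usual alternative for much larger x.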

In a concurrent design, these operations would all be encumbered by synchronization checks, etc.


Context

StackExchange Code Review Q#144101, answer score: 11
