
Processing compressed .csv files

Submitted by: @import:stackexchange-codereview

Problem

I have some .csv files compressed in .bz2 format. I need to keep a subset of the records (rows) and of the data in them (columns), and recompress the result as .gz.

I am not happy with the performance. Is there a more efficient way to do it?

//For each file in a folder:
try (BufferedReader br = new BufferedReader(new InputStreamReader(
        new BZip2CompressorInputStream(new FileInputStream(fileIn))));
     BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(
        new GZIPOutputStream(new FileOutputStream(fileOut))))) {

    String line;
    while ((line = br.readLine()) != null) {
        String[] parts = line.split(","); // String.split takes a String (regex), not a char
        if(isLineToSkip(parts[0])) {
            continue;
        }
        String outLine = parts[0] + "," + parts[3]; 

        writer.append(outLine);
        writer.newLine();
    }
} . . .

Solution

Well, your code is neat, and you use try-with-resources well. There is one potential bug: a line can have a 'successful' parts[0] but no parts[3] (fewer than four fields), which would cause an ArrayIndexOutOfBoundsException.
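A minimal sketch of a guard for that case, assuming (as in the question) that column 0 is the key and column 3 is the value to keep. The helper name extractColumns is hypothetical; it returns null for a line too short to have a fourth field, so the caller can skip the line instead of crashing.

```java
public class ColumnGuard {

    /**
     * Hypothetical helper: returns "col0,col3" for a CSV line,
     * or null if the line has fewer than four fields.
     */
    static String extractColumns(String line) {
        String[] parts = line.split(",");
        if (parts.length < 4) {
            return null; // parts[3] would throw ArrayIndexOutOfBoundsException
        }
        return parts[0] + "," + parts[3];
    }

    public static void main(String[] args) {
        System.out.println(extractColumns("k1,a,b,v1")); // k1,v1
        System.out.println(extractColumns("k2,a"));      // null
    }
}
```

Note that split(",") drops trailing empty fields, so a line such as "k,a,b," also counts as short here; pass a limit of -1 to split if an empty fourth column should be kept.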

As for performance, the key here is parallelization, and Amdahl's Law tells you how much it can buy you: whatever part of the work stays serial caps the overall speedup.
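As a worked example of Amdahl's Law: if a fraction p of the work can be spread over n threads and the rest stays serial, the best-case speedup is S = 1 / ((1 - p) + p / n). A minimal Java sketch of the formula; the class and method names are illustrative only.

```java
public class Amdahl {

    /**
     * Best-case speedup when a fraction p (0..1) of the work is
     * parallelizable across n workers and the remainder stays serial.
     */
    static double speedup(double p, int n) {
        return 1.0 / ((1.0 - p) + p / n);
    }

    public static void main(String[] args) {
        System.out.println(speedup(1.0, 5)); // fully parallel: 5.0
        System.out.println(speedup(0.9, 5)); // 10% serial caps it at about 3.57
    }
}
```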

You have five CPU-intensive parts to your problem:

  • decompression
  • Stream->Reader
  • Split & Filter
  • Writer->Stream
  • compression

Assuming each of these stages is about equally expensive computationally, you could in theory make the whole job up to 5 times faster by running them in parallel, with five threads:

  • the first reads the data from the file, decompresses it into chunks of bytes, and feeds them into a queue;
  • the second takes chunks from the queue, decodes them into characters (UTF-8?), and puts those into a char-chunk queue;
  • the third takes char chunks, identifies and splits the lines, and filters out the junk;
  • the fourth encodes the chars back into bytes, in chunks which it places on a queue;
  • the fifth compresses the byte chunks and writes them back to disk.

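A minimal sketch of how two neighboring threads from that list hand chunks to each other, assuming a bounded ArrayBlockingQueue of byte arrays and an empty array as the end-of-stream marker. To keep it short it collapses the five stages into two (read/decompress on a second thread, write/compress on the calling thread); the class name, chunk size and queue capacity are illustrative choices. In the real program, in would be the BZip2CompressorInputStream and out the GZIPOutputStream from the question.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicReference;

public class TwoStagePipeline {

    private static final int CHUNK_SIZE = 64 * 1024;  // illustrative
    private static final byte[] EOF = new byte[0];     // end marker, compared by identity

    /** Copies in to out: reading runs on a second thread, writing on the caller's. */
    static void copy(InputStream in, OutputStream out)
            throws IOException, InterruptedException {
        BlockingQueue<byte[]> queue = new ArrayBlockingQueue<>(16); // bounded: caps memory use
        AtomicReference<IOException> readFailure = new AtomicReference<>();

        // Producer stage: read (and, with a decompressing stream, decompress) chunks
        Thread producer = new Thread(() -> {
            byte[] buf = new byte[CHUNK_SIZE];
            try {
                try {
                    int n;
                    while ((n = in.read(buf)) != -1) {
                        queue.put(Arrays.copyOf(buf, n)); // blocks while the queue is full
                    }
                } catch (IOException e) {
                    readFailure.set(e); // rethrown on the caller's thread below
                }
                queue.put(EOF); // tell the consumer there is no more data
            } catch (InterruptedException e) {
                // The consumer stopped early; just exit.
            }
        });
        producer.start();

        // Consumer stage: take chunks and write (and, with a compressing stream, compress) them
        try {
            for (byte[] chunk = queue.take(); chunk != EOF; chunk = queue.take()) {
                out.write(chunk);
            }
        } finally {
            producer.interrupt(); // no effect once it has finished; unblocks put() if we failed
            producer.join();
        }
        IOException failure = readFailure.get();
        if (failure != null) {
            throw failure;
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] data = "k1,a,b,v1\nk2,a,b,v2\n".getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        copy(new ByteArrayInputStream(data), sink);
        // prints the two input lines unchanged
        System.out.print(new String(sink.toByteArray(), StandardCharsets.UTF_8));
    }
}
```

The bounded queue is the important design choice: if the writing side falls behind, put() blocks the reader instead of letting chunks pile up in memory.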
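In the best case that nets you a 5x improvement. In practice the slowest stage sets the pace for the whole pipeline, so the real gain depends on how evenly the work divides between the stages.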
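The 5x figure assumes the five stages cost about the same. To see why that matters: once a pipeline is full it completes one chunk per slowest-stage time, whereas running the stages back to back costs the sum of all stage times per chunk, so the best-case speedup is the total stage cost divided by the largest single stage cost. A small sketch; the stage costs passed in main are made-up numbers, not measurements of this program.

```java
import java.util.Arrays;

public class PipelineSpeedup {

    /** Best-case pipeline speedup over sequential execution: sum(costs) / max(costs). */
    static double bestCaseSpeedup(double... stageCosts) {
        double total = Arrays.stream(stageCosts).sum();
        double slowest = Arrays.stream(stageCosts).max().getAsDouble();
        return total / slowest;
    }

    public static void main(String[] args) {
        // Five equally expensive stages: the ideal case
        System.out.println(bestCaseSpeedup(1, 1, 1, 1, 1)); // 5.0
        // Hypothetical costs: one stage three times as expensive as the rest
        System.out.println(bestCaseSpeedup(3, 1, 1, 1, 1)); // about 2.33
    }
}
```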

That's pretty complicated, though.

What would be much simpler is to parallelize along a different axis. The comment in your code indicates that you have multiple files to process, so you can process several of them at once, one file per thread.

Consider the following structure:

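import java.io.*;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.*;
import java.util.zip.GZIPOutputStream;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;

// Processes a single file; the loop below calls it once per file.
private static boolean processFile(final File fileIn) throws IOException {
    // Output name is an assumed convention: data.csv.bz2 -> data.csv.gz
    final File fileOut = new File(fileIn.getParentFile(),
            fileIn.getName().replaceFirst("\\.bz2$", "") + ".gz");

    try (BufferedReader br = new BufferedReader(new InputStreamReader(
            new BZip2CompressorInputStream(new FileInputStream(fileIn))));
         BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(
            new GZIPOutputStream(new FileOutputStream(fileOut))))) {

        String line;
        while ((line = br.readLine()) != null) {
            String[] parts = line.split(",");
            // Also skip lines with no fourth field, to avoid the exception noted above
            if (parts.length < 4 || isLineToSkip(parts[0])) {
                continue;
            }
            String outLine = parts[0] + "," + parts[3];

            writer.append(outLine);
            writer.newLine();
        }
    }
    return true;
}

.......

ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
List<Future<Boolean>> queued = new LinkedList<>();
for (final File toprocess : ....... ) {
    queued.add(service.submit(new Callable<Boolean>() {
        @Override
        public Boolean call() throws IOException {
            return processFile(toprocess);
        }
    }));
}
service.shutdown(); // accept no new tasks; threads exit once the queued files are done

// get() waits for each file and rethrows a failure wrapped in an ExecutionException
for (Future<Boolean> future : queued) {
    future.get();
}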
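On Java 8 or later, the same one-task-per-file structure can be written more compactly with lambdas and ExecutorService.invokeAll, which submits every task and blocks until all of them have finished. A self-contained sketch; processAll is a hypothetical helper, and the tasks in main are stand-ins for () -> processFile(file).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ProcessAll {

    /** Runs every task on a pool sized to the CPU count; returns how many returned true. */
    static int processAll(List<Callable<Boolean>> tasks)
            throws InterruptedException, ExecutionException {
        ExecutorService service =
                Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        try {
            int succeeded = 0;
            // invokeAll blocks until every task has completed (normally or not)
            for (Future<Boolean> f : service.invokeAll(tasks)) {
                if (f.get()) { // rethrows a task's exception as ExecutionException
                    succeeded++;
                }
            }
            return succeeded;
        } finally {
            service.shutdown(); // let the worker threads exit
        }
    }

    public static void main(String[] args) throws Exception {
        List<Callable<Boolean>> tasks = new ArrayList<>();
        for (int i = 0; i < 8; i++) {
            tasks.add(() -> true); // stand-in for () -> processFile(file)
        }
        System.out.println(processAll(tasks)); // 8
    }
}
```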


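With the above code, each thread processes one file at a time, and the pool has one thread per available processor. As long as there are at least as many files as CPUs, and the work is CPU-bound rather than waiting on the disk, every core stays busy, so you are using your system fully.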


Context

StackExchange Code Review Q#51586, answer score: 4
