
Multithreading readied Median Filtering

Submitted by: @import:stackexchange-codereview

Problem

So I've been confronted with a rather simple problem in the Java chat on Stack Overflow. It's about doing a median filter with a sliding reference window.

For the purpose of the task we can assume the window size used for filtering is always an odd integer between 3 and 21. The sliding window does not wrap around the array borders; accordingly, array elements that cannot be processed with a full window are simply copied over to the target array.

For an array

$$[N_0, N_1, N_2, N_3, \dots, N_{n-1}]$$

the window moves over the whole array once and writes the result of the median calculation to the current index of a result array.

Small reminder: the median of a set is calculated by ordering the set and choosing the middle element. For sets with an even number of elements, the median is the arithmetic mean of the two middle elements.

$$ [1, 5, 2, 7, 3] \mapsto [1, 2, 3, 5, 7] \mapsto 3$$
$$[1, 9, 7, 2, 7, 4] \mapsto [1, 2, 4, 7, 7, 9] \mapsto \frac{4+7}{2} = 5.5$$
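
As a rough illustration of that definition, here is a minimal sketch for plain `int` arrays. The class and method names (`MedianDemo`, `median`) are made up for this example and are not part of the code under review.

```java
import java.util.Arrays;

final class MedianDemo {

    // Median of a non-empty array: sort a copy, then take the middle element
    // (odd length) or the mean of the two middle elements (even length).
    static double median(int[] values) {
        if (values.length == 0) {
            throw new IllegalArgumentException("median of an empty array is undefined");
        }
        int[] sorted = values.clone();   // leave the caller's array untouched
        Arrays.sort(sorted);
        int mid = sorted.length / 2;
        if (sorted.length % 2 == 1) {
            return sorted[mid];
        }
        // Widen to double before adding so large ints cannot overflow.
        return ((double) sorted[mid - 1] + sorted[mid]) / 2.0;
    }

    public static void main(String[] args) {
        System.out.println(median(new int[] {1, 5, 2, 7, 3}));    // prints 3.0
        System.out.println(median(new int[] {1, 9, 7, 2, 7, 4})); // prints 5.5
    }
}
```

Since the filter windows in this task always have an odd size, only the odd branch matters for the filter itself.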

To make this a little clearer, here's how it works for a sample array with a window size of 3. In the beginning, the target array contains no results. Since we cannot run the median filter in places where the window doesn't fit, we copy over the outermost \$\frac{w - 1}{2}\$ elements on each side. For window size 3 that's 1 element.

$$A_{source} = [1, 5, 2, 7, 6, 3] $$
$$ A_{target} = [1, 0, 0, 0, 0, 3]$$

Now we can run the first median filter. Placing the window at the first position possible, we can examine the indices 0, 1 and 2. These elements are responsible for the target value at index 1.

$$A_{window} = [1, 5, 2] \mapsto A_{sorted} = [1, 2, 5]$$

Accordingly, the median value for index 1 is \$2\$. Writing that result to \$A_{target}\$ gets us:

$$A_{target} = [1, 2, 0, 0, 0, 3]$$

Now the reference window moves by one index. We examine indices 1, 2 and 3:

$$A_{window} = [5, 2, 7] \mapsto A_{sorted} = [2, 5, 7]$$

Take result 5, write it into the target array, rinse and repeat until we finally arrive at th
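
The walkthrough above can be reproduced with a small single-threaded sketch. It assumes `int` data and an odd window size; `SlidingMedianDemo` and `filter` are hypothetical names, not the asker's code.

```java
import java.util.Arrays;

final class SlidingMedianDemo {

    // Median filter for an odd window size. Border elements that cannot be
    // covered by a full window are copied over unchanged.
    static int[] filter(int[] source, int window) {
        if (window < 1 || window % 2 == 0) {
            throw new IllegalArgumentException("window must be a positive odd number");
        }
        int half = (window - 1) / 2;
        int[] target = source.clone();   // starts as a copy, so borders are already in place
        for (int i = half; i < source.length - half; i++) {
            int[] w = Arrays.copyOfRange(source, i - half, i + half + 1);
            Arrays.sort(w);
            target[i] = w[half];         // middle element of the sorted window
        }
        return target;
    }

    public static void main(String[] args) {
        int[] source = {1, 5, 2, 7, 6, 3};
        System.out.println(Arrays.toString(filter(source, 3))); // prints [1, 2, 5, 6, 6, 3]
    }
}
```

The windows read from `source` and never from `target`, so already filtered values do not feed into later windows.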

Solution

You claim that your "code is a generic approach at providing code that can be trivially multithreaded to run these calculations on a Comparable[]". I see a number of problems with this statement. Most obviously, you are not using generics at all, even where they are needed: Comparable is a generic interface, and using it without a type argument produces compile warnings. The second issue is your claim that it is "trivially multithreaded". That is an oversimplification. Your code allows the filter to be run in the background, but the filter itself is still single-threaded.

Using the CountDownLatch as a barrier for the background thread is convenient for a one-shot implementation, but your model for running the code (creating a single-use instance and fetching the results asynchronously) is limiting. It should use the native Java paradigm of a Future rather than a custom "API".

In other words, your class should instead have the following sort of API:

public static <T extends Comparable<T>> Future<T[]> doFilter(T[] data, int window) {
    ......
}


That way, people can get a Future for the result, and call it something like:

Integer[] data = ......

Future<Integer[]> medianFut = doFilter(data, 5);

//.... do other stuff ....

Integer[] medians = medianFut.get();


That would be a much better asynchronous (note, not multithreaded) usage model.

For a true multi-threaded implementation I would recommend a Java 8 streaming approach, with a parallel stream. That would be an exercise for the reader ... ;-)

Other things to note about the above API. It has the following attributes:

  • it is static - your code requires creating a class instance for each call, and the methods are instance methods.
  • it is generic - it can take any Comparable type as input, whereas your code requires that the data is manually transformed into a Comparable array.
  • it is generically type-safe - it would have no compile warnings on the usage side.

Another issue I have with your code is the exception that is thrown when the window size is larger than the data. That should not be an exception, since there is an easy and logical solution: just return the input data as the result.

On the other hand, you guard against an even windowSize with an assert, and that should be an IllegalArgumentException instead. If it was me, though, I would rename the parameter to "shoulderSize" instead, which would make an even window impossible to specify (a window of 1 would be a shoulder of 0, a shoulder of 5 would be a window of 11, etc.).
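
To make the window/shoulder relationship concrete, here is a small sketch of the two checks discussed above. `ShoulderArgs`, `shoulderOf` and `windowTooLarge` are hypothetical helpers invented for this illustration; with a shoulder-based API the first one would only be needed when adapting callers that still think in window sizes.

```java
final class ShoulderArgs {

    // Converts a window size to a shoulder size. Invalid windows are rejected
    // with an IllegalArgumentException rather than an assert, because asserts
    // are disabled unless the JVM is started with -ea.
    static int shoulderOf(int windowSize) {
        if (windowSize < 1 || windowSize % 2 == 0) {
            throw new IllegalArgumentException(
                    "window size must be a positive odd number: " + windowSize);
        }
        return (windowSize - 1) / 2;
    }

    // True when no full window fits into the data. In that case the filter
    // can simply return a copy of the input instead of throwing.
    static boolean windowTooLarge(int dataLength, int shoulder) {
        return 2 * shoulder + 1 > dataLength;
    }

    public static void main(String[] args) {
        System.out.println(shoulderOf(1));         // prints 0
        System.out.println(shoulderOf(11));        // prints 5
        System.out.println(windowTooLarge(6, 5));  // prints true
    }
}
```

A loop of the form `for (int i = shoulder; i < data.length - shoulder; i++)` already handles the too-large case without an explicit check, because it performs no iterations when the window does not fit.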

One final note: it is often easier to solve a problem synchronously first, and then wrap it in a neat asynchronous pattern afterwards. This also helps a lot with testing.

So, putting this all together, I would instead have your code similar to:

import java.util.Arrays;
import java.util.Random;

public class MedianFilter {

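    public static <T extends Comparable<T>> T[] getMedians(T[] data, int shoulder) {
        T[] result = Arrays.copyOf(data, data.length);

        for (int i = shoulder; i < data.length - shoulder; i++) {
            result[i] = medianPoint(data, i, shoulder);
        }

        return result;
    }

    private static <T extends Comparable<T>> T medianPoint(T[] data, int index, int shoulder) {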
        T[] window = Arrays.copyOfRange(data, index - shoulder, index + shoulder + 1);
        Arrays.sort(window);
        return window[shoulder];
    }

    public static void main(String[] args)  {
        Random rand = new Random(100);
        Integer[] data = rand.ints().limit(100).boxed().toArray(s -> new Integer[s]);
        System.out.println(Arrays.toString(data));
        System.out.println(Arrays.toString(getMedians(data, 4)));
    }
}


Now, that code can be made asynchronous relatively easily by using the native Java tools. I will use a Java 8 lambda expression to create a daemon thread for the background execution, and another one to implement the Callable functional interface.

public static <T extends Comparable<T>> Future<T[]> asyncMedians(T[] data, int shoulder) {
    ExecutorService service = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r);
        t.setDaemon(true);
        return t;
    });

    Future<T[]> fut = service.submit(() -> getMedians(data, shoulder));
    service.shutdown();
    return fut;
}


Now, you can call that asynchronously with:

Future<Integer[]> fut = asyncMedians(data, 4);
System.out.println(Arrays.toString(fut.get()));
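
One thing the call above glosses over is that `Future.get()` throws the checked exceptions `InterruptedException` and `ExecutionException`. A minimal sketch of one way to deal with that follows; `FutureGetDemo` and `getOrFallback` are hypothetical names, returning a fallback value is just one possible policy, and the submitted task is only a stand-in for the median filter.

```java
import java.util.Arrays;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class FutureGetDemo {

    // Waits up to timeoutMillis for the result and returns the fallback
    // if the wait is interrupted, times out, or the task itself failed.
    static <T> T getOrFallback(Future<T> future, long timeoutMillis, T fallback) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            return fallback;
        } catch (ExecutionException | TimeoutException e) {
            return fallback;
        }
    }

    public static void main(String[] args) {
        ExecutorService service = Executors.newSingleThreadExecutor();
        Integer[] data = {1, 5, 2, 7, 6, 3};
        // Stand-in task: sorts a copy. A real caller would submit the filter here.
        Future<Integer[]> fut = service.submit(() -> {
            Integer[] copy = data.clone();
            Arrays.sort(copy);
            return copy;
        });
        service.shutdown(); // already submitted tasks still run to completion
        System.out.println(Arrays.toString(getOrFallback(fut, 1000, data)));
    }
}
```

Whether a silent fallback is appropriate depends on the caller; rethrowing the cause of the `ExecutionException` is often the better choice when a failed filter run should not go unnoticed.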


Finally, if you want true multithreaded execution in your filter, you can use Java streams instead of the for-loop I have. Consider this replacement function:

public static <T extends Comparable<T>> T[] getMedians(T[] data, int shoulder) {
    T[] result = Arrays.copyOf(data, data.length);
    IntStream.range(shoulder, data.length - shoulder)
        .parallel()
        .forEach(i -> result[i] = medianPoint(data, i, shoulder));
    return result;
}


I have put this all together in ideone
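
If you want some reassurance that the parallel version behaves like the sequential one, a quick comparison along the following lines may help. This is only a sketch: `ParallelCheck` and the `parallel` flag are invented for the comparison, and the helper repeats the `medianPoint` logic so that the example is self-contained.

```java
import java.util.Arrays;
import java.util.Random;
import java.util.stream.IntStream;

final class ParallelCheck {

    static <T extends Comparable<T>> T medianPoint(T[] data, int index, int shoulder) {
        T[] window = Arrays.copyOfRange(data, index - shoulder, index + shoulder + 1);
        Arrays.sort(window);
        return window[shoulder];
    }

    // Same filter, run either sequentially or in parallel. Each index of
    // result is written by exactly one task and data is only read, so the
    // parallel writes do not interfere with each other.
    static <T extends Comparable<T>> T[] medians(T[] data, int shoulder, boolean parallel) {
        T[] result = Arrays.copyOf(data, data.length);
        IntStream indices = IntStream.range(shoulder, data.length - shoulder);
        if (parallel) {
            indices = indices.parallel();
        }
        indices.forEach(i -> result[i] = medianPoint(data, i, shoulder));
        return result;
    }

    public static void main(String[] args) {
        Random rand = new Random(100);
        Integer[] data = rand.ints(1000).boxed().toArray(Integer[]::new);
        Integer[] sequential = medians(data, 4, false);
        Integer[] parallel = medians(data, 4, true);
        System.out.println(Arrays.equals(sequential, parallel)); // prints true
    }
}
```

Whether the parallel stream is actually faster depends on the data size and window size; for small inputs the scheduling overhead may well outweigh the gain, so it is worth measuring.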

Code Snippets

public static <T extends Comparable<T>> Future<T[]> doFilter(T[] data, int window) {
    ......
}

Integer[] data = ......

Future<Integer[]> medianFut = doFilter(data, 5);

//.... do other stuff ....

Integer[] medians = medianFut.get();

import java.util.Arrays;
import java.util.Random;

public class MedianFilter {

    public static <T extends Comparable<T>> T[] getMedians(T[] data, int shoulder) {
        T[] result = Arrays.copyOf(data, data.length);

        for (int i = shoulder; i < data.length - shoulder; i++) {
            result[i] = medianPoint(data, i, shoulder);
        }

        return result;
    }

    private static <T extends Comparable<T>> T medianPoint(T[] data, int index, int shoulder) {
        T[] window = Arrays.copyOfRange(data, index - shoulder, index + shoulder + 1);
        Arrays.sort(window);
        return window[shoulder];
    }

    public static void main(String[] args)  {
        Random rand = new Random(100);
        Integer[] data = rand.ints().limit(100).boxed().toArray(s -> new Integer[s]);
        System.out.println(Arrays.toString(data));
        System.out.println(Arrays.toString(getMedians(data, 4)));
    }
}

public static <T extends Comparable<T>> Future<T[]> asyncMedians(T[] data, int shoulder) {
    ExecutorService service = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r);
        t.setDaemon(true);
        return t;
    });

    Future<T[]> fut =  service.submit(() -> getMedians(data, shoulder));
    service.shutdown();
    return fut;
}

Future<Integer[]> fut = asyncMedians(data, 4);
System.out.println(Arrays.toString(fut.get()));

Context

StackExchange Code Review Q#101118, answer score: 4
