Coordinating Threads with Reactive Streams
Problem
I just started learning reactive streams with RxJava. After reading a couple of books and a lot of articles out there, I am still having trouble understanding how to coordinate multiple threads.
I would appreciate a critique of the following code, particularly if anyone knows a better way to do it.
I basically want to execute multiple tasks in the background and, as results start to come back, be notified of them immediately. I want the background tasks to run in individual threads, but I want the notification code to run in a single thread.
My idea is that the notification code then does not need to worry about synchronization, and I can achieve that by making sure it only ever runs in one thread.
This was my best attempt at that:
```
public class Question {

    public static void main(String[] args) {
        //A hypothetical list of tasks to run asynchronously
        List<Callable<String>> tasks = Arrays.asList(
            () -> "One: " + Thread.currentThread().getName(),
            () -> "Two: " + Thread.currentThread().getName(),
            () -> "Three: " + Thread.currentThread().getName()
        );

        //A blocking queue to hold the results when ready
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);

        //when a task is done, this observer puts it in the queue.
        //observer code then will run in the currently processing thread
        Observer<String> observer = Observers.create(queue::offer);

        tasks.stream().map(Question::async).forEach(result -> result.subscribe(observer));

        //as tasks get resolved and enter the queue, this other observer processes the results
        //in my current thread, not in any of the task threads.
        consumer(queue, 3).forEach(item -> {
            System.out.println("Received " + item + " at " + Thread.currentThread().getName());
        });
    }

    static <T> Observable<T> async(Callable<T> supplier) {
        return Observable.<T>create(subscriber -> {
            try {
```
Solution
You are jumping through a lot of hoops to get your queue, consumer, and observer strategies aligned. I look at your code, and I don't really follow it all. Part of the reason is that I don't know RxJava at all, but, normally, I can follow these things without much research.
So, your code streams the tasks and, for each one, creates an Observable which is subscribed on the Reactive computation engine. The observer for that Observable is set to add the result to the queue.
Now, for the queue, you create an Observable that takes one item off the queue for each expected result. You then loop over those items, and print each result.
I can understand why you may want to use RxJava for some situations, but that last part makes no sense to me... Let me show you your code:
```
//as tasks get resolved and enter the queue, this other observer processes the results
//in my current thread, not in any of the task threads.
consumer(queue, 3).forEach(item -> {
    System.out.println("Received " + item + " at " + Thread.currentThread().getName());
});
```
where consumer is:
```
static <T> Observable<T> consumer(BlockingQueue<T> queue, int count) {
    return Observable.<T>create(subscriber -> {
        for (int i = 0; i < count; i++) {
            try {
                T text = queue.take();
                subscriber.onNext(text);
            }
            catch (InterruptedException ex) {
                subscriber.onError(ex);
            }
        }
        subscriber.onCompleted();
    }).observeOn(Schedulers.immediate());
}
```
What does that code do? It looks for three strings on a queue, and prints them.
```
for (int i = 0; i < 3; i++) {
    System.out.println("Received " + queue.take() + " at " + Thread.currentThread().getName());
}
```
What's wrong with that (apart from the hard-coded 3)?
Having figured out all the loops you are going through, I struggled to understand the advantage that RxJava is giving you, especially when Java already has the "right tool" in the `java.util.concurrent.*` toolbox.
Let me explain. The `CompletionService` concept says: submit a bunch of jobs to the service, and the service will tell you when they complete. It has a `submit(...)` method to add jobs to the service, and a `take()` method which waits until the next task completes, and returns it.
Creating a Thread Factory that quickly matches the thread names you have in RxJava, I have this code:
```
private static final AtomicInteger threadId = new AtomicInteger(0);

private static Thread threadFactory(Runnable r) {
    Thread t = new Thread(r, "RxComputationThreadPool-" + threadId.incrementAndGet());
    t.setDaemon(true);
    return t;
}

private static ExecutorService pool = Executors.newCachedThreadPool(Reactive::threadFactory);
```
That code creates an ExecutorService whose threads are named like the ones in your code's results.
Now, with that code available, the problem becomes really simple:
```
public static void main(String[] args) throws InterruptedException, ExecutionException {
    List<Callable<String>> tasks = Arrays.asList(
        () -> "One: " + Thread.currentThread().getName(),
        () -> "Two: " + Thread.currentThread().getName(),
        () -> "Three: " + Thread.currentThread().getName()
    );
    int count = tasks.size();
    CompletionService<String> completor = new ExecutorCompletionService<>(pool);
    tasks.stream().forEach(completor::submit);
    for (int i = 0; i < count; i++) {
        System.out.println("Received: " + completor.take().get() + " at " + Thread.currentThread().getName());
    }
}
```
Note that the CompletionService returns a completed `Future`, not the actual String, so you have to `get()` the string from the Future.
I have put this code up on ideone so you can see it in action.
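As a side note on that `get()` call: if a task throws, `Future.get()` reports the failure wrapped in an `ExecutionException`, so the single consumer thread is also where task errors surface. Here is a minimal sketch of handling that. The class name `FutureGetDemo`, the helper `describeNext`, and the deliberately failing task are made up for illustration and are not part of the code above.

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureGetDemo {

    // Hypothetical helper: waits for the next completed task and returns
    // either its value or a short description of why it failed.
    static String describeNext(CompletionService<String> completor) throws InterruptedException {
        Future<String> done = completor.take();
        try {
            return "Received: " + done.get();
        } catch (ExecutionException ex) {
            // The exception thrown inside the task is available as the cause.
            return "Failed: " + ex.getCause().getMessage();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // A single worker thread keeps the completion order predictable for this demo.
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            CompletionService<String> completor = new ExecutorCompletionService<>(pool);
            completor.submit(() -> "One");
            completor.submit(() -> { throw new IllegalStateException("boom"); });

            System.out.println(describeNext(completor)); // prints "Received: One"
            System.out.println(describeNext(completor)); // prints "Failed: boom"
        } finally {
            pool.shutdown();
        }
    }
}
```

Whether to log, retry, or abort on a failed task is a design decision this sketch does not try to make; it only shows where the failure becomes visible.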
Note that the solution I provided has no need for RxJava. That's because, if you want to execute a bunch of things asynchronously and retrieve the results in a single thread, then perhaps the observable pattern is not the right one.
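If you want to see the "next-completed" behaviour of `take()` for yourself, here is a small sketch, assuming tasks that do nothing but sleep for different lengths of time. The names `CompletionOrderDemo`, `delayed`, and `runAll`, as well as the delays, are hypothetical and only there to make the ordering visible.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletionOrderDemo {

    // Hypothetical task: sleeps for the given time, then returns its label.
    static Callable<String> delayed(String label, long millis) {
        return () -> {
            Thread.sleep(millis);
            return label;
        };
    }

    // Submits every task to its own worker thread and collects the results
    // on the calling thread, in the order in which the tasks complete.
    static List<String> runAll(List<Callable<String>> tasks)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(tasks.size());
        try {
            CompletionService<String> completor = new ExecutorCompletionService<>(pool);
            for (Callable<String> task : tasks) {
                completor.submit(task);
            }
            List<String> results = new ArrayList<>();
            for (int i = 0; i < tasks.size(); i++) {
                // take() blocks until some task has finished, whichever it is.
                results.add(completor.take().get());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> results = runAll(Arrays.asList(
            delayed("slow", 600),
            delayed("fast", 50),
            delayed("medium", 300)));
        // The list reflects completion order rather than submission order.
        for (String result : results) {
            System.out.println("Received: " + result + " at " + Thread.currentThread().getName());
        }
    }
}
```

With delays this far apart, the results should come back as fast, medium, slow even though the slow task was submitted first, and every line is printed from the main thread.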
Code Snippets
```
//as tasks get resolved and enter the queue, this other observer processes the results
//in my current thread, not in any of the task threads.
consumer(queue, 3).forEach(item -> {
    System.out.println("Received " + item + " at " + Thread.currentThread().getName());
});
```
```
static <T> Observable<T> consumer(BlockingQueue<T> queue, int count) {
    return Observable.<T>create(subscriber -> {
        for (int i = 0; i < count; i++) {
            try {
                T text = queue.take();
                subscriber.onNext(text);
            }
            catch (InterruptedException ex) {
                subscriber.onError(ex);
            }
        }
        subscriber.onCompleted();
    }).observeOn(Schedulers.immediate());
}
```
```
for (int i = 0; i < 3; i++) {
    System.out.println("Received " + queue.take() + " at " + Thread.currentThread().getName());
}
```
```
private static final AtomicInteger threadId = new AtomicInteger(0);

private static Thread threadFactory(Runnable r) {
    Thread t = new Thread(r, "RxComputationThreadPool-" + threadId.incrementAndGet());
    t.setDaemon(true);
    return t;
}

private static ExecutorService pool = Executors.newCachedThreadPool(Reactive::threadFactory);
```
```
public static void main(String[] args) throws InterruptedException, ExecutionException {
    List<Callable<String>> tasks = Arrays.asList(
        () -> "One: " + Thread.currentThread().getName(),
        () -> "Two: " + Thread.currentThread().getName(),
        () -> "Three: " + Thread.currentThread().getName()
    );
    int count = tasks.size();
    CompletionService<String> completor = new ExecutorCompletionService<>(pool);
    tasks.stream().forEach(completor::submit);
    for (int i = 0; i < count; i++) {
        System.out.println("Received: " + completor.take().get() + " at " + Thread.currentThread().getName());
    }
}
```
Context
StackExchange Code Review Q#107831, answer score: 2