Refactoring this producer consumer class to minimize what is exposed
Problem
I don't like the service class below, specifically, the way it exposes the internals of the producer. While it would be possible to combine all three classes into one producer-consumer, I would rather that they stay separated.
Service:
```
package net.bounceme.dur.client;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.logging.Logger;
import net.bounceme.dur.data.Title;

public class ProducerConsumerService implements Executor {

    private static final Logger log = Logger.getLogger(ProducerConsumerService.class.getName());
    private final BlockingQueue<Title> queue = new ArrayBlockingQueue<>(1);
    private Producer producer = null;
    private Consumer consumer = null;
    private Title title = null;

    public void ProducerConsumerService() {
    }

    public Title produce() throws InterruptedException, IOException, ClassNotFoundException {
        producer.produce();
        title = queue.take();
        consumer.consume(title);
        return title;
    }

    public void startService() throws IOException {
        Properties props = PropertiesReader.getProps();
        int portNumber = Integer.parseInt(props.getProperty("port"));
        String host = props.getProperty("server");
        Socket socket = new Socket(host, portNumber);
        ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
        ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
        producer = new Producer(queue, objectInputStream);
        consumer = new Consumer(queue, objectOutputStream);
        new Thread((Runnable) producer).start();
        new Thread((Runnable) consumer).start();
        log.info("started...");
    }

    @Override
    public void execute(Runna
```
Solution
Yeah, your dependency graph has gotten all tangled up.
In this case, the root problem seems to be that there are a number of different things going on, and you haven't really identified them. So you have your producer/consumer idiom, your queuing, and your threading conflated.
My recommendation is to start from a simple case, and then gradually add more elements.
In producer/consumer (publish/subscribe, event handling, etc.), I find the cleanest approach is to start by working through a single-threaded example to identify the right abstractions, and then to add threading once it's clear that the basic abstractions are right.
In a single thread, the simplest produce/consume logic would look something like this:
```
Title title = producer.produce();
consumer.consume(title);
```
A key idea is that, in this trivial example, it should be clear that the glue code doesn't care where the producer is getting the thing, or what the consumer is doing with it. The producer and consumer are completely unaware of each other as well. This tells us that we have two simple interfaces to consider:
```
interface TitleProducer {
    Title produce();
}

interface TitleConsumer {
    void consume(Title title);
}
```
Where does the Title come from? That's an implementation detail: the code that uses the interface doesn't need to know anything about it. But the code that implements the interface does, and the code that creates the instance of the implementation will have some awareness of it.
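As a minimal sketch of that point, the glue can be written once against the two interfaces and exercised with throwaway lambdas. The `Title` class here is a hypothetical stand-in for `net.bounceme.dur.data.Title`, and `GlueSketch` and `handOff` are illustrative names that do not appear in the original code.

```java
import java.util.ArrayList;
import java.util.List;

class GlueSketch {
    // Glue code: depends only on the two interfaces, never on streams or queues.
    static void handOff(TitleProducer producer, TitleConsumer consumer) {
        consumer.consume(producer.produce());
    }

    public static void main(String[] args) {
        List<Title> received = new ArrayList<>();
        // Each interface has a single method, so a lambda or method reference can implement it.
        handOff(() -> new Title("Dune"), received::add);
        System.out.println(received.get(0).name); // prints "Dune"
    }
}

// Hypothetical stand-in for net.bounceme.dur.data.Title (an assumption, not the real class).
class Title {
    final String name;

    Title(String name) {
        this.name = name;
    }
}

interface TitleProducer {
    Title produce();
}

interface TitleConsumer {
    void consume(Title title);
}
```

Because `handOff` never names a concrete class, swapping in a different source or sink should not require touching it.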
For example, we can put together a really simple implementation.
```
class TrivialTitleProducer implements TitleProducer {
    private final Title title;

    TrivialTitleProducer(Title title) {
        this.title = title;
    }

    public Title produce() {
        return title;
    }
}

class TrivialTitleConsumer implements TitleConsumer {
    public void consume(Title title) {
        System.out.println(title.toString());
    }
}
```
Granted, those aren't particularly exciting implementations, but they are enough to see that the hand-off is working:
```
TitleProducer producer = new TrivialTitleProducer(new Title());
TitleConsumer consumer = new TrivialTitleConsumer();
Title title = producer.produce();
consumer.consume(title);
```
Now, in your case, you had specific streams in mind to produce and consume the objects. The interfaces we've discovered so far don't allow you to specify the streams to be used, and that's deliberate. Because the streams are part of the implementation only, we don't want them in the signatures of the methods, but in the signatures of the constructors. This is usually described as dependency injection (a useful idea to google).
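```
class StreamTitleProducer implements TitleProducer {
    private final ObjectInputStream in;

    StreamTitleProducer(ObjectInputStream in) {
        this.in = in;
    }

    public Title produce() {
        try {
            return (Title) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            // produce() declares no checked exceptions, so rethrow unchecked
            throw new IllegalStateException(e);
        }
    }
}

class StreamTitleConsumer implements TitleConsumer {
    private final ObjectOutputStream out;

    StreamTitleConsumer(ObjectOutputStream out) {
        this.out = out;
    }

    public void consume(Title title) {
        try {
            out.writeObject(title);
        } catch (IOException e) {
            // consume() declares no checked exceptions, so rethrow unchecked
            throw new UncheckedIOException(e);
        }
    }
}
```
You can easily see that our earlier test is readily adapted to the useful implementation: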
```
ObjectInputStream in = ...
ObjectOutputStream out = ...
TitleProducer producer = new StreamTitleProducer(in);
TitleConsumer consumer = new StreamTitleConsumer(out);
Title title = producer.produce();
consumer.consume(title);
```
Now, we want to produce and consume on separate threads. This is actually two different problems to solve:
- How do we exchange data between a producer on one thread and a consumer on another thread?
- How do we execute that code on separate threads?
Important trick -- implement the thread-safe data exchange in a single thread. Let's walk through it slowly.
BlockingQueue is a reasonable way to communicate between threads, so let's start there. What would a trivial implementation look like?
```
BlockingQueue<Title> queue = ...
Title title = producer.produce();
queue.put(title);
Title queuedTitle = queue.take();
consumer.consume(queuedTitle);
```
Yeah, it's a complicated way to do a single-thread hand-off. But it affords a key insight:
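queue.put() looks just like consume(), and queue.take() looks just like produce(). So let's codify that.
```
class QueueTitleProducer implements TitleProducer {
    private final BlockingQueue<Title> queue;

    QueueTitleProducer(BlockingQueue<Title> queue) {
        this.queue = queue;
    }

    public Title produce() {
        try {
            return this.queue.take();
        } catch (InterruptedException e) {
            // produce() declares no checked exceptions: restore the flag, rethrow unchecked
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}

class QueueTitleConsumer implements TitleConsumer {
    private final BlockingQueue<Title> queue;

    QueueTitleConsumer(BlockingQueue<Title> queue) {
        this.queue = queue;
    }

    public void consume(Title title) {
        try {
            this.queue.put(title);
        } catch (InterruptedException e) {
            // consume() declares no checked exceptions: restore the flag, rethrow unchecked
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}

BlockingQueue<Title> queue = ...
TitleConsumer queueConsumer = new QueueTitleConsumer(queue);
TitleProducer queueProducer = new QueueTitleProducer(que
```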
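The queue-backed pair can then sit between the real producer and the real consumer, so that each side runs on its own thread while still seeing only `TitleProducer` and `TitleConsumer`. What follows is a hedged sketch of that wiring, not the original continuation of this answer. `Title` is a hypothetical stand-in for `net.bounceme.dur.data.Title`; `ThreadedHandOffSketch`, `run`, the fixed item count, and the choice of an `ExecutorService` are all assumptions made for illustration.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ThreadedHandOffSketch {
    // Pushes `count` titles through a queue, one side per thread, and returns the names that arrived.
    static List<String> run(int count) {
        BlockingQueue<Title> queue = new ArrayBlockingQueue<>(1);
        List<String> received = new CopyOnWriteArrayList<>();
        AtomicInteger next = new AtomicInteger();

        // Upstream side: a stand-in source whose output is pushed into the queue.
        TitleProducer source = () -> new Title("title-" + next.getAndIncrement());
        TitleConsumer intoQueue = new QueueTitleConsumer(queue);
        // Downstream side: titles are pulled from the queue and given to a stand-in sink.
        TitleProducer fromQueue = new QueueTitleProducer(queue);
        TitleConsumer sink = title -> received.add(title.name);

        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.execute(() -> { for (int i = 0; i < count; i++) intoQueue.consume(source.produce()); });
        pool.execute(() -> { for (int i = 0; i < count; i++) sink.consume(fromQueue.produce()); });
        pool.shutdown();
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                pool.shutdownNow();
                throw new IllegalStateException("hand-off timed out");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run(3)); // prints [title-0, title-1, title-2]
    }
}

// Hypothetical stand-in for net.bounceme.dur.data.Title (an assumption, not the real class).
class Title {
    final String name;
    Title(String name) { this.name = name; }
}

interface TitleProducer { Title produce(); }
interface TitleConsumer { void consume(Title title); }

// produce() is a blocking take() from the queue.
class QueueTitleProducer implements TitleProducer {
    private final BlockingQueue<Title> queue;
    QueueTitleProducer(BlockingQueue<Title> queue) { this.queue = queue; }
    public Title produce() {
        try {
            return queue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}

// consume() is a blocking put() into the queue.
class QueueTitleConsumer implements TitleConsumer {
    private final BlockingQueue<Title> queue;
    QueueTitleConsumer(BlockingQueue<Title> queue) { this.queue = queue; }
    public void consume(Title title) {
        try {
            queue.put(title);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

In a real service the stand-in `source` and `sink` would presumably be the stream-backed implementations, and the service class would only need to expose `TitleProducer` and `TitleConsumer` rather than the queue or the threads.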
Context
StackExchange Code Review Q#56814, answer score: 2