HiveBrain v1.2.0
Get Started
← Back to all entries
patternjavaMinor

Java InfiniteStream with Queue

Submitted by: @import:stackexchange-codereview··
0
Viewed 0 times
withinfinitestreamqueuejava

Problem

I did InfiniteStream by implementing Stream and Consumer. You can initiate the stream then consume an object using it. I've added queueing for it to handle storing more than one value from accept. Please provide feedback.

```
public class InfiniteStream implements Consumer, Stream {
private static final int LENGTH = 1000;
private final Stream stream;
private final Queueing q;
private final int length;

public InfiniteStream(int length) {
this.length = length;
this.q = new Queueing(this.length);
this.stream = Stream.generate(q);
}

public InfiniteStream() {
this(LENGTH);
}

@Override
public void accept(T t) {
q.accept(t);
}

@Override
public Iterator iterator() {
return stream.iterator();
}

@Override
public Spliterator spliterator() {
return stream.spliterator();
}

@Override
public boolean isParallel() {
return stream.isParallel();
}

@Override
public Stream sequential() {
return stream.sequential();
}

@Override
public Stream parallel() {
return stream.parallel();
}

@Override
public Stream unordered() {
return stream.unordered();
}

@Override
public Stream onClose(Runnable closeHandler) {
return stream.onClose(closeHandler);
}

@Override
public void close() {
stream.close();
}

@Override
public Stream filter(Predicate predicate) {
return stream.filter(predicate);
}

@Override
public Stream map(Function mapper) {
return stream.map(mapper);
}

@Override
public IntStream mapToInt(ToIntFunction mapper) {
return stream.mapToInt(mapper);
}

@Override
public LongStream mapToLong(ToLongFunction mapper) {
return stream.mapToLong(mapper);
}

@Override
public DoubleStream mapToDouble(ToDoubleFunction mapper) {
return stream.mapToDou

Solution

What you have implemented here is a "wrapper" class for a stream. In almost all instances you just pass through the behaviour you want to the wrapped stream instance.

A better solution would be to use the existing Stream code supplied in StreamSupport. This allows you to use code that is well tested, and part of the library. When I played with your code, I ended up changing the API a little, and it helps a lot.

The best way I can think to review this, is to show what I have done, and why it is "better".

import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/**
 * Implement a buffered consumer that feeds consumed items in to a stream
 * 
 * @author rolf
 *
 * @param 
 *            the generic type of the data being streamed.
 */
public class ConsumerToStream implements Consumer {

    private static final class QSpliterator implements Spliterator {

        private final BlockingQueue queue;

        public QSpliterator(BlockingQueue queue) {
            this.queue = queue;
        }

        @Override
        public boolean tryAdvance(Consumer action) {
            try {
                action.accept(queue.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Take interrupted.", e);
            }
            return true;
        }

        @Override
        public Spliterator trySplit() {
            try {
                final int size = queue.size();
                List vals = new ArrayList<>(size + 1);
                vals.add(queue.take());
                queue.drainTo(vals);
                return vals.spliterator();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Thread interrupted during trySplit.", e);
            }
        }

        @Override
        public long estimateSize() {
            return Long.MAX_VALUE;
        }

        @Override
        public int characteristics() {
            return Spliterator.CONCURRENT;
        }

    }

    private final Stream outstream;
    private final BlockingQueue blockingQueue;

    private final Spliterator splitter;

    /**
     * Construct an instance of the consumer buffer with the supplied maximum
     * capacity
     * 
     * @param bufferSize
     *            the amount of space to set aside for buffered items.
     */
    public ConsumerToStream(int bufferSize) {
        this.blockingQueue = new LinkedBlockingQueue<>(bufferSize);
        this.splitter = new QSpliterator<>(blockingQueue);
        this.outstream = StreamSupport.stream(splitter, false);
    }

    /**
     * Get the stream this buffer outputs to.
     * 
     * @return the output stream.
     */
    public Stream stream() {
        return outstream;
    }

    @Override
    public void accept(V t) {
        try {
            blockingQueue.put(t);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted accepting a new value.", e);
        }
    }

}


Let's run through the parts there:

  • there's the main class ConsumerToStream which is a Consumer, and has an accept method which it overrides from the interface.



  • there's the QSpliterator class which is a private class - it is what is used to feed the output Stream - by being "wrapped" by StreamSupport.



  • there's the queue, which is used to buffer and thread-manage the inputs to the stream.



Note that the QSpliterator supports trySplit() by feeding off a possibly sized-1 partition, leaving nothing in the main split.

Of significance in this implementation, is that all the methods are useful - there's no blind "wrapping" code... it's "clean".

The use case would look like:

public static void main(String[] args) {
    ConsumerToStream cts = new ConsumerToStream<>(10);
    CompletableFuture.runAsync(() -> {
        while (true) {
            try {
                Thread.sleep(1000);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
            IntStream.range(0, 100).boxed().forEach(cts);
        }
    });
    cts.stream().parallel().forEach(System.out::println);
}


Oh, and by-the-way - your handling of interrupted exceptions was a pleasure to see. Well done.

Code Snippets

import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/**
 * Implement a buffered consumer that feeds consumed items in to a stream
 * 
 * @author rolf
 *
 * @param <V>
 *            the generic type of the data being streamed.
 */
public class ConsumerToStream<V> implements Consumer<V> {

    private static final class QSpliterator<T> implements Spliterator<T> {

        private final BlockingQueue<T> queue;

        public QSpliterator(BlockingQueue<T> queue) {
            this.queue = queue;
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            try {
                action.accept(queue.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Take interrupted.", e);
            }
            return true;
        }

        @Override
        public Spliterator<T> trySplit() {
            try {
                final int size = queue.size();
                List<T> vals = new ArrayList<>(size + 1);
                vals.add(queue.take());
                queue.drainTo(vals);
                return vals.spliterator();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Thread interrupted during trySplit.", e);
            }
        }


        @Override
        public long estimateSize() {
            return Long.MAX_VALUE;
        }

        @Override
        public int characteristics() {
            return Spliterator.CONCURRENT;
        }

    }

    private final Stream<V> outstream;
    private final BlockingQueue<V> blockingQueue;

    private final Spliterator<V> splitter;

    /**
     * Construct an instance of the consumer buffer with the supplied maximum
     * capacity
     * 
     * @param bufferSize
     *            the amount of space to set aside for buffered items.
     */
    public ConsumerToStream(int bufferSize) {
        this.blockingQueue = new LinkedBlockingQueue<>(bufferSize);
        this.splitter = new QSpliterator<>(blockingQueue);
        this.outstream = StreamSupport.stream(splitter, false);
    }

    /**
     * Get the stream this buffer outputs to.
     * 
     * @return the output stream.
     */
    public Stream<V> stream() {
        return outstream;
    }

    @Override
    public void accept(V t) {
        try {
            blockingQueue.put(t);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted accepting a new value.", e);
        }
    }

}
public static void main(String[] args) {
    ConsumerToStream<Integer> cts = new ConsumerToStream<>(10);
    CompletableFuture.runAsync(() -> {
        while (true) {
            try {
                Thread.sleep(1000);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
            IntStream.range(0, 100).boxed().forEach(cts);
        }
    });
    cts.stream().parallel().forEach(System.out::println);
}

Context

StackExchange Code Review Q#105287, answer score: 6

Revisions (0)

No revisions yet.