A pipeline chain – Process async values like Java 8 Streams

What is pipeline?

Pipeline is an implementation of a chain of nodes for processing values asynchronously, that can be configured in an elegant way by fluent interface.
Similar to java 8 stream api, but with stream api you must have your values in advance (in a collection). So if you don’t have them (eg.: you receive values from user input), then you need something else. This is what pipeline chain is, a structure of chained processor nodes and a given output. Then putting values to the pipeline, that will go through it, and triggers the given output (or not if a node filter value out). You can filter, map, peek values like with Streams, but can’t aggregate because the value count is infinite, not like Streams.

pipe1

A simple example

Suppose that you want to trim, lowercase, and skip repeating values from an input:

pipe2

You can define this process in an elegant way (like Java 8 Stream API):

Consumer<String> pipeline = Pipeline.ofInputType(String.class)
        .map(s -> s.trim())
        .map(s -> s.toLowerCase())
        .filter(new DuplicateSkipper<>())
        .to(s -> System.out.println(s))
        ;

then you put inputs to the pipeline (that you get from user inputs or whatever) :

pipeline.accept(" a");
pipeline.accept(" ab");
pipeline.accept(" aB");
pipeline.accept(" aBc");

resulting the processed output:

a
ab
abc

A more advanced example

This example is the real rendering mechanism of log4jtester.com. The input is the textbox value changes, the output is directed to the rendered content html div element.

pipe3

The pipeline is defined simply:

pipeline = Pipeline.ofInputType(String.class)
    .filter(new RepeatSkipper<>())
    .through(AsyncChain.ofTypes(String.class, Result.class)
                .through(floodFilter)
                .through(new EmptyInputResolver())
                .to(new ServiceCallFunction())
            )
    .map(new HtmlFormatter())
    .to(output)
    ;

(pipeline sources are prepared to use with GWT.)

Download source

You can download sources from my GITHUB functions project (by using kinolien’s GITZIP download github folder tool).
Examples are here.

Written by Dániel Hári

Dániel Hári is the founder of log4jtester.com, cleancodejava.com. Who is an enthusiastic Java developer who enhances his knowledge by continously searching for best practices and modern technologies. Engaged to clean coding, and honors the engineering profession as releasing quality work.