Java 8 Advanced Streams

Hello there!

In this post I will give you an in-depth overview of Java 8 Streams. As an introduction, Java 8 Streams are monad-like abstractions that bring functional programming to Java, allowing us to build data pipelines that do not mutate their source, for example filtering, mapping, reducing and collecting. The Java 8 Stream API is meant for processing collections; it is not about reactive stream processing like RxJava, Reactor or Akka Streams (I will write some posts about those later :)), but it is still powerful when we need to process large collections and perform other common routines.

To start, let’s see a simple Stream sequence:

Arrays.asList("come on!", "I", "love", "lisp", "because", "is", "cool")
        .stream()
        .filter(s -> s.startsWith("c"))
        .map(String::toUpperCase)
        .sorted()
        .forEach(System.out::println);

The above code prints the Strings “COME ON!” and “COOL”, in that order. The filter(s -> s.startsWith("c")) keeps only the Strings that start with c, map(String::toUpperCase) converts the two remaining Strings to upper case, sorted() puts them in natural order, and forEach prints each processed String.
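As a minimal sketch, the same chain can be collected into a List instead of printed, which makes the result easy to inspect. The class name FirstPipeline is only illustrative and not part of the original example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class FirstPipeline {
    // Same filter/map/sorted chain as above, but collected instead of printed.
    static List<String> run() {
        return Arrays.asList("come on!", "I", "love", "lisp", "because", "is", "cool")
                .stream()
                .filter(s -> s.startsWith("c"))
                .map(String::toUpperCase)
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(run()); // [COME ON!, COOL]
    }
}
```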

This example shows two important kinds of stream operations: intermediate and terminal. Intermediate operations always return a Stream, so we can keep chaining more intermediate operations onto the same pipeline. In the above example, filter, map and sorted are all intermediate operations, whereas forEach is a terminal operation, because it finishes the pipeline processing. This distinction is not specific to the Java 8 Stream API; it is a concept shared by stream processing libraries in general. For example, let’s see an example in Scala with Akka Streams just to clarify this concept:

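import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source

implicit val system = ActorSystem("MySystemActor")
implicit val materializer = ActorMaterializer()
import system.dispatcher // ExecutionContext required by onComplete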

val text =
"""|Lorem Ipsum is simply dummy text of the printing and typesetting industry.
|Lorem Ipsum has been the industry's standard dummy text ever since the 1500s,
|when an unknown printer took a galley of type and scrambled it to make a type
|specimen book.""".stripMargin

Source.fromIterator(() => text.split("\\s").iterator).map(_.toUpperCase).
runForeach(println).
onComplete(_ => system.terminate())

Don’t worry about the ActorSystem and ActorMaterializer lines: they just create an actor system reference in order to instantiate actors (I will talk about Akka Actors later) and an ActorMaterializer that makes it possible to process a list of transformations asynchronously.

As you can see above, we iterate over the words of the constant text and transform them to upper case with an intermediate operation. The stream is then run by runForeach, which plays the role of the terminal operation and returns a Future; the onComplete callback fires asynchronously once the stream has finished and shuts down the actor system.

Now that we’ve learned how to create and work with streams, let’s dive deeper into how stream operations are processed.

An important characteristic of intermediate operations is laziness. For example:

Stream.of("h", "e", "l", "l", "o", "s", "t", "r", "e", "a", "m", "s", "!")
        .filter(s -> {
            System.out.println("Cute filter: " + s);
            return true;
        });

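When the above code is executed, nothing is printed to the console. That’s because intermediate operations are only executed when a terminal operation is present. Appending a terminal forEach that prints “foreach: ” followed by each element to the same pipeline produces this output: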

Cute filter: h
foreach: h
Cute filter: e
foreach: e
Cute filter: l
foreach: l
Cute filter: l
foreach: l
Cute filter: o
foreach: o
Cute filter: s
foreach: s
Cute filter: t
foreach: t
Cute filter: r
foreach: r
Cute filter: e
foreach: e
Cute filter: a
foreach: a
Cute filter: m
foreach: m
Cute filter: s
foreach: s
Cute filter: !
foreach: !

The order of the result might be surprising. A naive approach would be to execute the operations horizontally, one after another, on all elements of the stream. Instead, each element moves along the chain vertically. The first string “h” passes filter and then forEach; only then is the second string “e” processed.
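Here is a small sketch of both behaviors, assuming a shortened input of three letters. Instead of printing, each step appends to a list so the order of calls can be inspected afterwards; the class and method names are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

class LazyPipeline {
    // Builds a pipeline with only an intermediate operation; nothing is executed.
    static List<String> withoutTerminal() {
        List<String> log = new ArrayList<>();
        Stream<String> pending = Stream.of("h", "e", "l")
                .filter(s -> {
                    log.add("Cute filter: " + s);
                    return true;
                });
        return log; // empty: the filter lambda never ran
    }

    // Same pipeline with a terminal forEach; each element travels the whole chain in turn.
    static List<String> withTerminal() {
        List<String> log = new ArrayList<>();
        Stream.of("h", "e", "l")
                .filter(s -> {
                    log.add("Cute filter: " + s);
                    return true;
                })
                .forEach(s -> log.add("foreach: " + s));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(withoutTerminal()); // []
        System.out.println(withTerminal());
    }
}
```

The second method logs filter and forEach alternately for each letter, which is the vertical processing described above.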

This behavior can reduce the number of operations performed on each element, as you can see in the output of the next example:

map: a1
anyMatch: A1

The operation anyMatch returns true as soon as the predicate applies to the given input element. Here that is already the case for the first element passed, “A1”. Due to the vertical execution of the stream chain, map has to be executed only once in this case. So instead of mapping all elements of the stream, map is called as few times as possible.
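A pipeline that could produce such output might look like the sketch below. The input strings are an assumption (only the output is shown above), and the calls are logged into a list rather than printed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

class ShortCircuit {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        boolean found = Stream.of("a1", "d3", "b1", "c2", "b2") // assumed input
                .map(s -> {
                    log.add("map: " + s);
                    return s.toUpperCase();
                })
                .anyMatch(s -> {
                    log.add("anyMatch: " + s);
                    return s.startsWith("A");
                });
        log.add("result: " + found);
        return log;
    }

    public static void main(String[] args) {
        // The first element already matches, so the remaining four are never mapped.
        System.out.println(run()); // [map: a1, anyMatch: A1, result: true]
    }
}
```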

I think you have understood that the order matters, right? Let’s see one more example where the order makes a difference:

The next example consists of two intermediate operations map and filter, finishing with the terminal operation forEach:

Stream.of("a1", "d3", "b1", "c2", "b2")
        .map(s -> {
            System.out.println("map: " + s);
            return s.toUpperCase();
        })
        .filter(s -> {
            System.out.println("filter: " + s);
            return s.startsWith("A");
        })
        .forEach(s -> System.out.println("forEach: " + s));

map: a1
filter: A1
forEach: A1
map: d3
filter: D3
map: b1
filter: B1
map: c2
filter: C2
map: b2
filter: B2

As you might have guessed, both map and filter are called five times, once for every string in the underlying collection, whereas forEach is called only once.

But don’t worry, we can greatly reduce the actual number of executions if we change the order of the operations:

Stream.of("a1", "d3", "b1", "c2", "b2")
        .filter(s -> {
            System.out.println("filter: " + s);
            return s.startsWith("a");
        })
        .map(s -> {
            System.out.println("map: " + s);
            return s.toUpperCase();
        })
        .forEach(s -> System.out.println("forEach: " + s));

filter: a1
map: a1
forEach: A1
filter: d3
filter: b1
filter: c2
filter: b2

With the above changes, the map operation is called only once, so the operation pipeline performs much faster. Let’s extend the above example with an additional operation, sorted:

Stream.of("a1", "d3", "b1", "c2", "b2")
        .sorted((s1, s2) -> {
            System.out.printf("sort: %s; %s\n", s1, s2);
            return s1.compareTo(s2);
        })
        .filter(s -> {
            System.out.println("filter: " + s);
            return s.startsWith("a");
        })
        .map(s -> {
            System.out.println("map: " + s);
            return s.toUpperCase();
        })
        .forEach(s -> System.out.println("forEach: " + s));

Sorting is a special kind of intermediate operation. It’s a so-called stateful operation, since in order to sort a collection of elements you have to maintain state during ordering.

sort: d3; a1
sort: b1; d3
sort: b1; d3
sort: b1; a1
sort: c2; b1
sort: c2; d3
sort: b2; c2
sort: b2; b1
filter: a1
map: a1
forEach: A1
filter: b1
filter: b2
filter: c2
filter: d3

First, the sort operation is executed on the entire input collection. In other words, sorted is executed horizontally. So in this case the comparator passed to sorted is called eight times, for multiple combinations of the elements in the input collection.

And again, we can optimize the performance by reordering the chain:

Stream.of("a1", "d3", "b1", "c2", "b2")
        .filter(s -> {
            System.out.println("filter: " + s);
            return s.startsWith("a");
        })
        .sorted((s1, s2) -> {
            System.out.printf("sort: %s; %s\n", s1, s2);
            return s1.compareTo(s2);
        })
        .map(s -> {
            System.out.println("map: " + s);
            return s.toUpperCase();
        })
        .forEach(s -> System.out.println("forEach: " + s));

filter: a1
filter: d3
filter: b1
filter: c2
filter: b2
map: a1
forEach: A1

In the above output, the sort comparator is never called, because filter reduces the input collection to just one element. So the performance is greatly increased 🙂.
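To make the effect of reordering measurable, here is a rough sketch that counts lambda invocations instead of printing them. The five input strings and all names are assumptions for illustration; the exact number of comparator calls when sorting first depends on the JDK's sort implementation, so the sketch only relies on it being greater than zero.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

class OperationOrder {
    static Stream<String> input() {
        return Stream.of("a1", "d3", "b1", "c2", "b2"); // assumed input
    }

    // map before filter: map runs for every element.
    static int mapCallsWhenMapFirst() {
        AtomicInteger calls = new AtomicInteger();
        input().map(s -> { calls.incrementAndGet(); return s.toUpperCase(); })
               .filter(s -> s.startsWith("A"))
               .forEach(s -> { });
        return calls.get();
    }

    // filter before map: map runs only for elements that pass the filter.
    static int mapCallsWhenFilterFirst() {
        AtomicInteger calls = new AtomicInteger();
        input().filter(s -> s.startsWith("a"))
               .map(s -> { calls.incrementAndGet(); return s.toUpperCase(); })
               .forEach(s -> { });
        return calls.get();
    }

    // sorted before filter: the comparator sees the whole input.
    static int comparisonsWhenSortFirst() {
        AtomicInteger calls = new AtomicInteger();
        input().sorted((s1, s2) -> { calls.incrementAndGet(); return s1.compareTo(s2); })
               .filter(s -> s.startsWith("a"))
               .forEach(s -> { });
        return calls.get();
    }

    // filter before sorted: only one element is left, so nothing needs comparing.
    static int comparisonsWhenFilterFirst() {
        AtomicInteger calls = new AtomicInteger();
        input().filter(s -> s.startsWith("a"))
               .sorted((s1, s2) -> { calls.incrementAndGet(); return s1.compareTo(s2); })
               .forEach(s -> { });
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(mapCallsWhenMapFirst());      // 5
        System.out.println(mapCallsWhenFilterFirst());   // 1
        System.out.println(comparisonsWhenSortFirst());  // implementation-dependent, > 0
        System.out.println(comparisonsWhenFilterFirst()); // 0
    }
}
```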

OK, you already know about the order of stream operations, so I think we can go to the next level, right? Let’s dive deeper into more complex operations: collect, flatMap and reduce.

Collect

Collect is an extremely useful terminal operation that transforms and collects the elements of the stream into a different kind of result, such as a List, Set or Map. The collect operation receives a Collector, which consists of four operations: a supplier, an accumulator, a combiner and a finisher. Many parameters, right? But Java 8 helps us with various built-in collectors via the Collectors class, so for the most common operations it is not necessary to implement your own collector. Let’s see some examples using the Collectors class.

List<Fruit> filtered =
    fruits
        .stream()
        .filter(f -> f.getName().startsWith("B"))
        .collect(Collectors.toList());

//        Banana
//        Blackberry
//        Blueberry
//        Black Currant

The above code simply filters the fruit objects and collects them into a List. If you need to remove duplicates, use Collectors.toSet() or the distinct operation.
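As a small sketch of the difference, the example below uses plain fruit names (an assumed list with one duplicate) rather than Fruit objects. With your own classes, both toSet() and distinct() rely on equals() and hashCode() being implemented.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

class RemoveDuplicates {
    // Assumed sample data: "Banana" appears twice.
    static final List<String> NAMES =
            Arrays.asList("Banana", "Blueberry", "Banana", "Blackberry");

    // toList keeps every element, duplicates included.
    static List<String> asList() {
        return NAMES.stream().collect(Collectors.toList());
    }

    // toSet drops duplicates but makes no promise about iteration order.
    static Set<String> asSet() {
        return NAMES.stream().collect(Collectors.toSet());
    }

    // distinct drops duplicates and keeps the encounter order of an ordered stream.
    static List<String> distinctList() {
        return NAMES.stream().distinct().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(asList().size());  // 4
        System.out.println(asSet().size());   // 3
        System.out.println(distinctList());   // [Banana, Blueberry, Blackberry]
    }
}
```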

The next example groups fruits by color:

Map<String, List<Fruit>> fruitsByColor = fruits
        .stream()
        .collect(Collectors.groupingBy(f -> f.getColor()));

//       Yellow - [Banana]
//       Black  - [Blackberry, Black Currant]
//       Purple - [Blueberry]

You can also determine the average price of all fruits:

Double averagePrice = fruits
        .stream()
        .collect(Collectors.averagingInt(f -> f.getPrice()));

//       7.5

The next example joins all fruits into a single String:

String phrase = fruits
        .stream()
        .filter(f -> "Black".equals(f.getColor()))  // compare String contents, not references
        .map(f -> f.getName())
        .collect(Collectors.joining(" and ", "In Brazil ", " are unusual fruits."));

//      In Brazil Blackberry and Black Currant are unusual fruits.

The joining operation receives a delimiter as well as an optional prefix and suffix.

To transform the stream elements into a map, it is necessary to specify how both the keys and the values should be mapped. Keep in mind that the mapped keys must be unique, otherwise an IllegalStateException is thrown. You can optionally pass a merge function as an additional parameter to bypass the exception:

Map<String, String> map = fruits
        .stream()
        .collect(Collectors.toMap(
                f -> f.getColor(),
                f -> f.getName(),
                (name1, name2) -> name1 + ";" + name2));

// {Yellow=Banana, Black=Blackberry;Black Currant, Purple=Blueberry}
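The duplicate-key behavior can be checked with a small sketch like the one below. It uses two fruit names that both map to the key "Black"; the names and the class are only illustrative.

```java
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class ToMapDuplicates {
    // Without a merge function, two elements with the same key cause an exception.
    static boolean throwsOnDuplicateKey() {
        try {
            Map<String, String> ignored = Stream.of("Blackberry", "Black Currant")
                    .collect(Collectors.toMap(name -> "Black", name -> name));
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    // With a merge function, values that share a key are combined instead.
    static Map<String, String> merged() {
        return Stream.of("Blackberry", "Black Currant")
                .collect(Collectors.toMap(
                        name -> "Black",
                        name -> name,
                        (name1, name2) -> name1 + ";" + name2));
    }

    public static void main(String[] args) {
        System.out.println(throwsOnDuplicateKey()); // true
        System.out.println(merged());               // {Black=Blackberry;Black Currant}
    }
}
```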

Ok, we already know some of the most powerful built-in collectors, so let’s build our own collector. We want to transform all fruits of the stream into a single string consisting of all names in upper case letters separated by the | pipe character. In order to achieve this we create a new collector via Collector.of(). We have to pass the four ingredients of a collector: a supplier, an accumulator, a combiner and a finisher.

Collector<Fruit, StringJoiner, String> fruitNameCollector =
        Collector.of(
                () -> new StringJoiner(" | "),          // supplier
                (j, f) -> j.add(f.getName().toUpperCase()),  // accumulator
                (j1, j2) -> j1.merge(j2),               // combiner
                StringJoiner::toString);                // finisher

String names = fruits
        .stream()
        .collect(fruitNameCollector);

// BANANA | BLACKBERRY | BLACK CURRANT | BLUEBERRY

Since Strings in Java are immutable, we need a helper class like StringJoiner to let the collector construct our String. The supplier initially constructs such a StringJoiner with the appropriate delimiter. The accumulator is used to add each fruit’s upper-cased name to the StringJoiner. The combiner knows how to merge two StringJoiners into one. In the last step the finisher constructs the desired String from the StringJoiner.
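The sketch below applies the same four ingredients to plain Strings (an assumption, to keep it self-contained) and runs the collector both sequentially and in parallel. In the parallel case the combiner is what stitches the partial StringJoiners together, and because the stream is ordered the result should be the same.

```java
import java.util.StringJoiner;
import java.util.stream.Collector;
import java.util.stream.Stream;

class JoiningCollector {
    static final Collector<String, StringJoiner, String> UPPER_PIPE =
            Collector.of(
                    () -> new StringJoiner(" | "),     // supplier
                    (j, s) -> j.add(s.toUpperCase()),  // accumulator
                    StringJoiner::merge,               // combiner
                    StringJoiner::toString);           // finisher

    static String sequential() {
        return Stream.of("Banana", "Blackberry", "Blueberry").collect(UPPER_PIPE);
    }

    static String parallel() {
        return Stream.of("Banana", "Blackberry", "Blueberry").parallel().collect(UPPER_PIPE);
    }

    public static void main(String[] args) {
        System.out.println(sequential()); // BANANA | BLACKBERRY | BLUEBERRY
        System.out.println(parallel());   // BANANA | BLACKBERRY | BLUEBERRY
    }
}
```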

FlatMap

We already know how to transform objects of a stream into another type of objects with map. The map operation is limited because every object can only be mapped to exactly one other object. But what if we want to transform one object into multiple others or none at all? This is where flatMap comes to the rescue.
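As a quick illustration of "multiple others or none at all", here is a sketch that splits lines into words; an empty line contributes no elements. The input lines and the helper wordsOf are made up for this example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class FlatMapWords {
    // Maps one line to zero or more words.
    static Stream<String> wordsOf(String line) {
        if (line.isEmpty()) {
            return Stream.empty();
        }
        return Arrays.stream(line.split(" "));
    }

    static List<String> words() {
        return Stream.of("java streams", "", "are lazy")
                .flatMap(FlatMapWords::wordsOf)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(words()); // [java, streams, are, lazy]
    }
}
```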

To write some code with flatMap, let’s define a type hierarchy.

class Foo {
    String name;
    List<Bar> bars = new ArrayList<>();

    Foo(String name) {
        this.name = name;
    }
}

class Bar {
    String name;

    Bar(String name) {
        this.name = name;
    }
}

Let’s instantiate a couple of objects:

List<Foo> foos = new ArrayList<>();

//    Yes, there are Stream classes for numeric object types, 
//    see java.util.stream API for more information
IntStream
    .range(1, 4)
    .forEach(i -> foos.add(new Foo("Foo" + i)));

foos.forEach(f ->
    IntStream
        .range(1, 4)
        .forEach(i -> f.bars.add(new Bar("Bar" + i + " <- " + f.name))));

Now we have a list of three foos each consisting of three bars.

FlatMap accepts a function which returns a stream of objects. So in order to resolve the bar objects of each foo, we just pass the appropriate function:

foos.stream()
    .flatMap(f -> f.bars.stream())
    .forEach(b -> System.out.println(b.name));

// Bar1 <- Foo1
// Bar2 <- Foo1
// Bar3 <- Foo1
// Bar1 <- Foo2
// Bar2 <- Foo2
// Bar3 <- Foo2
// Bar1 <- Foo3
// Bar2 <- Foo3
// Bar3 <- Foo3

As you can see, we’ve successfully transformed the stream of three foo objects into a stream of nine bar objects.

In order to simplify, let’s chain the above code example into a single stream pipeline:

IntStream.range(1, 4)
    .mapToObj(i -> new Foo("Foo" + i))
    .peek(f -> IntStream.range(1, 4)
        .mapToObj(i -> new Bar("Bar" + i + " <- " + f.name))
        .forEach(f.bars::add))
    .flatMap(f -> f.bars.stream())
    .forEach(b -> System.out.println(b.name));

Beautiful, right?

FlatMap is also available for the Optional class, because Optional is also a monad. An Optional’s flatMap operation lets you return an Optional of another type.

Let’s work on a highly nested hierarchy:

class Outer {
    Nested nested;
}

class Nested {
    Inner inner;
}

class Inner {
    String foo;
}

To resolve the inner String foo of an outer instance you have to add multiple null checks to prevent possible NullPointerExceptions. Let’s look at the imperative approach first:

Outer outer = new Outer();
if (outer != null && outer.nested != null && outer.nested.inner != null) {
    System.out.println(outer.nested.inner.foo);
}

The same behavior can be achieved more safely and concisely using flatMap:

Optional.of(new Outer())
    .flatMap(o -> Optional.ofNullable(o.nested))
    .flatMap(n -> Optional.ofNullable(n.inner))
    .flatMap(i -> Optional.ofNullable(i.foo))
    .ifPresent(System.out::println);

Much better, right?
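As a variation, Optional.map already wraps a null result into an empty Optional, so the same lookup can be sketched without the explicit Optional.ofNullable calls. The resolveFoo and full helpers below are hypothetical names added for this example.

```java
import java.util.Optional;

class OptionalChain {
    static class Outer { Nested nested; }
    static class Nested { Inner inner; }
    static class Inner { String foo; }

    // Each map step yields an empty Optional as soon as a field is null.
    static Optional<String> resolveFoo(Outer outer) {
        return Optional.ofNullable(outer)
                .map(o -> o.nested)
                .map(n -> n.inner)
                .map(i -> i.foo);
    }

    // Builds a fully populated hierarchy for demonstration purposes.
    static Outer full(String foo) {
        Outer outer = new Outer();
        outer.nested = new Nested();
        outer.nested.inner = new Inner();
        outer.nested.inner.foo = foo;
        return outer;
    }

    public static void main(String[] args) {
        System.out.println(resolveFoo(new Outer()).isPresent()); // false
        System.out.println(resolveFoo(full("bar")).get());       // bar
    }
}
```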

Reduce

The reduce operation combines all objects of the stream into a single result. There are three different kinds of reduce methods. The first reduces a stream to exactly one element of the stream:

fruits
    .stream()
    .reduce((f1, f2) -> f1.price > f2.price ? f1 : f2)
    .ifPresent(System.out::println);

The reduce method accepts a BinaryOperator function, that is, a BiFunction where both operands share the same type, in this case Fruit. The above example compares the prices of both fruits in order to return the fruit with the maximum price.
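This form of reduce returns an Optional because the stream may be empty. The sketch below shows both cases with a hypothetical stand-in for the Fruit class; the prices are made-up values chosen only for illustration.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

class MaxPrice {
    // Hypothetical stand-in for the post's Fruit class.
    static class Fruit {
        final String name;
        final int price;

        Fruit(String name, int price) {
            this.name = name;
            this.price = price;
        }
    }

    static Optional<Fruit> mostExpensive(List<Fruit> fruits) {
        return fruits.stream()
                .reduce((f1, f2) -> f1.price > f2.price ? f1 : f2);
    }

    // Illustrative data only.
    static List<Fruit> sample() {
        return Arrays.asList(
                new Fruit("Banana", 18),
                new Fruit("Blackberry", 23),
                new Fruit("Blueberry", 21));
    }

    public static void main(String[] args) {
        mostExpensive(sample()).ifPresent(f -> System.out.println(f.name)); // Blackberry
        // An empty stream has nothing to reduce, so the Optional is empty.
        System.out.println(mostExpensive(Collections.<Fruit>emptyList()).isPresent()); // false
    }
}
```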

The second reduce method accepts both an identity value and a BinaryOperator accumulator. We can utilize this method to construct a new Fruit object with all names and prices from other fruits in the stream:

Fruit result =
    fruits
        .stream()
        .reduce(new Fruit("", 0), (f1, f2) -> {
            f1.price += f2.price;
            f1.name += f2.name;
            return f1;
        });

System.out.format("name=%s; price=%s", result.name, result.price);

//   name=BananaBlackberryBlueberryBlackCurrant; price=67

The third reduce method accepts three parameters: an identity value, a BiFunction accumulator and a combiner function of type BinaryOperator. Since the type of the identity value is not restricted to the Fruit type, we can use this reduction to determine the sum of the prices of all fruits:

Integer priceSum = fruits
    .stream()
    .reduce(0, (sum, f) -> sum + f.price, (sum1, sum2) -> sum1 + sum2);

System.out.println(priceSum);  // 67

As you can see the result is 67, but what’s happening exactly under the hood? Let’s extend the above code by some debug output:

Integer priceSum = fruits
    .stream()
    .reduce(0,
        (sum, f) -> {
            System.out.format("accumulator: sum=%s; fruit=%s\n", sum, f);
            return sum + f.price;
        },
        (sum1, sum2) -> {
            System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
            return sum1 + sum2;
        });

// accumulator: sum=0; fruit=Banana
// accumulator: sum=18; fruit=Blackberry
// accumulator: sum=41; fruit=Blueberry
// accumulator: sum=64; fruit=BlackCurrant

As you can see, the accumulator function does all the work. It first gets called with the initial identity value 0 and the first fruit, Banana. In the next three steps, sum keeps increasing by the price of the previous step’s fruit, up to a total price of 67.

Wait, what? The combiner never gets called? Executing the same stream in parallel reveals the secret:

Integer priceSum = fruits
    .parallelStream()
    .reduce(0,
        (sum, f) -> {
            System.out.format("accumulator: sum=%s; fruit=%s\n", sum, f);
            return sum + f.price;
        },
        (sum1, sum2) -> {
            System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
            return sum1 + sum2;
        });

// accumulator: sum=0; fruit=Banana
// accumulator: sum=0; fruit=Blackberry
// accumulator: sum=0; fruit=Blueberry
// accumulator: sum=0; fruit=BlackCurrant
// combiner: sum1=18; sum2=23
// combiner: sum1=23; sum2=12
// combiner: sum1=41; sum2=35

Executing this stream in parallel results in an entirely different execution behavior. Now the combiner is actually called. Since the accumulator is called in parallel, the combiner is needed to sum up the separate accumulated values.
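The difference can also be observed by counting combiner calls, as in the rough sketch below. It reduces plain Integer prices (18, 23, 23 and 3, read off the sequential accumulator log above) instead of Fruit objects. How often the combiner runs in parallel mode depends on how the stream is split, so the sketch only relies on the sums being equal and on the sequential run not using the combiner.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

class ReduceCombiner {
    // Returns {sum, number of combiner calls}.
    static int[] sum(boolean parallel) {
        AtomicInteger combinerCalls = new AtomicInteger();
        List<Integer> prices = Arrays.asList(18, 23, 23, 3);
        Stream<Integer> stream = parallel ? prices.parallelStream() : prices.stream();
        int total = stream.reduce(0,
                (sum, p) -> sum + p,
                (sum1, sum2) -> {
                    combinerCalls.incrementAndGet();
                    return sum1 + sum2;
                });
        return new int[] { total, combinerCalls.get() };
    }

    public static void main(String[] args) {
        int[] sequential = sum(false);
        int[] parallel = sum(true);
        System.out.println(sequential[0] + " with " + sequential[1] + " combiner calls");
        System.out.println(parallel[0] + " with " + parallel[1] + " combiner calls");
    }
}
```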

Let’s dive deeper into parallel streams in the last chapter :).

Parallel Streams

Yes, Streams can be executed in parallel to improve runtime performance when processing large amounts of data. Parallel streams use a common ForkJoinPool (see https://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html to understand more about ForkJoin), available via the static ForkJoinPool.commonPool() method. The size of the underlying thread pool depends on the number of available CPU cores; by default its parallelism is one less than the number of available processors:

ForkJoinPool commonPool = ForkJoinPool.commonPool();
System.out.println(commonPool.getParallelism());    // 7

The above code was executed on my machine, where the common pool was initialized with a parallelism of 7 by default, but that value can be changed by setting the following JVM parameter:

-Djava.util.concurrent.ForkJoinPool.common.parallelism=5

You can use parallelStream() on Collections to create a parallel stream of elements. You can also call the intermediate method parallel() on a given stream to convert a sequential stream into a parallel one.
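Both routes can be checked with isParallel(), as in this minimal sketch (class and method names are illustrative):

```java
import java.util.Arrays;
import java.util.stream.Stream;

class ParallelFlag {
    // A stream obtained from a Collection via parallelStream().
    static boolean fromCollection() {
        return Arrays.asList("a1", "a2").parallelStream().isParallel();
    }

    // A sequential stream switched to parallel mode with parallel().
    static boolean fromStream() {
        return Stream.of("a1", "a2").parallel().isParallel();
    }

    // Streams are sequential unless asked otherwise.
    static boolean sequentialByDefault() {
        return Stream.of("a1", "a2").isParallel();
    }

    public static void main(String[] args) {
        System.out.println(fromCollection());      // true
        System.out.println(fromStream());          // true
        System.out.println(sequentialByDefault()); // false
    }
}
```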

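So let’s look at the output of a parallel stream with filter, map and forEach steps, each printing the name of the current thread, to understand parallel execution behavior: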

filter: d1 [main]
map: d1 [main]
forEach: D1 [main]
filter: e1 [ForkJoinPool.commonPool-worker-7]
map: e1 [ForkJoinPool.commonPool-worker-7]
filter: a2 [ForkJoinPool.commonPool-worker-3]
map: a2 [ForkJoinPool.commonPool-worker-3]
forEach: A2 [ForkJoinPool.commonPool-worker-3]
filter: b2 [ForkJoinPool.commonPool-worker-6]
map: b2 [ForkJoinPool.commonPool-worker-6]
forEach: B2 [ForkJoinPool.commonPool-worker-6]
filter: d2 [ForkJoinPool.commonPool-worker-2]
map: d2 [ForkJoinPool.commonPool-worker-2]
forEach: D2 [ForkJoinPool.commonPool-worker-2]
filter: c1 [ForkJoinPool.commonPool-worker-5]
map: c1 [ForkJoinPool.commonPool-worker-5]
forEach: C1 [ForkJoinPool.commonPool-worker-5]
filter: b1 [ForkJoinPool.commonPool-worker-1]
map: b1 [ForkJoinPool.commonPool-worker-1]
forEach: B1 [ForkJoinPool.commonPool-worker-1]
filter: a1 [ForkJoinPool.commonPool-worker-4]
map: a1 [ForkJoinPool.commonPool-worker-4]
forEach: A1 [ForkJoinPool.commonPool-worker-4]
filter: e2 [ForkJoinPool.commonPool-worker-3]
map: e2 [ForkJoinPool.commonPool-worker-3]
forEach: E2 [ForkJoinPool.commonPool-worker-3]
forEach: E1 [ForkJoinPool.commonPool-worker-7]
filter: c2 [main]
map: c2 [main]
forEach: C2 [main]

As you can see in the above output, the parallel stream utilizes all available threads from the ForkJoinPool to execute the stream operations. The output may differ between consecutive runs, because which particular thread is actually used is non-deterministic.
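A pipeline along these lines could produce output of that shape. This is only a sketch: the input list is reconstructed from the values visible in the output, the log helper is hypothetical, and the thread names and ordering will vary from run to run.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class ParallelPipeline {
    // Prints the step, the element and the name of the thread doing the work.
    static void log(String step, String value) {
        System.out.format("%s: %s [%s]%n", step, value, Thread.currentThread().getName());
    }

    static Set<String> run() {
        // Thread-safe set, since forEach runs on several threads at once.
        Set<String> seen = ConcurrentHashMap.newKeySet();
        Arrays.asList("a1", "a2", "b1", "b2", "c1", "c2", "d1", "d2", "e1", "e2")
                .parallelStream()
                .filter(s -> {
                    log("filter", s);
                    return true;
                })
                .map(s -> {
                    log("map", s);
                    return s.toUpperCase();
                })
                .forEach(s -> {
                    log("forEach", s);
                    seen.add(s);
                });
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run().size()); // 10
    }
}
```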

Let’s add a sort operation to the parallel stream:

filter: a1 [ForkJoinPool.commonPool-worker-3]
map: a1 [ForkJoinPool.commonPool-worker-3]
filter: a2 [ForkJoinPool.commonPool-worker-2]
map: a2 [ForkJoinPool.commonPool-worker-2]
filter: b1 [ForkJoinPool.commonPool-worker-1]
map: b1 [ForkJoinPool.commonPool-worker-1]
filter: d2 [ForkJoinPool.commonPool-worker-4]
map: d2 [ForkJoinPool.commonPool-worker-4]
filter: b2 [ForkJoinPool.commonPool-worker-6]
map: b2 [ForkJoinPool.commonPool-worker-6]
filter: d1 [main]
map: d1 [main]
filter: e1 [ForkJoinPool.commonPool-worker-7]
map: e1 [ForkJoinPool.commonPool-worker-7]
filter: c1 [ForkJoinPool.commonPool-worker-5]
map: c1 [ForkJoinPool.commonPool-worker-5]
filter: e2 [ForkJoinPool.commonPool-worker-2]
map: e2 [ForkJoinPool.commonPool-worker-2]
filter: c2 [ForkJoinPool.commonPool-worker-3]
map: c2 [ForkJoinPool.commonPool-worker-3]
sort: A2 <> A1 [main]
sort: B1 <> A2 [main]
sort: C2 <> B1 [main]
sort: C1 <> C2 [main]
sort: C1 <> B1 [main]
sort: C1 <> C2 [main]
sort: B2 <> B1 [main]
sort: B2 <> C2 [main]
sort: B2 <> C1 [main]
sort: D1 <> B2 [main]
sort: D1 <> C2 [main]
sort: E1 <> B2 [main]
sort: E1 <> C2 [main]
sort: E1 <> D1 [main]
sort: D2 <> C1 [main]
sort: D2 <> D1 [main]
sort: D2 <> E1 [main]
sort: E2 <> C1 [main]
sort: E2 <> D2 [main]
sort: E2 <> E1 [main]
forEach: D1 [main]
forEach: B2 [ForkJoinPool.commonPool-worker-6]
forEach: A1 [ForkJoinPool.commonPool-worker-7]
forEach: C1 [ForkJoinPool.commonPool-worker-3]
forEach: A2 [ForkJoinPool.commonPool-worker-2]
forEach: C2 [ForkJoinPool.commonPool-worker-5]
forEach: B1 [ForkJoinPool.commonPool-worker-1]
forEach: E1 [ForkJoinPool.commonPool-worker-4]
forEach: E2 [ForkJoinPool.commonPool-worker-6]
forEach: D2 [main]

Looks strange, right? As you can see, the sort operations are executed sequentially on the main thread only. Actually, sort on a parallel stream uses the Java 8 method Arrays.parallelSort() under the hood. As stated in the Javadoc, this method decides based on the length of the array whether sorting is performed sequentially or in parallel:

If the length of the specified array is less than the minimum granularity, then it is sorted using the appropriate Arrays.sort method.
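Note that in the output above the forEach lines are not in sorted order, because forEach on a parallel stream makes no ordering promise. As a hedged sketch (with an assumed, shortened input), collecting into a List instead keeps the sorted encounter order even when the stream is parallel:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class ParallelSorted {
    static List<String> run() {
        return Arrays.asList("e2", "a1", "d1", "b2", "c1", "a2")
                .parallelStream()
                .map(String::toUpperCase)
                .sorted()
                .collect(Collectors.toList()); // respects the sorted encounter order
    }

    public static void main(String[] args) {
        System.out.println(run()); // [A1, A2, B2, C1, D1, E2]
    }
}
```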

Remember the reduce example from the previous section? We know that the combiner function is only called for parallel streams, not for sequential ones. Let’s see which threads are actually used:

accumulator: sum=0; fruit=br.com.learn.java8.Fruit@512fbfcb [ForkJoinPool.commonPool-worker-3]
accumulator: sum=0; fruit=br.com.learn.java8.Fruit@4e50df2e [main]
accumulator: sum=0; fruit=br.com.learn.java8.Fruit@2228f983 [ForkJoinPool.commonPool-worker-2]
accumulator: sum=0; fruit=br.com.learn.java8.Fruit@6f10ed20 [ForkJoinPool.commonPool-worker-1]
combiner: sum1=2; sum2=1 [ForkJoinPool.commonPool-worker-1]
combiner: sum1=7; sum2=8 [ForkJoinPool.commonPool-worker-2]
combiner: sum1=3; sum2=15 [ForkJoinPool.commonPool-worker-2]

To summarize, parallel streams can really improve performance for streams with a large number of input elements. But keep in mind that some parallel stream operations, like reduce and collect, need additional computations (combine operations) that aren’t needed when executed sequentially.

Furthermore we’ve learned that all parallel stream operations share the same JVM-wide common ForkJoinPool. So you probably want to avoid implementing slow blocking stream operations since that could potentially slow down other parts of your application which rely heavily on parallel streams.

Thanks!

This content was first published at: viniciusluisr.wordpress.com
