
FlatMap, the Egg of Reactive Programming
Many problems we face in software development usually end up being stories in the same category as the egg of Columbus story. If you are unfamiliar with the story, here is a very quick summary (from Wikipedia):
An egg of Columbus or Columbus’ egg (Italian: uovo di Colombo [ˈwɔːvo di koˈlombo]) refers to a brilliant idea or discovery that seems simple or easy after the fact. The expression refers to an apocryphal story in which Christopher Columbus, having been told that discovering the Americas was inevitable and no great accomplishment, challenges his critics to make an egg stand on its tip. After his challengers give up, Columbus does it himself by tapping the egg on the table to flatten its tip.
When flatMap doesn’t make sense… until it does!
In a synchronous context, the behavior of applying a flatMap
operation over a stream of data is consistent and deterministic. Iterating over e.g. the stream [1, 2]
, generating 2 elements out each initial number e.g. [[11, 21], [12, 22]]
and flattening to [11, 21, 12, 22]
will always return the same result for the same input stream [1, 2]
no matter how many times we execute the transformation. But in a reactive context where concepts of concurrency and asynchronicity are introduced, the rules we assumed to be true might not be anymore. Invoking the same flatMap
operation twice over the same stream [1, 2]
could return [11, 12, 21, 22]
and [21, 12, 22, 11]
respectively. So how we can simplify the understanding of a reactive flatMap
operation to a point where it feels like “tapping the egg on the table to flatten its tip”? Keep on reading…
A concrete example first
First, a bit of context. As I was preparing a talk for Oracle Code One about the inner working of the Assembler library, I came across an issue where some of my unit tests related to the Flux
support (used to integrate the Assembler library with Project Reactor) started to fail when parallelism was introduced in the FluxAdapter
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
Assembler<Customer, Flux<Transaction>> assembler = assemblerOf(Transaction.class) .withIdExtractor(Customer::getCustomerId) .withAssemblerRules( oneToOne(this::getBillingInfoForCustomers, BillingInfo::getCustomerId), oneToManyAsList(this::getAllOrdersForCustomers, OrderItem::getCustomerId), Transaction::new) .using(fluxAdapter(parallel())); StepVerifier.create(Flux.fromIterable(getCustomers()) .buffer(3) .flatMap(assembler::assemble)) .expectSubscription() .expectNext(transaction1, transaction2, transaction3, transaction1, transaction2) .verifyComplete(); |
Suddenly the aggregated transaction instances returned from the Flux
were out of order e.g. [transaction2, transaction3, transaction1, transaction2, transaction1]
and JUnit started complaining.
No, really! A trivial example please!
The code above was the trigger that led to this post, but it doesn’t help much in understanding how flatMap
behaves underneath the surface. So to isolate the behavior of flatMap
and understand how we can end up with out of order elements, let’s create a very simple and basic example. We will transform a series of characters from lowercase to uppercase. We will group those characters into sub-groups of 3 characters, then we will apply our lowercase to uppercase function on those sub-groups. In a real world scenario, this kind of grouping could be used to e.g. decrease the number of database calls.
For the purpose of better observing how flatMap
behaves in Project Reactor, our transforming function will introduce a delay of 1 second, it will also attach to each character the name of the current thread in which the lowercase to uppercase transformation was executed (using List
as a Tuple
just for testing purpose, not recommended in production code):
1 2 3 4 5 6 7 8 |
private List<String> toUpperCase(String s) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return List.of(s.toUpperCase(), Thread.currentThread().getName()); } |
So let’s try:
As we can see, for each sub-group of 3 characters, we capitalize each character ([A, B, C], [D, E, F], [G, H, I]
), then we flatten those sub-lists into a continuous Flux
(A, B, C, D, E, F, G, H, I
). One interesting thing here is that everything is executed on the main thread. It’s expected, but it illustrates the fact that it’s not because we are using a reactive framework like Project Reactor or RxJava that it will automatically take care of magically process data in parallel and asynchronously.
But flatMap is way too slow
I hear you, 9 seconds is a bit too much. In a real world scenario, sequentially calling REST services or making a database call for each sub-group of data might significantly impact performance. This would be a good use case to introduce parallelism between our sub-groups:
Great! What took 9 seconds is now cut to 3 seconds. We can also see that by assigning a parallel scheduler to each sub-group through the subscribeOn(parallel())
call, we indeed distribute the processing of each sub-group on separate threads.
Out of order? Hold on a second here…
It’s indeed much faster, but the output is now [G, A, D, E, B, H, F, C, I]
, WHAT THE…? It looks like flatMap
doesn’t keep ordering of elements. Sometimes the order doesn’t matter, which gives a lot more flexibility for optimizations, but what if it does? It really kills me but I guess I have no choice… I have to read the documentation now… ;-)!
A New Hope
I’m not talking about Star Wars episode IV, but it seems there is actually something from the Project Reactor documentation that might help fix my ordering issue, what is that concatMap
method? From the documentation:
Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux, sequentially and preserving order using concatenation.
That’s exactly what we want, let’s try it:
Awesome, order is restored!! And our sub-groups are still executed on separate threads… but…
Why is it slow again?
We are back to 9 seconds processing! What’s wrong? Maybe I’ll have to read more than the first line in the documentation after all…
There it is, this operator waits for one inner to complete before generating the next one and subscribing to it
. So it is really serializing the execution of our sub-groups no matter if they were scheduled to run sequentially or in parallel. But what is that flatMapSequential
link that I’m seeing?
The Last Hope?
Based on the documentation of flatMapSequential
, “this time” it might be what we are looking for:
Eager execution of all inner Publishers
(our sub-groups), buffering of late inner Publishers
to let earlier Publishers
catch up if needed, looks promising:
We kept the ordering, we see each sub-group is ran in parallel, we are back to 3 seconds execution, we did it!!!
Conclusion
Project Reactor provides a very rich reactive api to support many concurrency scenarios. The documentation is very well written, but sometimes there is nothing like an actual code demo to fully understand the true meaning behind each word from the documentation. Since Java 8 with the new stream api, we were used to hardcode the word “flatMap
” in our minds when thinking about the concept of flattening inner streams created from each element of our initial stream. But in reactive programming, many reactive frameworks (including Project Reactor) expose different flavors and names for flattening operations. flatMap
, concatMap
and flatMapSequential
are good examples of those flavors to handle different concurrency scenarios. Once you know about them, handling concurrency becomes like “tapping the egg on the table to flatten its tip.”