Java

Java Streams API: Advanced Patterns and Performance

Go beyond map/filter/collect. Master Java Streams API with flatMap, collectors, parallel streams, custom collectors, and performance considerations for production code.

Sachin Sarawgi·March 21, 2025·9 min read
#java#streams#functional programming#collections#performance

Most engineers use Java Streams for map/filter/collect and stop there. The full Streams API is significantly more powerful: custom collectors that aggregate in a single pass, flatMap for flattening nested structures, parallel streams that scale across CPU cores, and collector combinators that compose complex aggregations. This article covers the patterns that separate proficient from expert Java developers.

FlatMap: Working with Nested Structures

flatMap transforms each element into a stream and merges all those streams. Think of it as a two-step operation: first map each element to a stream, then flatten all those streams into one. This is the correct tool whenever you have a collection of collections and need to work with the inner elements directly.

// Problem: List of orders, each with List of items — get all items
List<Order> orders = getOrders();

// Wrong (returns Stream<List<OrderItem>>):
Stream<List<OrderItem>> wrong = orders.stream().map(Order::getItems);

// Correct (returns Stream<OrderItem>):
Stream<OrderItem> allItems = orders.stream()
    .flatMap(order -> order.getItems().stream());

// Practical: find all unique product IDs ordered by a customer
Set<String> productIds = orders.stream()
    .flatMap(order -> order.getItems().stream())
    .map(OrderItem::getProductId)
    .collect(Collectors.toSet());

// Chain: orders with items above threshold
List<OrderItem> expensiveItems = orders.stream()
    .flatMap(order -> order.getItems().stream()
        .filter(item -> item.getPrice().compareTo(BigDecimal.valueOf(100)) > 0)
        .map(item -> item.withOrderId(order.getId()))  // Add context from outer
    )
    .collect(Collectors.toList());

// Optional flatMap: chain of optional operations
Optional<String> customerEmail = findOrder("order-123")
    .flatMap(order -> findCustomer(order.getCustomerId()))
    .flatMap(customer -> Optional.ofNullable(customer.getEmail()));
// Returns empty if any step is empty — clean Optional chaining

The Optional.flatMap usage at the end is a particularly useful pattern: it lets you chain a sequence of operations where any step might produce an empty result, without nested null checks or if statements.

Collectors: Beyond toList()

The real power of the Streams API lies in its collectors. While toList() handles the common case, Collectors includes a rich set of aggregation operations that can replace what would otherwise be multiple passes over the data or complex imperative loops. The key insight is that collectors are composable — you can nest them to express sophisticated aggregations in a single, readable pipeline:

// groupingBy: most powerful collector
Map<String, List<Order>> ordersByStatus = orders.stream()
    .collect(Collectors.groupingBy(Order::getStatus));

// groupingBy with downstream collector
Map<String, Long> countByStatus = orders.stream()
    .collect(Collectors.groupingBy(
        Order::getStatus,
        Collectors.counting()
    ));

// Multi-level grouping
Map<String, Map<String, BigDecimal>> totalByCustomerAndStatus = orders.stream()
    .collect(Collectors.groupingBy(
        Order::getCustomerId,
        Collectors.groupingBy(
            Order::getStatus,
            Collectors.reducing(
                BigDecimal.ZERO,
                Order::getTotal,
                BigDecimal::add
            )
        )
    ));

// partitioningBy: split into true/false
Map<Boolean, List<Order>> activeVsCancelled = orders.stream()
    .collect(Collectors.partitioningBy(
        order -> !order.getStatus().equals("CANCELLED")
    ));
List<Order> activeOrders = activeVsCancelled.get(true);

// toMap with merge function (handles duplicate keys)
Map<String, BigDecimal> totalByCustomer = orders.stream()
    .collect(Collectors.toMap(
        Order::getCustomerId,
        Order::getTotal,
        BigDecimal::add  // Merge: add totals for same customer
    ));

// Statistics
IntSummaryStatistics priceStats = products.stream()
    .mapToInt(Product::getPriceCents)
    .summaryStatistics();
System.out.printf("Min: %d, Max: %d, Avg: %.2f, Count: %d%n",
    priceStats.getMin(), priceStats.getMax(),
    priceStats.getAverage(), priceStats.getCount());

Custom Collectors

When built-in collectors don't fit, build your own. A custom collector is composed of four functions: a supplier that creates the mutable accumulator, an accumulator that folds each element into it, a combiner that merges two accumulators (needed for parallel streams), and a finisher that converts the accumulator to the final result. This structure might seem verbose at first, but it gives you complete control over how the aggregation behaves:

// Custom collector: find top N elements by a comparator in a single pass
// (more efficient than sorting all then taking first N)
public static <T> Collector<T, ?, List<T>> topN(int n, Comparator<T> comparator) {
    return Collector.of(
        () -> new PriorityQueue<>(n + 1, comparator),  // Supplier: create accumulator
        (queue, element) -> {                            // Accumulator: add element
            queue.offer(element);
            if (queue.size() > n) queue.poll();         // Keep only top N
        },
        (q1, q2) -> {                                   // Combiner: merge two accumulators (parallel)
            q2.forEach(q1::offer);
            while (q1.size() > n) q1.poll();
            return q1;
        },
        queue -> {                                       // Finisher: convert to result
            List<T> result = new ArrayList<>(queue);
            result.sort(comparator.reversed());
            return result;
        }
    );
}

// Usage: top 5 most expensive products in a single pass
List<Product> top5 = products.stream()
    .collect(topN(5, Comparator.comparing(Product::getPriceCents)));

// Another useful custom collector: running total
public static Collector<BigDecimal, ?, List<BigDecimal>> runningTotal() {
    return Collector.of(
        () -> new ArrayList<BigDecimal>() {{ add(BigDecimal.ZERO); }},
        (list, amount) -> list.add(list.get(list.size() - 1).add(amount)),
        (list1, list2) -> {  // Not meaningful for parallel, but required
            BigDecimal lastTotal = list1.get(list1.size() - 1);
            list2.stream().skip(1).forEach(d -> list1.add(lastTotal.add(d)));
            return list1;
        },
        list -> list.subList(1, list.size())  // Remove initial zero
    );
}

// Running total of daily revenue
List<BigDecimal> cumulativeRevenue = dailyRevenue.stream()
    .collect(runningTotal());

The topN collector is more efficient than the naive sorted().limit(n) approach because it maintains a bounded priority queue of size n — it never needs to sort the entire input, making it O(n log k) instead of O(n log n).

teeing: Combine Two Collectors

Now that you have a feel for how collectors compose, Java 12's Collectors.teeing takes composability one step further by letting you apply two collectors to the same stream simultaneously and combine their results. This eliminates the need to iterate over the data twice when you need two independent aggregations:

// Get count AND total in one pass
record OrderStats(long count, BigDecimal total) {}

OrderStats stats = orders.stream()
    .collect(Collectors.teeing(
        Collectors.counting(),
        Collectors.reducing(BigDecimal.ZERO, Order::getTotal, BigDecimal::add),
        OrderStats::new
    ));

System.out.printf("Count: %d, Total: %s%n", stats.count(), stats.total());

// Split stream into two lists in one pass (more efficient than two filter calls)
record ActiveAndCancelled(List<Order> active, List<Order> cancelled) {}

ActiveAndCancelled split = orders.stream()
    .collect(Collectors.teeing(
        Collectors.filtering(o -> !o.isCancelled(), Collectors.toList()),
        Collectors.filtering(Order::isCancelled, Collectors.toList()),
        ActiveAndCancelled::new
    ));

The single-pass guarantee is what makes teeing valuable at scale — when your input is a large stream from a database or file, avoiding a second pass over the data has a meaningful impact on both time and memory.

Parallel Streams: When and How

With single-threaded streams well understood, it's tempting to reach for parallelStream() everywhere. Resist that urge. Parallel streams split the stream across ForkJoinPool.commonPool() threads and have measurable overhead — they help only when the per-element work is expensive enough to outweigh the cost of splitting and merging:

// When parallel streams help:
// - Large data sets (> 100K elements)
// - CPU-intensive operations per element
// - Stateless, independent operations
// - Ordered output doesn't matter (order = merge cost)

// Good candidate: CPU-intensive transformation of large list
List<ProcessedReport> reports = rawReports.parallelStream()
    .map(r -> processReport(r))          // CPU-intensive
    .filter(r -> r.isSignificant())
    .collect(Collectors.toList());       // Order doesn't matter

// Benchmark: this MIGHT be 4x faster on 4 cores

// When parallel is SLOWER:
// Small data: thread overhead > computation benefit (< 1000 elements)
List<String> small = Arrays.asList("a", "b", "c");
small.parallelStream().map(String::toUpperCase).collect(Collectors.toList());
// Slower than sequential: creating ForkJoin tasks has 20μs overhead

// Stateful operations (bad for parallel):
List<Integer> result = new ArrayList<>();  // NOT thread-safe!
numbers.parallelStream()
    .filter(n -> n > 0)
    .forEach(result::add);  // RACE CONDITION — don't do this
// Correct: use collectors, not forEach with external state

// IO-bound operations (no benefit from parallel):
List<String> fetched = urls.parallelStream()
    .map(url -> fetchUrl(url))  // Blocked on I/O, not CPU
    .collect(Collectors.toList());
// Use virtual threads or CompletableFuture instead

// Custom thread pool for parallel streams:
ForkJoinPool customPool = new ForkJoinPool(8);
List<Result> results = customPool.submit(() ->
    items.parallelStream()
        .map(this::expensiveOperation)
        .collect(Collectors.toList())
).get();

Using a custom ForkJoinPool as shown at the end is an important technique when you need to isolate parallel stream work from the JVM's shared common pool — for example, when your application also uses other libraries that rely on ForkJoinPool.commonPool() and you do not want them to compete for threads.

Stream Performance Pitfalls

Even sequential streams have performance traps worth knowing. These pitfalls are easy to miss during code review because the code looks idiomatic, but each one introduces unnecessary overhead that compounds at scale:

// Pitfall 1: Unboxing overhead with boxed streams
// BAD: Integer stream (boxing/unboxing overhead)
int sum = numbers.stream()
    .map(Integer::intValue)    // unnecessary
    .reduce(0, Integer::sum);  // boxes again

// GOOD: Use primitive streams (IntStream, LongStream, DoubleStream)
int sum = numbers.stream()
    .mapToInt(Integer::intValue)  // Returns IntStream (unboxed)
    .sum();                        // No boxing

// Pitfall 2: Collecting to list then re-streaming
// BAD: Creates intermediate list
List<String> intermediate = products.stream()
    .filter(p -> p.getPrice() > 100)
    .map(Product::getName)
    .collect(Collectors.toList());  // Materialized here

long count = intermediate.stream().count();  // Re-stream just to count!

// GOOD: Chain in one pipeline
long count = products.stream()
    .filter(p -> p.getPrice() > 100)
    .count();  // Terminal operation — no intermediate list

// Pitfall 3: sorted() on large streams
// Sorting requires all elements → breaks laziness
// Only sort when the result actually needs ordering

// Pitfall 4: distinct() with expensive equals/hashCode
// distinct() maintains a HashSet internally — expensive for complex objects
// Consider: sort first, then deduplicate (more cache-friendly)

// Pitfall 5: findFirst() vs findAny() in parallel
numbers.parallelStream().filter(n -> n > 100).findFirst(); // Must find first — expensive ordering
numbers.parallelStream().filter(n -> n > 100).findAny();   // Any match — much faster parallel

The findFirst() vs findAny() distinction in parallel streams is particularly counter-intuitive: findFirst() looks like it should be faster because it stops early, but in a parallel context it forces the JVM to coordinate across threads to guarantee the encounter-order result, which is significantly more expensive than findAny().

Real-World Data Processing Patterns

The previous sections covered individual features in isolation. This final example brings them together into a realistic reporting pipeline that you might encounter in a production analytics service. Notice how it uses nested teeing inside groupingBy to compute two aggregates per group in a single pass, then transforms the map into a sorted, limited result set:

// Pattern: Transform order data for reporting
record ReportLine(String customerId, String customerName,
                  long orderCount, BigDecimal totalRevenue) {}

List<ReportLine> report = orders.stream()
    .collect(Collectors.groupingBy(
        Order::getCustomerId,
        Collectors.teeing(
            Collectors.counting(),
            Collectors.reducing(BigDecimal.ZERO, Order::getTotal, BigDecimal::add),
            (count, total) -> new Object[]{count, total}  // Temp holder
        )
    ))
    .entrySet().stream()
    .map(entry -> {
        String customerId = entry.getKey();
        Object[] stats = (Object[]) entry.getValue();
        Customer customer = customerCache.get(customerId);
        return new ReportLine(
            customerId,
            customer.getName(),
            (Long) stats[0],
            (BigDecimal) stats[1]
        );
    })
    .sorted(Comparator.comparing(ReportLine::totalRevenue).reversed())
    .limit(100)  // Top 100 customers
    .collect(Collectors.toList());

The Streams API is functional programming applied to Java. The core insight: describe what you want (filter active orders, group by customer, sum totals), not how to do it (loop, if, accumulate, sort). Once you internalize this declarative style, you write code that's shorter, more readable, and easier to parallelize. The advanced pieces — custom collectors, teeing, flatMap — handle the 20% of use cases that map/filter/collect can't cover.

📚

Recommended Resources

Java Masterclass — UdemyBest Seller

Comprehensive Java course covering Java 17+, OOP, concurrency, and modern APIs.

View Course
Effective Java, 3rd EditionMust Read

Joshua Bloch's classic guide to writing clear, correct, and efficient Java code.

View on Amazon
Java Concurrency in Practice

The authoritative book on writing thread-safe, concurrent Java programs.

View on Amazon

Found this useful? Share it: