In Part 1, Part 2, and Part 3, we covered hazards, primitives, and execution models. In this final part, we focus on structured concurrency, fan-out/fan-in, fail-fast cancellation, timeout propagation, resource scoping, and observability with thread dumps and Java Flight Recorder (JFR), with an emphasis on when to choose each pattern.
1. Structured Concurrency
Structured concurrency treats a group of related tasks as a single unit whose lifetime is bounded by a lexical scope, rather than a set of detached threads that outlive the caller.
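In Java 21, this is embodied in StructuredTaskScope, which lets a parent thread fork subtasks, wait for them as a group, and guarantees that all subtasks are either completed or cancelled when the scope exits. StructuredTaskScope is a preview API in Java 21, so code that uses it must be compiled and run with --enable-preview. The examples here use the Java 21 form of the API, and later preview releases have revised it.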
At a high level, structured concurrency gives three key benefits:
- Unified error handling - a subtask failure is captured by the scope and rethrown to the parent in a controlled way, instead of being lost in a detached thread.
- Prompt cancellation - failure or success conditions can automatically cancel sibling tasks via policies like ShutdownOnFailure or ShutdownOnSuccess.
- Improved observability - scopes create natural units for logging, metrics, and profiling.
When to choose structured concurrency?
- You have a request‑scoped orchestration that fans out to multiple backends (e.g., pricing + inventory + recommendations) and you want a single, well‑defined lifecycle for all the work associated with that request.
- You want fail‑fast semantics: as soon as one backend fails or violates an SLO, you cancel the rest and return an error or partial result.
- You need deadline and cancellation propagation: a user cancels, or an upstream deadline expires, and every in‑flight subtask must stop quickly.
- You care about operability: it should be easy to answer “what are we doing for this request right now?” from logs, metrics, or JFR recordings.
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.TimeoutException;
// Domain results
record Price(double amount) {}
record Inventory(int available) {}
record Recommendation(String text) {}
// Aggregated response
record ProductView(Price price, Inventory inventory, Recommendation recommendation) {}
class ProductService {
    private final PriceClient priceClient;
    private final InventoryClient inventoryClient;
    private final RecommendationClient recommendationClient;
    ProductService(PriceClient priceClient,
                   InventoryClient inventoryClient,
                   RecommendationClient recommendationClient) {
        this.priceClient = priceClient;
        this.inventoryClient = inventoryClient;
        this.recommendationClient = recommendationClient;
    }
    public ProductView buildProductView(String productId, Duration deadline)
            throws InterruptedException, ExecutionException, TimeoutException {
        // Scoped to this request; all child tasks must finish or be cancelled
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            var priceTask = scope.fork(() -> priceClient.fetchPrice(productId));
            var inventoryTask = scope.fork(() -> inventoryClient.fetchInventory(productId));
            var recTask = scope.fork(() -> recommendationClient.fetchRecommendations(productId));
            // Wait for all tasks, the first failure, or the deadline. On timeout this throws
            // TimeoutException, and closing the scope cancels any unfinished subtasks.
            scope.joinUntil(Instant.now().plus(deadline));
            scope.throwIfFailed(); // propagate the first failure, if any
            // After this point, every subtask has completed successfully
            Price price = priceTask.get();
            Inventory inventory = inventoryTask.get();
            Recommendation rec = recTask.get();
            return new ProductView(price, inventory, rec);
        }
    }
}
interface PriceClient {
    Price fetchPrice(String productId) throws Exception;
}
interface InventoryClient {
    Inventory fetchInventory(String productId) throws Exception;
}
interface RecommendationClient {
    Recommendation fetchRecommendations(String productId) throws Exception;
}
- The try‑with‑resources block defines the lifetime of all subtasks; nothing leaks beyond it.
- ShutdownOnFailure encodes a policy: this request only makes sense if all subtasks succeed.
- Cancellation is automatic: if any task fails, the scope cancels the rest, which is easier than manually tracking and cancelling Futures.
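For comparison, here is a rough sketch of the manual bookkeeping that ShutdownOnFailure takes over, using only pre-Java 21 APIs. The class `ManualFailFast` and its `invokeAllFailFast` method are hypothetical names for this example. The method submits every task through an `ExecutorCompletionService`, observes results in completion order so that an early failure is seen early, and cancels the remaining `Future`s as soon as one task fails.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

// Hypothetical helper: runs all tasks, returns results in submission order,
// and cancels the remaining tasks as soon as one of them fails.
final class ManualFailFast {
    static <T> List<T> invokeAllFailFast(ExecutorService executor, List<Callable<T>> tasks)
            throws InterruptedException, ExecutionException {
        CompletionService<T> completion = new ExecutorCompletionService<>(executor);
        List<Future<T>> futures = new ArrayList<>();
        for (Callable<T> task : tasks) {
            futures.add(completion.submit(task));
        }
        try {
            for (int i = 0; i < futures.size(); i++) {
                // take() hands back futures in completion order;
                // get() throws ExecutionException for the first failed task
                completion.take().get();
            }
        } catch (ExecutionException | InterruptedException e) {
            // Fail fast: cancel (and interrupt) every task that is still running
            futures.forEach(f -> f.cancel(true));
            throw e;
        }
        List<T> results = new ArrayList<>();
        for (Future<T> f : futures) {
            results.add(f.get()); // every task is done here, so get() does not block
        }
        return results;
    }
}
```

Even this version leaves lifetime management to the caller: the executor is passed in, and a task that ignores interruption can keep running after the method has already thrown.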
2. Fan‑Out / Fan‑In
Fan‑out/fan‑in is a concurrency pattern where a parent splits work into independent subtasks (fan‑out), runs them in parallel, then aggregates their results (fan‑in).
Fan‑out/fan‑in is primarily a latency optimization: instead of calling backends sequentially, you call them in parallel. Total latency then becomes roughly that of the slowest call plus a small orchestration overhead, rather than the sum of all calls. The pattern also forces you to separate parallelism from concurrency: fan out only when the work is independent and safe to run in parallel.
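One way to see the latency claim is to time three simulated backend calls, first back to back and then fanned out. The `LatencyDemo` class and its `call` method are hypothetical stand-ins that only sleep. Real I/O adds variance, so treat the timings as illustrative.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

final class LatencyDemo {
    // Hypothetical backend call: sleeps to simulate I/O latency, then returns its name
    static String call(String name, long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        }
        return name;
    }

    // Sequential: latency is roughly the sum, about 100 + 200 + 300 ms
    static long sequentialMillis() {
        long start = System.nanoTime();
        call("a", 100);
        call("b", 200);
        call("c", 300);
        return (System.nanoTime() - start) / 1_000_000;
    }

    // Fan-out/fan-in: latency is roughly the slowest call, about 300 ms
    static long parallelMillis(ExecutorService executor) {
        long start = System.nanoTime();
        var a = CompletableFuture.supplyAsync(() -> call("a", 100), executor);
        var b = CompletableFuture.supplyAsync(() -> call("b", 200), executor);
        var c = CompletableFuture.supplyAsync(() -> call("c", 300), executor);
        CompletableFuture.allOf(a, b, c).join(); // fan-in: wait for all three
        return (System.nanoTime() - start) / 1_000_000;
    }
}
```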
Choose fan-out/fan-in when:
- You have several independent IO‑bound calls (e.g., to microservices, caches, or databases) that can execute safely in parallel.
- You are optimizing p99 latency by shaving off sequential waits.
- You need to aggregate results into a single response (e.g., search results from multiple shards, pricing from multiple providers, or recommendations from different sources).
This can be implemented with either Java 8+ CompletableFuture or Java 21's (preview) StructuredTaskScope.
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
// CompletableFuture (Java 8+)
class SearchService {
    // Pool for blocking IO calls; shut it down when the service is disposed
    private final ExecutorService ioExecutor = Executors.newFixedThreadPool(32);
    private final ShardClient shardClient;
    SearchService(ShardClient shardClient) {
        this.shardClient = shardClient;
    }
    public List<SearchResult> search(String query, List<String> shardIds) {
        // fan-out: issue one async call per shard
        List<CompletableFuture<SearchResult>> futures = shardIds.stream()
                .map(shardId -> CompletableFuture.supplyAsync(
                        () -> shardClient.searchShard(shardId, query), ioExecutor))
                .collect(Collectors.toList());
        // fan-in: wait for all results
        CompletableFuture<Void> all = CompletableFuture.allOf(
                futures.toArray(new CompletableFuture[0]));
        // this blocks the current thread; in real code you might return the CF instead
        all.join();
        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }
}
interface ShardClient {
    SearchResult searchShard(String shardId, String query);
}
record SearchResult(String shardId, List<String> documents) {}
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.StructuredTaskScope;
// StructuredTaskScope (Java 21 preview); reuses ShardClient and SearchResult from above
class SearchService21 {
    private final ShardClient shardClient;
    SearchService21(ShardClient shardClient) {
        this.shardClient = shardClient;
    }
    public List<SearchResult> search(String query, List<String> shardIds)
            throws InterruptedException, ExecutionException {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            // fan-out: fork one subtask per shard
            List<StructuredTaskScope.Subtask<SearchResult>> subtasks = shardIds.stream()
                    .map(shardId -> scope.fork(() -> shardClient.searchShard(shardId, query)))
                    .toList();
            scope.join();          // wait for all subtasks or the first failure
            scope.throwIfFailed(); // propagate the first failure
            // fan-in: collect results in shard order
            return subtasks.stream()
                    .map(StructuredTaskScope.Subtask::get)
                    .toList();
        }
    }
}
3. Fail‑Fast Cancellation
Fail‑fast cancellation is a policy where a composite operation aborts as soon as a critical subtask fails or a constraint is violated, rather than waiting for all subtasks to complete.
In Java 21, StructuredTaskScope.ShutdownOnFailure embodies this policy. When any subtask fails, the scope shuts down and interrupts the threads of the remaining subtasks, and throwIfFailed() rethrows the failure to the caller wrapped in an ExecutionException.
Without structured concurrency, this has to be coded by hand, either by cancelling Futures or by wrapping CompletableFutures in orchestration logic.
import java.util.concurrent.ExecutionException;
import java.util.concurrent.StructuredTaskScope;
class Aggregator {
    String aggregate() throws InterruptedException, ExecutionException {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            var fast = scope.fork(() -> slowCall("fast", 100));
            var slow = scope.fork(() -> slowCall("slow", 5_000));
            var failing = scope.fork(() -> failCall());
            // failCall() fails almost immediately, so the scope shuts down and the
            // 5-second slowCall is interrupted instead of being waited for
            scope.join();
            scope.throwIfFailed(); // rethrows the first failure as an ExecutionException
            // Reached only if no subtask failed
            return fast.get() + slow.get() + failing.get();
        }
    }
    private String slowCall(String name, long millis) throws InterruptedException {
        Thread.sleep(millis);
        return name;
    }
    private String failCall() {
        throw new IllegalStateException("Upstream service failure");
    }
}
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;
class AggregatorCF {
    String aggregate(List<String> ids) {
        List<CompletableFuture<String>> futures = ids.stream()
                .map(this::callServiceAsync)
                .collect(Collectors.toList());
        CompletableFuture<Void> all = CompletableFuture.allOf(
                futures.toArray(new CompletableFuture[0]));
        // allOf() alone waits for every future, even after one has failed.
        // firstFailure completes as soon as any single future fails.
        CompletableFuture<Void> firstFailure = new CompletableFuture<>();
        futures.forEach(f -> f.whenComplete((value, ex) -> {
            if (ex != null) {
                firstFailure.completeExceptionally(ex);
            }
        }));
        try {
            // Returns when everything succeeded, or throws on the first failure
            CompletableFuture.anyOf(all, firstFailure).join();
        } catch (CompletionException ex) {
            // Fail fast: mark the remaining futures as cancelled. CompletableFuture.cancel(true)
            // does not interrupt the running task, so the remote call itself must also
            // honor a timeout or cancellation signal.
            futures.forEach(f -> f.cancel(true));
            throw ex;
        }
        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.joining(","));
    }
    private CompletableFuture<String> callServiceAsync(String id) {
        return CompletableFuture.supplyAsync(() -> {
            // do remote call, possibly throwing
            return "value-" + id;
        });
    }
}
4. Timeout Propagation
Timeout propagation is the practice of computing a deadline at the outermost layer of a request (e.g., HTTP server) and passing it explicitly to all downstream operations so that they can enforce consistent time limits.
In Java this often shows up as a Duration or deadline Instant parameter, combined with APIs that take timeouts, such as the orTimeout and completeOnTimeout methods added to CompletableFuture in Java 9, or ExecutorService.invokeAll(tasks, timeout, unit).
Structured concurrency complements this by applying a single deadline to an entire scope: StructuredTaskScope.joinUntil(Instant) stops waiting at the deadline, and closing the scope cancels any subtasks that are still running.
Timeout propagation prevents “zombie” work that continues after the client has given up, which is critical for throughput, fairness, and avoiding cascading failures.
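Here is a small sketch of carrying one absolute deadline through a call tree with CompletableFuture. The `Deadline` record, `DeadlineDemo`, and its helper methods are hypothetical names for this example. `orTimeout` fails a required call once the shared deadline passes, while `completeOnTimeout` substitutes a fallback value for an optional one.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical value type: one absolute deadline shared by every layer of a request
record Deadline(Instant at) {
    static Deadline after(Duration budget) {
        return new Deadline(Instant.now().plus(budget));
    }

    // Remaining budget in milliseconds, never negative
    long remainingMillis() {
        return Math.max(0, Duration.between(Instant.now(), at).toMillis());
    }
}

final class DeadlineDemo {
    // Hypothetical slow dependency: sleeps, then returns the given value
    static CompletableFuture<String> slowCall(String value, long millis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return value;
        });
    }

    // Required dependency: fail with TimeoutException once the shared deadline passes
    static CompletableFuture<String> required(Deadline deadline, CompletableFuture<String> call) {
        return call.orTimeout(deadline.remainingMillis(), TimeUnit.MILLISECONDS);
    }

    // Optional dependency: substitute a fallback value instead of failing
    static CompletableFuture<String> optional(Deadline deadline, CompletableFuture<String> call,
                                              String fallback) {
        return call.completeOnTimeout(fallback, deadline.remainingMillis(), TimeUnit.MILLISECONDS);
    }
}
```

Both methods only complete the returned future. The sleeping task keeps running in the background until it finishes, which is exactly the zombie work described above. For that reason, the remaining budget should also be passed to the underlying client library.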
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.TimeoutException;
class TimeoutOrchestrator21 {
    public List<Result> orchestrate(List<String> ids, Duration overallTimeout)
            throws InterruptedException, ExecutionException, TimeoutException {
        // Compute the deadline once, at the outermost layer, and pass it down
        Instant deadline = Instant.now().plus(overallTimeout);
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            var tasks = ids.stream()
                    .map(id -> scope.fork(() -> callServiceRespectingDeadline(id, deadline)))
                    .toList();
            // Waits until all subtasks finish, one fails, or the deadline passes.
            // On timeout it throws TimeoutException; leaving the try block then closes
            // the scope, which shuts it down and interrupts any unfinished subtasks.
            scope.joinUntil(deadline);
            scope.throwIfFailed();
            return tasks.stream().map(StructuredTaskScope.Subtask::get).toList();
        }
    }
    private Result callServiceRespectingDeadline(String id, Instant deadline) throws Exception {
        long remainingMillis = Duration.between(Instant.now(), deadline).toMillis();
        if (remainingMillis <= 0) {
            throw new TimeoutException("Deadline already expired");
        }
        // Use remainingMillis with your HTTP client / DB driver timeouts
        return new Result(id);
    }
}
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
class TimeoutOrchestrator {
    public CompletableFuture<List<Result>> orchestrate(List<String> ids, Duration overallTimeout) {
        Instant deadline = Instant.now().plus(overallTimeout);
        List<CompletableFuture<Result>> futures = ids.stream()
                .map(id -> callServiceWithDeadline(id, deadline))
                .toList();
        CompletableFuture<Void> all = CompletableFuture.allOf(
                futures.toArray(new CompletableFuture[0]));
        return all.thenApply(v -> futures.stream()
                .map(CompletableFuture::join)
                .toList());
    }
    private CompletableFuture<Result> callServiceWithDeadline(String id, Instant deadline) {
        long remainingMillis = Duration.between(Instant.now(), deadline).toMillis();
        if (remainingMillis <= 0) {
            return CompletableFuture.failedFuture(new TimeoutException("Deadline already expired"));
        }
        // orTimeout fails this future with TimeoutException at the deadline; it does not
        // stop the underlying call, so pass remainingMillis to the client as well
        return serviceCallAsync(id)
                .orTimeout(remainingMillis, TimeUnit.MILLISECONDS);
    }
    private CompletableFuture<Result> serviceCallAsync(String id) {
        // Implementation calls remote service asynchronously
        return CompletableFuture.supplyAsync(() -> new Result(id));
    }
}
record Result(String id) {}
5. Resource Scoping
Resource scoping ties the lifetime of resources (threads, thread pools, database connections, file handles, metrics contexts, etc.) to clearly defined lexical scopes or ownership boundaries.
In Java, this shows up as try‑with‑resources, as an ExecutorService that is created and shut down by the component that owns it, and now as StructuredTaskScope, which ensures that spawned tasks cannot outlive the scope.
Bad patterns include globally shared executors that are never shut down, or threads started in one layer that accidentally keep running after the caller has timed out. Resource scoping reduces leaks, contention, and coordination bugs.
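When an executor has to outlive a single call, the same idea can be applied at the component level: the component that creates the pool also owns its shutdown. `ReportGenerator` and its `render` method are hypothetical names for this sketch. The pattern it illustrates is `shutdown()` followed by a bounded `awaitTermination`, falling back to `shutdownNow()`.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical component that owns its executor and releases it in close(),
// so callers can bound the component's lifetime with try-with-resources
final class ReportGenerator implements AutoCloseable {
    private final ExecutorService workers = Executors.newFixedThreadPool(4);

    // Hypothetical unit of work; a real implementation would do slow I/O here
    Future<String> render(String reportId) {
        return workers.submit(() -> "report-" + reportId);
    }

    @Override
    public void close() throws InterruptedException {
        workers.shutdown(); // stop accepting new tasks; already-submitted tasks still run
        if (!workers.awaitTermination(5, TimeUnit.SECONDS)) {
            workers.shutdownNow(); // interrupt stragglers instead of leaking threads
        }
    }

    boolean isTerminated() {
        return workers.isTerminated();
    }
}
```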
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
class ScopedExecutorExample {
    void processBatch(Runnable task, int parallelism) {
        // Executor is scoped to this batch only (ExecutorService is AutoCloseable since Java 19)
        try (var executor = Executors.newFixedThreadPool(parallelism)) {
            for (int i = 0; i < parallelism; i++) {
                executor.submit(task);
            }
            // close() at the end of try-with-resources shuts the executor down
            // and waits for the submitted tasks to finish
        }
    }
}
import java.util.concurrent.ExecutionException;
import java.util.concurrent.StructuredTaskScope;
class ResourceScopedService {
    public void handleRequest(String requestId) throws InterruptedException, ExecutionException {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            // All subtasks are logically scoped to this request
            scope.fork(() -> logAuditTrail(requestId));
            scope.fork(() -> updateAnalytics(requestId));
            scope.join();
            scope.throwIfFailed();
        } // close() guarantees no subtask outlives this block, even if an exception escapes early
    }
    private Void logAuditTrail(String requestId) {
        // perform logging, possibly slow IO
        return null;
    }
    private Void updateAnalytics(String requestId) {
        // send events to analytics pipeline
        return null;
    }
}
6. Observability with Thread Dumps and JFR
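A thread dump is a snapshot of all Java threads and their stack traces at a moment in time, typically captured with tools like jstack, jcmd <pid> Thread.print, or Java Mission Control. These traditional dumps list platform threads. Since Java 21, jcmd <pid> Thread.dump_to_file -format=json <file> also includes virtual threads.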
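Thread dumps can also be taken from inside the process. This sketch uses the standard `ThreadMXBean.dumpAllThreads` API to list threads whose names share a prefix, which is one reason to give worker threads descriptive names. `ThreadDumpDemo` and the prefix filter are hypothetical and exist only for illustration. This API reports platform threads only, not virtual threads.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.util.Arrays;
import java.util.List;

final class ThreadDumpDemo {
    // Returns "name STATE" for each live platform thread whose name starts with prefix
    static List<String> dump(String prefix) {
        // false, false: skip locked monitor and synchronizer details to keep it cheap
        ThreadInfo[] infos = ManagementFactory.getThreadMXBean().dumpAllThreads(false, false);
        return Arrays.stream(infos)
                .filter(info -> info.getThreadName().startsWith(prefix))
                .map(info -> info.getThreadName() + " " + info.getThreadState())
                .toList();
    }
}
```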
JFR is a low‑overhead profiling and diagnostics tool built into the JVM. It records events such as method samples, lock contention, GC, and I/O. Recording can be enabled at launch (for example with -XX:StartFlightRecording) or started on a running process with jcmd <pid> JFR.start.
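JFR can also record application-level events. Here is a minimal sketch, assuming a hypothetical `demo.FanOut` event emitted around each request-scoped fan-out so that recordings show how long each orchestration took. It uses the `jdk.jfr` `Event`, `Recording`, and `RecordingFile` APIs to record the event and read it back in-process. `FanOutEvent` and `JfrDemo` are illustrative names.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import jdk.jfr.Event;
import jdk.jfr.Label;
import jdk.jfr.Name;
import jdk.jfr.Recording;
import jdk.jfr.consumer.RecordedEvent;
import jdk.jfr.consumer.RecordingFile;

// Hypothetical custom event marking one request-scoped fan-out
@Name("demo.FanOut")
@Label("Fan-Out")
class FanOutEvent extends Event {
    @Label("Request Id")
    String requestId;
    @Label("Subtasks")
    int subtasks;
}

final class JfrDemo {
    static void handle(String requestId, int subtasks) {
        FanOutEvent event = new FanOutEvent();
        event.begin(); // start timing the fan-out
        // ... fork subtasks and join them here ...
        event.requestId = requestId;
        event.subtasks = subtasks;
        event.commit(); // written only if a recording has this event enabled
    }

    // Records one fan-out into a temporary file and reads all events back
    static List<RecordedEvent> recordAndRead() throws Exception {
        Path file = Files.createTempFile("fanout", ".jfr");
        try (Recording recording = new Recording()) {
            recording.enable(FanOutEvent.class);
            recording.start();
            handle("req-1", 3);
            recording.stop();
            recording.dump(file);
        }
        return RecordingFile.readAllEvents(file);
    }
}
```

In production, the recording would more typically be started with -XX:StartFlightRecording or jcmd and inspected in JDK Mission Control rather than read back in code.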