
Parallel Workstreams in Java Agent Pipelines: Phase-Level Grouping and DAG Execution

Flat task lists work well for sequential pipelines. You define tasks, wire context dependencies, and the framework runs them in order. For a research-then-write pipeline, that’s all you need.

The problem shows up when you have work that genuinely doesn’t need to happen in sequence — multiple independent workstreams that can proceed in parallel, followed by a convergence point where all of them must finish before the next stage can begin.

The task-level context() mechanism can model some of this. The parallel workflow executor infers a dependency graph from context declarations and runs independent tasks concurrently. But it operates at the individual task level. There’s no concept of a named workstream, no defined barrier, and no way to say “these three groups of tasks run at the same time, and this fourth group waits for all three.”

That gap is what the Phase abstraction addresses in AgentEnsemble.

Before getting to code, it helps to have a concrete example of what a flat task list can’t express.

Imagine three diners order different main courses — steak, salmon, and pasta. Each dish has its own sequential preparation steps: prep, cook, plate. All three dishes are prepared simultaneously and served together once every dish is ready.

A flat task list with context-chaining can’t express this cleanly. You’d need to manually wire cross-task dependencies between the three workstreams, and the dependency graph would become a tangle of context references. More importantly, the intent is lost: a reader looking at the task list can’t immediately see that you have three independent workstreams converging into one.

A phase is a named group of tasks. Phases declare dependencies on each other via after(). Independent phases execute in parallel — each in its own virtual thread. A phase only starts when all of its declared predecessors have completed.

Phase steak = Phase.builder()
    .name("steak")
    .task(Task.of("Prepare steak", "Seasoned and at room temperature"))
    .task(Task.of("Sear steak", "Medium-rare, rested for 5 minutes"))
    .task(Task.of("Plate steak", "Plated with garnish and sauce"))
    .build();

Phase salmon = Phase.builder()
    .name("salmon")
    .task(Task.of("Prepare salmon", "Skin removed, seasoned"))
    .task(Task.of("Cook salmon", "Crispy skin, fully cooked"))
    .task(Task.of("Plate salmon", "Plated with lemon and herbs"))
    .build();

Phase pasta = Phase.builder()
    .name("pasta")
    .task(Task.of("Boil pasta", "Al dente"))
    .task(Task.of("Make sauce", "Reduced tomato sauce, seasoned"))
    .task(Task.of("Plate pasta", "Pasta and sauce combined, topped with basil"))
    .build();

Phase serve = Phase.builder()
    .name("serve")
    .after(steak, salmon, pasta) // barrier: waits for all three
    .task(Task.of("Deliver all plates", "All three dishes delivered simultaneously"))
    .build();

EnsembleOutput output = Ensemble.builder()
    .chatLanguageModel(llm)
    .phase(steak)
    .phase(salmon)
    .phase(pasta)
    .phase(serve)
    .build()
    .run();

Execution timeline:

t=0 [steak starts] [salmon starts] [pasta starts]
t=? [serve starts when last finishes]

The intent is now legible from the structure. You can see that three workstreams run in parallel and converge before serving. The framework handles the concurrency — you declare the shape.

Phases form a directed acyclic graph. Phases with no after() declaration are root phases and start immediately. When a phase completes, AgentEnsemble checks whether any successors now have all predecessors satisfied, and if so starts them immediately.

Phase A = Phase.of("A", taskA);
Phase B = Phase.of("B", taskB);
Phase C = Phase.of("C", taskC);
Phase D = Phase.builder().name("D").after(B, C).task(taskD).build();

// Execution:
//   [A]   [B]   [C]     -- root phases, all start simultaneously
//          |     |
//          +--+--+
//             |
//            [D]        -- starts when B and C both complete
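To make the readiness rule concrete, here is a minimal JDK-only sketch that groups phases into "waves" of phases whose predecessors have all completed. It is an illustration under assumptions, not AgentEnsemble's scheduler: the PhaseWaves class and its map-of-names input are hypothetical, and the real executor starts each successor as soon as it becomes ready rather than in lockstep waves.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of AgentEnsemble.
class PhaseWaves {

    // deps maps each phase name to the phase names it lists in after().
    static List<Set<String>> waves(Map<String, List<String>> deps) {
        List<Set<String>> result = new ArrayList<>();
        Set<String> done = new LinkedHashSet<>();
        while (done.size() < deps.size()) {
            // A phase is ready when it has not run yet and all predecessors are done.
            Set<String> ready = new LinkedHashSet<>();
            for (Map.Entry<String, List<String>> e : deps.entrySet()) {
                if (!done.contains(e.getKey()) && done.containsAll(e.getValue())) {
                    ready.add(e.getKey());
                }
            }
            if (ready.isEmpty()) {
                throw new IllegalStateException("Cycle or unknown predecessor");
            }
            done.addAll(ready);
            result.add(ready);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<String>> deps = new LinkedHashMap<>();
        deps.put("A", List.of());
        deps.put("B", List.of());
        deps.put("C", List.of());
        deps.put("D", List.of("B", "C"));
        System.out.println(waves(deps)); // prints [[A, B, C], [D]]
    }
}
```

Root phases are simply the ones whose predecessor list is empty, so they land in the first wave without any special casing.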

The DAG is validated at build time. Cycles, duplicate phase names, and cross-phase context() references to non-predecessor phases all produce a ValidationException before any execution begins.
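As a rough picture of what build-time validation involves, the sketch below rejects duplicate phase names and cycles using Kahn's algorithm. Everything in it is an assumption made for illustration: PhaseSpec and PhaseGraphValidator are hypothetical stand-ins, it throws IllegalArgumentException rather than the framework's ValidationException, and it does not cover the cross-phase context() check.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical validator; the framework's own checks may differ.
class PhaseGraphValidator {

    // Minimal stand-in for a phase: a name plus the names listed in after().
    static final class PhaseSpec {
        final String name;
        final List<String> after;

        PhaseSpec(String name, String... after) {
            this.name = name;
            this.after = List.of(after);
        }
    }

    static void validate(List<PhaseSpec> phases) {
        // Count unmet predecessors per phase and reject duplicate names.
        Map<String, Integer> unmet = new HashMap<>();
        for (PhaseSpec p : phases) {
            if (unmet.put(p.name, p.after.size()) != null) {
                throw new IllegalArgumentException("Duplicate phase name: " + p.name);
            }
        }
        Map<String, List<String>> successors = new HashMap<>();
        for (PhaseSpec p : phases) {
            for (String pred : p.after) {
                if (!unmet.containsKey(pred)) {
                    throw new IllegalArgumentException("Unknown predecessor: " + pred);
                }
                successors.computeIfAbsent(pred, k -> new ArrayList<>()).add(p.name);
            }
        }
        // Kahn's algorithm: repeatedly remove phases with no unmet predecessors.
        Deque<String> ready = new ArrayDeque<>();
        unmet.forEach((name, count) -> { if (count == 0) ready.add(name); });
        int visited = 0;
        while (!ready.isEmpty()) {
            String done = ready.poll();
            visited++;
            for (String next : successors.getOrDefault(done, List.of())) {
                if (unmet.merge(next, -1, Integer::sum) == 0) {
                    ready.add(next);
                }
            }
        }
        // Any phase never reached sits on a cycle.
        if (visited != phases.size()) {
            throw new IllegalArgumentException("Cycle detected in phase graph");
        }
    }
}
```

Failing at build time means a misdeclared after() surfaces before any model call is made, which matters when each task costs tokens.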

Each phase can use a different internal workflow strategy for its tasks. This is useful when different stages of a pipeline have different concurrency characteristics.

Phase dataGathering = Phase.builder()
    .name("data-gathering")
    .workflow(Workflow.PARALLEL) // all three fetch tasks run concurrently
    .task(Task.of("Fetch sales data", "Sales CSV"))
    .task(Task.of("Fetch inventory data", "Inventory CSV"))
    .task(Task.of("Fetch customer data", "Customer CSV"))
    .build();

Phase analysis = Phase.builder()
    .name("analysis")
    .workflow(Workflow.SEQUENTIAL) // each step depends on the previous
    .after(dataGathering)
    .task(Task.of("Merge datasets", "Combined dataset"))
    .task(Task.of("Compute metrics", "KPI summary"))
    .task(Task.of("Generate report", "Final report"))
    .build();

EnsembleOutput output = Ensemble.builder()
    .chatLanguageModel(llm)
    .phase(dataGathering)
    .phase(analysis)
    .build()
    .run();

The data-gathering phase fetches all three data sources in parallel. The analysis phase runs its tasks sequentially because each depends on the output of the previous one. Each phase uses the strategy that fits its internal structure, without any global configuration change.

If no workflow is set on a phase, it inherits the ensemble-level workflow, which itself defaults to SEQUENTIAL.
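The fallback order can be written down as a small resolution function. This is a sketch of the rule as stated above, not the framework's code: the WorkflowResolution class is hypothetical, the local Workflow enum is a stand-in that lists only the two constants used in this article, and null is assumed to mean "not configured".

```java
// Hypothetical illustration of the inheritance rule.
class WorkflowResolution {

    // Local stand-in for the framework's Workflow type.
    enum Workflow { SEQUENTIAL, PARALLEL }

    // null means "not set" at that level.
    static Workflow effective(Workflow phaseWorkflow, Workflow ensembleWorkflow) {
        if (phaseWorkflow != null) {
            return phaseWorkflow;        // explicit per-phase setting wins
        }
        if (ensembleWorkflow != null) {
            return ensembleWorkflow;     // otherwise inherit from the ensemble
        }
        return Workflow.SEQUENTIAL;      // ensemble-level default
    }
}
```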

Tasks in a later phase can reference tasks from an earlier phase using the standard context() mechanism. The phase DAG guarantees that earlier phases complete before later phases start, so cross-phase context is always safe to resolve.

import java.util.List;

Task marketTask = Task.builder()
    .description("Research market positioning for a Java developer tooling product")
    .expectedOutput("Bullet-point market positioning summary")
    .build();

Task technicalTask = Task.builder()
    .description("Assess implementation complexity for a Java multi-agent framework")
    .expectedOutput("Complexity score with rationale and integration checklist")
    .build();

Phase marketResearch = Phase.of("market-research", marketTask);
Phase technicalResearch = Phase.of("technical-research", technicalTask);

Phase report = Phase.builder()
    .name("report")
    .after(marketResearch, technicalResearch)
    .task(Task.builder()
        .description("Write a product feasibility report combining market and technical findings")
        .expectedOutput("One-page feasibility report with go/no-go recommendation")
        .context(List.of(marketTask, technicalTask)) // cross-phase references
        .build())
    .build();

market-research and technical-research run in parallel. When both complete, report starts and the report task can read their outputs via context(). The ordering is enforced by the DAG — there’s no risk of the report task starting before both inputs are ready.

When a phase fails, only its direct and transitive dependents are skipped. Independent phases continue running.

[A]   [B]   [C]
     B fails
       |
[D depends on B]   <-- skipped
[E depends on A]   <-- still runs, A is independent

This is important for real workloads. If one data-gathering workstream fails (say, a rate limit or timeout), an unrelated workstream should complete normally. The EnsembleOutput records failures and skips so you can inspect exactly what happened.
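The set of phases skipped after a failure is the transitive closure of "depends on the failed phase". A minimal sketch of that computation follows; FailurePropagation and its map-of-names input are hypothetical, and how the framework actually records skips in EnsembleOutput is not shown here.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of AgentEnsemble.
class FailurePropagation {

    // deps maps each phase name to the phase names it lists in after().
    // Returns every phase that directly or transitively depends on `failed`.
    static Set<String> skipped(Map<String, List<String>> deps, String failed) {
        Set<String> skipped = new LinkedHashSet<>();
        Deque<String> frontier = new ArrayDeque<>();
        frontier.add(failed);
        while (!frontier.isEmpty()) {
            String current = frontier.poll();
            for (Map.Entry<String, List<String>> e : deps.entrySet()) {
                // Set.add returns false for phases already marked, so each is visited once.
                if (e.getValue().contains(current) && skipped.add(e.getKey())) {
                    frontier.add(e.getKey());
                }
            }
        }
        return skipped;
    }
}
```

With roots A, B, and C, D after B, E after A, and a further phase F after D, a failure in B yields D and F as skipped, while A, C, and E are untouched.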

If you need all workstreams to succeed before proceeding, structure the DAG so that the final phase depends on all of them — that barrier naturally enforces it.

EnsembleOutput adds a phase-keyed map alongside the existing flat list of task outputs:

EnsembleOutput output = ensemble.run();

// Backward-compatible flat list
List<TaskOutput> all = output.getTaskOutputs();

// Phase-keyed map
Map<String, List<TaskOutput>> byPhase = output.getPhaseOutputs();
List<TaskOutput> marketResults = byPhase.get("market-research");
List<TaskOutput> technicalResults = byPhase.get("technical-research");
List<TaskOutput> reportResults = byPhase.get("report");

// Final output: last task of the last phase to complete
String finalText = output.getFinalOutput();

Flat-task ensembles return an empty map from getPhaseOutputs(), so existing code that reads getTaskOutputs() continues to work unchanged.

Use this | When
Flat tasks | Simple sequential pipeline, linear context-chaining
Parallel workflow | Fine-grained task-level concurrency, tasks depend on each other within the same conceptual group
Phases | Named workstreams with clear start/end, multiple groups that run in parallel, explicit convergence barriers, per-group workflow strategy

The key question is whether you think in terms of named stages. If you do — “first we gather data in parallel, then we analyze” — phases express that directly. If the dependency structure is purely task-to-task with no meaningful grouping, context-chaining in a flat list is simpler.

Do not use phases for simple sequential pipelines. A two-task research-write pipeline doesn’t need phases — the extra structure adds noise without adding clarity.

Phases are additive. The task() and phase() builder methods are mutually exclusive on the same ensemble — you can’t mix them — but all existing flat-task ensembles continue to work exactly as before. Phase is a new layer on top, not a replacement.

The Task, Agent, and WorkflowExecutor types are unchanged. Per-task configuration (model, tools, output type, max iterations, context), agent synthesis, deterministic handlers — all of it works inside phases exactly as it does outside them.

PhaseDagExecutor runs phases on virtual threads, consistent with how ParallelWorkflowExecutor handles task-level concurrency. Each phase gets its own virtual thread, and a CountDownLatch tracks completion across the DAG.
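The thread-per-phase pattern with a single completion latch can be sketched with the JDK alone. This is not the PhaseDagExecutor source: LatchDagSketch is a hypothetical class, the phase "work" is reduced to recording the phase name, the graph is assumed to be already validated as acyclic, and plain threads are used so the sketch also runs on JDKs older than 21. On Java 21 or later, Thread.startVirtualThread(runnable) could replace new Thread(runnable).start().

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch; assumes deps describes an acyclic graph.
class LatchDagSketch {

    // deps maps each phase name to its predecessors. Returns completion order.
    static List<String> run(Map<String, List<String>> deps) {
        CountDownLatch allDone = new CountDownLatch(deps.size());
        Map<String, AtomicInteger> unmet = new ConcurrentHashMap<>();
        List<String> roots = new ArrayList<>();
        deps.forEach((name, preds) -> {
            unmet.put(name, new AtomicInteger(new HashSet<>(preds).size()));
            if (preds.isEmpty()) {
                roots.add(name);
            }
        });
        List<String> completed = Collections.synchronizedList(new ArrayList<>());
        for (String root : roots) {
            start(root, deps, unmet, completed, allDone);
        }
        try {
            allDone.await(); // returns once every phase has counted down
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for phases", e);
        }
        return completed;
    }

    private static void start(String name, Map<String, List<String>> deps,
                              Map<String, AtomicInteger> unmet,
                              List<String> completed, CountDownLatch allDone) {
        new Thread(() -> {
            completed.add(name); // stand-in for running the phase's tasks
            // Start each successor whose last unmet predecessor just finished.
            deps.forEach((successor, preds) -> {
                if (preds.contains(name) && unmet.get(successor).decrementAndGet() == 0) {
                    start(successor, deps, unmet, completed, allDone);
                }
            });
            allDone.countDown();
        }, "phase-" + name).start();
    }
}
```

The atomic decrement guarantees that exactly one finishing predecessor observes the count reaching zero, so each successor is started once even when several predecessors complete at the same moment.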

Phases add structure. Structure is most valuable when the problem is genuinely structured — when you have multiple independent workstreams, named stages, or convergence barriers that matter.

For simple pipelines, the structure is overhead. A Phase.of("research", task1, task2) wrapping two sequential tasks is just ceremony. Flat tasks are cleaner when the pipeline is essentially linear.

The useful abstraction here is the named barrier: a place where multiple concurrent workstreams must all finish before work continues. That’s where phases earn their place.


Documentation:

  • Phases Guide — when to use phases, builder API, cross-phase context, error handling
  • Phases Examples — runnable code: sequential phases, kitchen scenario, per-phase workflow, diamond DAG
  • Getting Started — up and running in 5 minutes
  • GitHub — source, issues, and contributions