MapReduce for AI Agents: Scaling Multi-Agent Workloads on the JVM

Not every agent workload is a pipeline. Sometimes you have a list of items — documents to summarize, products to review, regions to analyze — and you need the same kind of analysis applied to each one, then a synthesis of all the results.

That’s MapReduce. And it turns out to be one of the most practical patterns for production agent systems.

Most agent frameworks don’t have a first-class abstraction for this. You end up writing a loop, creating agents and tasks dynamically, wiring up dependencies by hand, and hoping you got the concurrency right. AgentEnsemble provides MapReduceEnsemble — a dedicated builder that handles the fan-out, parallel execution, and fan-in with the same typed, observable API as everything else.

MapReduce for agents works exactly like you’d expect:

  1. Map phase: Take a list of items. For each item, create an agent and a task. Run all map tasks in parallel.
  2. Reduce phase: Collect all map results. Feed them to a reduce agent that synthesizes the final output.

The key insight is that map tasks are embarrassingly parallel. Each one is independent. The reduce task depends on all of them.
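The two phases can be illustrated without any agent framework at all. The following is a minimal plain-JDK sketch of the fan-out/fan-in shape, not AgentEnsemble's implementation: the class name `FanOutFanInSketch`, the `mapReduce` helper, and the string-concatenating stand-ins for LLM calls are all hypothetical.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Plain-JDK illustration of fan-out/fan-in; not AgentEnsemble's implementation.
final class FanOutFanInSketch {

    // Runs mapFn on every item in parallel, then hands all results to reduceFn.
    static <T, M, R> R mapReduce(List<T> items,
                                 Function<T, M> mapFn,
                                 Function<List<M>, R> reduceFn) {
        ExecutorService pool =
                Executors.newFixedThreadPool(Math.max(1, Math.min(items.size(), 8)));
        try {
            // Map phase: one independent future per item.
            List<CompletableFuture<M>> futures = items.stream()
                    .map(item -> CompletableFuture.supplyAsync(() -> mapFn.apply(item), pool))
                    .toList();
            // Barrier: join() blocks until each result is ready, preserving input order.
            List<M> mapped = futures.stream().map(CompletableFuture::join).toList();
            // Reduce phase: runs once, after every map task has finished.
            return reduceFn.apply(mapped);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> reports = List.of("Q1", "Q2", "Q3", "Q4");
        String summary = mapReduce(
                reports,
                r -> r + " analyzed",                    // stand-in for an LLM-backed map task
                results -> String.join("; ", results));  // stand-in for the reduce agent
        System.out.println(summary); // prints "Q1 analyzed; Q2 analyzed; Q3 analyzed; Q4 analyzed"
    }
}
```

Because no map task reads another's output, the only synchronization point is the barrier before the reduce step.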

The simplest form: you know the items upfront.

List<String> quarterlyReports = List.of(
    "Q1 2024 Financial Report",
    "Q2 2024 Financial Report",
    "Q3 2024 Financial Report",
    "Q4 2024 Financial Report"
);

MapReduceOutput<String, String> output = MapReduceEnsemble.<String, String>builder()
    .items(quarterlyReports)
    .mapAgentFactory(report -> Agent.builder()
        .role("Financial Analyst")
        .goal("Analyze " + report + " for key trends and anomalies")
        .background("Expert in financial analysis and reporting")
        .build())
    .mapTaskFactory((report, agent) -> Task.builder()
        .description("Analyze " + report + ", identifying revenue trends, "
            + "cost changes, and notable items")
        .expectedOutput("Detailed analysis with key findings")
        .agent(agent)
        .build())
    .reduceAgent(Agent.builder()
        .role("Chief Financial Officer")
        .goal("Synthesize quarterly analyses into an annual review")
        .background("Senior executive with deep financial expertise")
        .build())
    .reduceTaskFactory((results, agent) -> Task.builder()
        .description("Combine all quarterly analyses into a comprehensive "
            + "annual financial review, highlighting year-over-year trends")
        .expectedOutput("Annual financial review report")
        .agent(agent)
        .build())
    .chatLanguageModel(model)
    .build()
    .run();

System.out.println(output.getReduceOutput());

What happens under the hood:

  1. Four agents are created, one per report.
  2. Four map tasks run in parallel.
  3. When all four complete, the reduce agent receives all four outputs as context.
  4. The reduce task produces the final synthesis.

The factory functions (mapAgentFactory, mapTaskFactory) receive the item as input, so you can customize the agent’s role, goal, and task description per item.

When you need structured output from the reduce phase, add an outputType:

record AnnualReview(
    String fiscalYear,
    double totalRevenue,
    double totalExpenses,
    List<String> keyTrends,
    String outlook
) {}

MapReduceOutput<String, AnnualReview> output =
    MapReduceEnsemble.<String, AnnualReview>builder()
        .items(quarterlyReports)
        .mapAgentFactory(report -> Agent.builder()
            .role("Financial Analyst")
            .goal("Analyze " + report)
            .build())
        .mapTaskFactory((report, agent) -> Task.builder()
            .description("Analyze " + report)
            .expectedOutput("Detailed financial analysis")
            .agent(agent)
            .build())
        .reduceAgent(Agent.builder()
            .role("CFO")
            .goal("Produce an annual financial review")
            .build())
        .reduceTaskFactory((results, agent) -> Task.builder()
            .description("Synthesize quarterly analyses into an annual review")
            .expectedOutput("Structured annual review")
            .agent(agent)
            .outputType(AnnualReview.class)
            .build())
        .chatLanguageModel(model)
        .build()
        .run();

AnnualReview review = output.getReduceStructuredOutput(AnnualReview.class);
System.out.println(review.totalRevenue());
System.out.println(review.keyTrends());

The two type parameters on MapReduceEnsemble.<String, AnnualReview>builder() specify the input item type and the reduce output type, in that order. The framework handles JSON schema generation, LLM instruction, and deserialization.
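To give a feel for what schema generation from a record involves, here is a rough sketch that reads a record's shape through standard Java reflection. It is an illustration only: how AgentEnsemble actually builds its schema is not shown in this post, and the `RecordSchemaSketch` class and `describe` helper are hypothetical names.

```java
import java.lang.reflect.RecordComponent;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: shows how a record's shape can be read via reflection.
final class RecordSchemaSketch {

    record AnnualReview(String fiscalYear, double totalRevenue, double totalExpenses,
                        List<String> keyTrends, String outlook) {}

    // Maps each record component name to its declared type name, in declaration order.
    static Map<String, String> describe(Class<? extends Record> type) {
        Map<String, String> fields = new LinkedHashMap<>();
        for (RecordComponent component : type.getRecordComponents()) {
            fields.put(component.getName(), component.getGenericType().getTypeName());
        }
        return fields;
    }

    public static void main(String[] args) {
        // Prints one "name: type" line per record component.
        describe(AnnualReview.class)
                .forEach((name, typeName) -> System.out.println(name + ": " + typeName));
    }
}
```

A field listing like this is the raw material a framework could turn into a JSON schema and into instructions for the model; the record's canonical constructor then gives a natural target for deserialization.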

Sometimes you don’t know the items upfront. Maybe you have a large document that needs to be partitioned, or you want the LLM to decide how to break up the work.

Adaptive mode lets a planning agent determine the partitioning:

MapReduceOutput<String, String> output = MapReduceEnsemble.<String, String>builder()
    .adaptive(true)
    .sourceDescription("A comprehensive market analysis covering 8 "
        + "industry sectors in the APAC region")
    .plannerAgent(Agent.builder()
        .role("Research Director")
        .goal("Determine the best way to partition the analysis")
        .background("Expert at breaking complex research into manageable pieces")
        .build())
    .mapAgentFactory(partition -> Agent.builder()
        .role("Sector Analyst")
        .goal("Analyze the " + partition + " sector in detail")
        .build())
    .mapTaskFactory((partition, agent) -> Task.builder()
        .description("Conduct a thorough analysis of " + partition)
        .expectedOutput("Sector analysis report")
        .agent(agent)
        .build())
    .reduceAgent(Agent.builder()
        .role("Chief Strategist")
        .goal("Synthesize all sector analyses into a unified report")
        .build())
    .reduceTaskFactory((results, agent) -> Task.builder()
        .description("Create a comprehensive cross-sector APAC market report")
        .expectedOutput("Unified market analysis")
        .agent(agent)
        .build())
    .chatLanguageModel(model)
    .build()
    .run();

In adaptive mode:

  1. The planner agent receives the sourceDescription and decides how to partition the work.
  2. The planner’s output is parsed into a list of items.
  3. The map phase proceeds as normal with those items.
  4. The reduce phase synthesizes everything.

This is useful when the decomposition itself requires intelligence — when you can’t hardcode the partitioning logic.
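Step 2 in the list above, turning the planner's free-text answer into a list of items, could look roughly like the sketch below. The planner's real output format is not specified in this post, so the bulleted or numbered list format assumed here, along with the `PlannerOutputSketch` class and `parseItems` helper, is hypothetical.

```java
import java.util.List;

// Hypothetical parsing step: assumes the planner answers with one item per line,
// optionally prefixed by a bullet ("-", "*", "•") or a number ("1.", "2)").
final class PlannerOutputSketch {

    // Strips list markers and blank lines, returning the remaining lines as items.
    static List<String> parseItems(String plannerOutput) {
        return plannerOutput.lines()
                .map(line -> line.replaceFirst("^\\s*(?:[-*•]|\\d+[.)])\\s*", "").trim())
                .filter(line -> !line.isEmpty())
                .toList();
    }

    public static void main(String[] args) {
        String plannerOutput = """
                1. Financial services
                2. Consumer electronics
                - Renewable energy
                """;
        // prints "[Financial services, Consumer electronics, Renewable energy]"
        System.out.println(parseItems(plannerOutput));
    }
}
```

Line-based parsing like this is fragile: an introductory sentence from the model would be treated as an item. Asking the planner for structured output is a sturdier design when the framework supports it.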

MapReduce for agents isn’t just a pattern exercise. Here are concrete use cases where it shines:

List<String> documents = loadDocumentsFromDirectory("contracts/");

MapReduceEnsemble.<String, RiskSummary>builder()
    .items(documents)
    .mapAgentFactory(doc -> Agent.builder()
        .role("Legal Analyst")
        .goal("Identify risks and obligations in " + doc)
        .build())
    .mapTaskFactory((doc, agent) -> Task.builder()
        .description("Review " + doc + " for contractual risks, "
            + "unusual clauses, and compliance concerns")
        .expectedOutput("Risk analysis for " + doc)
        .agent(agent)
        .build())
    .reduceAgent(Agent.builder()
        .role("General Counsel")
        .goal("Produce a portfolio-wide risk assessment")
        .build())
    .reduceTaskFactory((results, agent) -> Task.builder()
        .description("Synthesize all contract analyses into a "
            + "portfolio-wide risk report")
        .expectedOutput("Portfolio risk assessment")
        .agent(agent)
        .outputType(RiskSummary.class)
        .build())
    .chatLanguageModel(model)
    .build()
    .run();

If the directory holds ten contracts, all ten are analyzed in parallel and the reduce step returns one unified risk summary.

// Regional analysis: one market specialist per region
List<String> regions = List.of("North America", "Europe",
    "Asia Pacific", "Latin America");

MapReduceEnsemble.<String, GlobalReport>builder()
    .items(regions)
    .mapAgentFactory(region -> Agent.builder()
        .role(region + " Market Specialist")
        .goal("Analyze market conditions in " + region)
        .build())
    // ...

// Competitive analysis: one profile per competitor
List<String> competitors = List.of("Company A", "Company B",
    "Company C", "Company D", "Company E");

MapReduceEnsemble.<String, CompetitiveLandscape>builder()
    .items(competitors)
    .mapAgentFactory(competitor -> Agent.builder()
        .role("Competitive Intelligence Analyst")
        .goal("Build a detailed profile of " + competitor)
        .build())
    // ...

// Code review: one reviewer per open pull request
List<String> pullRequests = fetchOpenPRs();

MapReduceEnsemble.<String, ReviewSummary>builder()
    .items(pullRequests)
    .mapAgentFactory(pr -> Agent.builder()
        .role("Code Reviewer")
        .goal("Review " + pr + " for quality and correctness")
        .build())
    // ...

MapReduceEnsemble supports the same observability stack as regular ensembles:

MapReduceEnsemble.<String, String>builder()
    .items(items)
    // ... agent and task factories ...
    .chatLanguageModel(model)
    .listener(event -> {
        if (event instanceof TaskCompleteEvent e) {
            logger.info("Completed: {} ({}ms)",
                e.taskDescription(), e.durationMs());
        }
    })
    .traceExporter(TraceExporter.json(Path.of("traces/")))
    .costConfiguration(CostConfiguration.builder()
        .inputTokenCostPer1k(0.01)
        .outputTokenCostPer1k(0.03)
        .build())
    .build()
    .run();

You can see each map task complete individually, track token consumption per item, and get a full trace of the fan-out/fan-in execution.
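As a worked example of per-1k token pricing, the sketch below applies the two rates from the configuration above to a made-up token count. The token numbers, the `TokenCostSketch` class, and the `cost` helper are hypothetical, and the formula is the usual reading of "cost per 1k tokens" rather than a confirmed description of how the library computes its figures.

```java
import java.util.Locale;

// Worked arithmetic for per-1k token pricing; the token counts below are made up.
final class TokenCostSketch {

    // cost = (inputTokens / 1000) * inputRate + (outputTokens / 1000) * outputRate
    static double cost(long inputTokens, long outputTokens,
                       double inputCostPer1k, double outputCostPer1k) {
        return inputTokens / 1000.0 * inputCostPer1k
             + outputTokens / 1000.0 * outputCostPer1k;
    }

    public static void main(String[] args) {
        // Hypothetical usage for one map task: 12,000 input and 3,000 output tokens.
        double perTask = cost(12_000, 3_000, 0.01, 0.03); // 0.12 + 0.09
        System.out.println(String.format(Locale.ROOT, "per map task: %.2f", perTask));
        System.out.println(String.format(Locale.ROOT, "four map tasks: %.2f", 4 * perTask));
    }
}
```

The point of per-item tracking is that fan-out multiplies cost: a map task that looks cheap on its own is paid for once per item, plus the reduce call on top.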

What happens when one map task fails? By default, the entire MapReduce run fails. But you can configure it to continue:

MapReduceEnsemble.<String, String>builder()
    .items(items)
    // ...
    .errorStrategy(ParallelErrorStrategy.CONTINUE_ON_ERROR)
    .build()
    .run();

With CONTINUE_ON_ERROR, the reduce phase receives results from all map tasks that succeeded. You can check which items failed and handle them separately.
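The continue-on-error behavior can be pictured with a plain-JDK sketch that keeps successful map results and records failures per item. This is not the library's code: the `ContinueOnErrorSketch` class, the `Outcome` record, and the `mapAll` helper are hypothetical, and the post does not show how AgentEnsemble exposes failed items.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;

// Plain-JDK illustration of continue-on-error semantics; not the library's code.
final class ContinueOnErrorSketch {

    // Successful map results, plus the cause of failure for each item that failed.
    record Outcome<T, M>(List<M> successes, Map<T, Throwable> failures) {}

    static <T, M> Outcome<T, M> mapAll(List<T> items, Function<T, M> mapFn) {
        // Fan out; each future captures its own result or exception.
        List<CompletableFuture<M>> futures = items.stream()
                .map(item -> CompletableFuture.supplyAsync(() -> mapFn.apply(item)))
                .toList();

        List<M> successes = new ArrayList<>();
        Map<T, Throwable> failures = new LinkedHashMap<>();
        for (int i = 0; i < items.size(); i++) {
            try {
                successes.add(futures.get(i).join());
            } catch (CompletionException e) {
                // One failed item does not abort the others; record it and move on.
                failures.put(items.get(i), e.getCause());
            }
        }
        return new Outcome<>(successes, failures);
    }

    public static void main(String[] args) {
        Outcome<String, String> outcome = mapAll(List.of("Q1", "Q2", "Q3"), item -> {
            if (item.equals("Q2")) throw new IllegalStateException("model timeout");
            return item + " ok";
        });
        System.out.println(outcome.successes());          // prints "[Q1 ok, Q3 ok]"
        System.out.println(outcome.failures().keySet());  // prints "[Q2]"
    }
}
```

Whatever the mechanism, the reduce prompt should account for gaps: a synthesis built from three of four quarters is not an annual review unless the missing quarter is called out.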

When to Use MapReduce vs. Parallel Workflows

Both run tasks concurrently. The difference is in the structure:

Use parallel workflows when you have a fixed set of heterogeneous tasks with known dependencies:

  • “Run market analysis AND financial analysis, then produce a SWOT.”
  • Different agents, different task descriptions, explicit dependency wiring.

Use MapReduce when you have a homogeneous operation over a collection:

  • “Analyze each of these 10 documents the same way, then synthesize.”
  • Same agent template, same task template, applied to different items.

MapReduce is the right abstraction when the word “each” appears in your requirements.

MapReduce for agents follows the same principle as MapReduce for data: decompose, parallelize, aggregate. The difference is that each mapper is an LLM-powered agent, not a stateless function. It can reason, use tools, retry, and produce structured output.

Combined with AgentEnsemble’s type safety, observability, and error handling, this gives you a production-grade pattern for scaling agent workloads across collections of items — without writing concurrency code, without managing thread pools, and without leaving the JVM.


Get started:


AgentEnsemble is MIT-licensed and available on GitHub.