MapReduce for AI Agents: Scaling Multi-Agent Workloads on the JVM
Not every agent workload is a pipeline. Sometimes you have a list of items — documents to summarize, products to review, regions to analyze — and you need the same kind of analysis applied to each one, then a synthesis of all the results.
That’s MapReduce. And it turns out to be one of the most practical patterns for production agent systems.
Most agent frameworks don’t have a first-class abstraction for this. You end up writing a loop, creating agents and tasks dynamically, wiring up dependencies by hand, and hoping you got the concurrency right. AgentEnsemble provides MapReduceEnsemble — a dedicated builder that handles the fan-out, parallel execution, and fan-in with the same typed, observable API as everything else.
The Pattern
MapReduce for agents works exactly like you’d expect:
- Map phase: Take a list of items. For each item, create an agent and a task. Run all map tasks in parallel.
- Reduce phase: Collect all map results. Feed them to a reduce agent that synthesizes the final output.
The key insight is that map tasks are embarrassingly parallel. Each one is independent. The reduce task depends on all of them.
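The fan-out/fan-in shape described above can be sketched in plain Java with `CompletableFuture`. This is only an illustration of the pattern, not AgentEnsemble's implementation: the class `FanOutFanIn`, the method `mapReduce`, and the string-based stand-ins for agents are all hypothetical.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

class FanOutFanIn {

    // Applies mapper to every item concurrently, then passes all results to reducer.
    static <T, M, R> R mapReduce(List<T> items,
                                 Function<T, M> mapper,
                                 Function<List<M>, R> reducer) {
        int threads = Math.max(1, Math.min(items.size(), 8));
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            // Fan-out: one independent future per item.
            List<CompletableFuture<M>> futures = items.stream()
                    .map(item -> CompletableFuture.supplyAsync(() -> mapper.apply(item), pool))
                    .toList();

            // Fan-in: join() waits for each result; list order matches item order.
            List<M> mapped = futures.stream().map(CompletableFuture::join).toList();

            // The reduce step runs only once every map result is available.
            return reducer.apply(mapped);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> reports = List.of("Q1", "Q2", "Q3", "Q4");
        // Stand-ins for the map and reduce agents; a real system would call an LLM here.
        Function<String, String> analyze = report -> report + " analyzed";
        Function<List<String>, String> synthesize = results -> String.join("; ", results);
        System.out.println(mapReduce(reports, analyze, synthesize));
    }
}
```

Because no map call reads another's result, the futures need no coordination beyond the final join, which is what makes the map phase embarrassingly parallel.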
Static MapReduce
The simplest form: you know the items upfront.
    List<String> quarterlyReports = List.of(
        "Q1 2024 Financial Report",
        "Q2 2024 Financial Report",
        "Q3 2024 Financial Report",
        "Q4 2024 Financial Report");

    MapReduceOutput<String, String> output = MapReduceEnsemble.<String, String>builder()
        .items(quarterlyReports)
        .mapAgentFactory(report -> Agent.builder()
            .role("Financial Analyst")
            .goal("Analyze " + report + " for key trends and anomalies")
            .background("Expert in financial analysis and reporting")
            .build())
        .mapTaskFactory((report, agent) -> Task.builder()
            .description("Analyze " + report + ", identifying revenue trends, "
                + "cost changes, and notable items")
            .expectedOutput("Detailed analysis with key findings")
            .agent(agent)
            .build())
        .reduceAgent(Agent.builder()
            .role("Chief Financial Officer")
            .goal("Synthesize quarterly analyses into an annual review")
            .background("Senior executive with deep financial expertise")
            .build())
        .reduceTaskFactory((results, agent) -> Task.builder()
            .description("Combine all quarterly analyses into a comprehensive "
                + "annual financial review, highlighting year-over-year trends")
            .expectedOutput("Annual financial review report")
            .agent(agent)
            .build())
        .chatLanguageModel(model)
        .build()
        .run();

    System.out.println(output.getReduceOutput());

What happens under the hood:
- Four agents are created, one per report.
- Four map tasks run in parallel.
- When all four complete, the reduce agent receives all four outputs as context.
- The reduce task produces the final synthesis.
The factory functions (mapAgentFactory, mapTaskFactory) receive the item as input, so you can customize the agent’s role, goal, and task description per item.
Typed MapReduce
When you need structured output from the reduce phase, add an outputType:
    record AnnualReview(
        String fiscalYear,
        double totalRevenue,
        double totalExpenses,
        List<String> keyTrends,
        String outlook) {}

    MapReduceOutput<String, AnnualReview> output = MapReduceEnsemble.<String, AnnualReview>builder()
        .items(quarterlyReports)
        .mapAgentFactory(report -> Agent.builder()
            .role("Financial Analyst")
            .goal("Analyze " + report)
            .build())
        .mapTaskFactory((report, agent) -> Task.builder()
            .description("Analyze " + report)
            .expectedOutput("Detailed financial analysis")
            .agent(agent)
            .build())
        .reduceAgent(Agent.builder()
            .role("CFO")
            .goal("Produce an annual financial review")
            .build())
        .reduceTaskFactory((results, agent) -> Task.builder()
            .description("Synthesize quarterly analyses into an annual review")
            .expectedOutput("Structured annual review")
            .agent(agent)
            .outputType(AnnualReview.class)
            .build())
        .chatLanguageModel(model)
        .build()
        .run();

    AnnualReview review = output.getReduceStructuredOutput(AnnualReview.class);
    System.out.println(review.totalRevenue());
    System.out.println(review.keyTrends());

The type parameters on MapReduceEnsemble.<String, AnnualReview>builder() specify the input item type and the reduce output type. The framework handles JSON schema generation, LLM instruction, and deserialization.
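To give a feel for the schema-generation step, here is a minimal sketch of how a field listing could be derived from a record with standard reflection. It is an assumption about the general technique, not AgentEnsemble's actual code: `RecordSchemaSketch` and `describe` are hypothetical names, and the output is a simplified field listing rather than a real JSON Schema document.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class RecordSchemaSketch {

    // Same shape as the AnnualReview record used in the example above.
    record AnnualReview(String fiscalYear, double totalRevenue, double totalExpenses,
                        List<String> keyTrends, String outlook) {}

    // Lists each record component as "name": type, in declaration order.
    // A framework could append a description like this to the LLM prompt.
    static String describe(Class<? extends Record> type) {
        return Arrays.stream(type.getRecordComponents())
                .map(c -> "\"" + c.getName() + "\": " + c.getGenericType().getTypeName())
                .collect(Collectors.joining(", ", "{", "}"));
    }

    public static void main(String[] args) {
        System.out.println(describe(AnnualReview.class));
    }
}
```

Using a record as the output type keeps the field names and types in one place, so the prompt instructions and the deserialization target cannot drift apart.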
Adaptive MapReduce
Sometimes you don’t know the items upfront. Maybe you have a large document that needs to be partitioned, or you want the LLM to decide how to break up the work.
Adaptive mode lets a planning agent determine the partitioning:
    MapReduceOutput<String, String> output = MapReduceEnsemble.<String, String>builder()
        .adaptive(true)
        .sourceDescription("A comprehensive market analysis covering 8 "
            + "industry sectors in the APAC region")
        .plannerAgent(Agent.builder()
            .role("Research Director")
            .goal("Determine the best way to partition the analysis")
            .background("Expert at breaking complex research into manageable pieces")
            .build())
        .mapAgentFactory(partition -> Agent.builder()
            .role("Sector Analyst")
            .goal("Analyze the " + partition + " sector in detail")
            .build())
        .mapTaskFactory((partition, agent) -> Task.builder()
            .description("Conduct a thorough analysis of " + partition)
            .expectedOutput("Sector analysis report")
            .agent(agent)
            .build())
        .reduceAgent(Agent.builder()
            .role("Chief Strategist")
            .goal("Synthesize all sector analyses into a unified report")
            .build())
        .reduceTaskFactory((results, agent) -> Task.builder()
            .description("Create a comprehensive cross-sector APAC market report")
            .expectedOutput("Unified market analysis")
            .agent(agent)
            .build())
        .chatLanguageModel(model)
        .build()
        .run();

In adaptive mode:
- The planner agent receives the sourceDescription and decides how to partition the work.
- The planner’s output is parsed into a list of items.
- The map phase proceeds as normal with those items.
- The reduce phase synthesizes everything.
This is useful when the decomposition itself requires intelligence — when you can’t hardcode the partitioning logic.
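The step where the planner’s output is parsed into a list of items can be pictured with a small sketch. The source does not say what format the planner emits, so this assumes one item per line with optional bullet or number markers. `PlannerOutputParser`, `parseItems`, and the sector names are hypothetical.

```java
import java.util.List;

class PlannerOutputParser {

    // Turns a bulleted or numbered list (one item per line) into clean item strings.
    static List<String> parseItems(String plannerOutput) {
        return plannerOutput.lines()
                .map(String::strip)
                // Drop a leading "-", "*", "1." or "2)" marker followed by whitespace.
                .map(line -> line.replaceFirst("^(?:[-*]|\\d+[.)])\\s+", ""))
                .filter(line -> !line.isEmpty())
                .toList();
    }

    public static void main(String[] args) {
        // Illustrative planner response; the real format is an assumption.
        String plannerOutput = """
                1. Semiconductors
                2. Consumer Electronics
                - Renewable Energy
                """;
        System.out.println(parseItems(plannerOutput));
    }
}
```

Each parsed string then plays the role of `partition` in the map factories, exactly as a hardcoded item would in static mode.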
Real-World Use Cases
MapReduce for agents isn’t just a pattern exercise. Here are concrete use cases where it shines:
Document Processing Pipeline
    List<String> documents = loadDocumentsFromDirectory("contracts/");

    MapReduceEnsemble.<String, RiskSummary>builder()
        .items(documents)
        .mapAgentFactory(doc -> Agent.builder()
            .role("Legal Analyst")
            .goal("Identify risks and obligations in " + doc)
            .build())
        .mapTaskFactory((doc, agent) -> Task.builder()
            .description("Review " + doc + " for contractual risks, "
                + "unusual clauses, and compliance concerns")
            .expectedOutput("Risk analysis for " + doc)
            .agent(agent)
            .build())
        .reduceAgent(Agent.builder()
            .role("General Counsel")
            .goal("Produce a portfolio-wide risk assessment")
            .build())
        .reduceTaskFactory((results, agent) -> Task.builder()
            .description("Synthesize all contract analyses into a "
                + "portfolio-wide risk report")
            .expectedOutput("Portfolio risk assessment")
            .agent(agent)
            .outputType(RiskSummary.class)
            .build())
        .chatLanguageModel(model)
        .build()
        .run();

Ten contracts analyzed in parallel, one unified risk summary.
Multi-Region Market Analysis
    List<String> regions = List.of("North America", "Europe", "Asia Pacific", "Latin America");

    MapReduceEnsemble.<String, GlobalReport>builder()
        .items(regions)
        .mapAgentFactory(region -> Agent.builder()
            .role(region + " Market Specialist")
            .goal("Analyze market conditions in " + region)
            .build())
        // ...

Competitive Intelligence
    List<String> competitors = List.of("Company A", "Company B", "Company C", "Company D", "Company E");

    MapReduceEnsemble.<String, CompetitiveLandscape>builder()
        .items(competitors)
        .mapAgentFactory(competitor -> Agent.builder()
            .role("Competitive Intelligence Analyst")
            .goal("Build a detailed profile of " + competitor)
            .build())
        // ...

Code Review
    List<String> pullRequests = fetchOpenPRs();

    MapReduceEnsemble.<String, ReviewSummary>builder()
        .items(pullRequests)
        .mapAgentFactory(pr -> Agent.builder()
            .role("Code Reviewer")
            .goal("Review " + pr + " for quality and correctness")
            .build())
        // ...

Observability in MapReduce
MapReduceEnsemble supports the same observability stack as regular ensembles:
    MapReduceEnsemble.<String, String>builder()
        .items(items)
        // ... agent and task factories ...
        .chatLanguageModel(model)
        .listener(event -> {
            if (event instanceof TaskCompleteEvent e) {
                logger.info("Completed: {} ({}ms)", e.taskDescription(), e.durationMs());
            }
        })
        .traceExporter(TraceExporter.json(Path.of("traces/")))
        .costConfiguration(CostConfiguration.builder()
            .inputTokenCostPer1k(0.01)
            .outputTokenCostPer1k(0.03)
            .build())
        .build()
        .run();

You can see each map task complete individually, track token consumption per item, and get a full trace of the fan-out/fan-in execution.
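To make the per-1k rates concrete, the following sketch works through the arithmetic for a single map task. It assumes simple linear pricing (tokens times rate, divided by 1,000), which the source does not confirm as the framework's exact formula. `TokenCostSketch`, `cost`, and the token counts are hypothetical.

```java
import java.math.BigDecimal;

class TokenCostSketch {

    private static final BigDecimal ONE_THOUSAND = BigDecimal.valueOf(1000);

    // Assumed linear pricing: (tokens * ratePer1k) / 1000, summed for input and output.
    static BigDecimal cost(long inputTokens, long outputTokens,
                           BigDecimal inputCostPer1k, BigDecimal outputCostPer1k) {
        BigDecimal inputCost = inputCostPer1k
                .multiply(BigDecimal.valueOf(inputTokens))
                .divide(ONE_THOUSAND);   // dividing by 1000 always terminates
        BigDecimal outputCost = outputCostPer1k
                .multiply(BigDecimal.valueOf(outputTokens))
                .divide(ONE_THOUSAND);
        return inputCost.add(outputCost);
    }

    public static void main(String[] args) {
        // Hypothetical usage for one map task: 12,000 input and 2,000 output tokens,
        // priced at the 0.01 / 0.03 rates from the configuration above.
        BigDecimal total = cost(12_000, 2_000,
                new BigDecimal("0.01"), new BigDecimal("0.03"));
        System.out.println("Estimated cost: " + total);
    }
}
```

With those numbers the estimate is 12 × 0.01 + 2 × 0.03 = 0.18 per map task, so a ten-item map phase with similar usage would come to roughly 1.80 before the reduce step.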
Error Handling
What happens when one map task fails? By default, the entire MapReduce run fails. But you can configure it to continue:
    MapReduceEnsemble.<String, String>builder()
        .items(items)
        // ...
        .errorStrategy(ParallelErrorStrategy.CONTINUE_ON_ERROR)
        .build()
        .run();

With CONTINUE_ON_ERROR, the reduce phase receives results from all map tasks that succeeded. You can check which items failed and handle them separately.
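The continue-on-error behavior can be illustrated in plain Java: collect whatever succeeded for the reduce step, and record the failed items on the side. This is a sketch of the general idea, not the library's mechanism or its API for inspecting failures. `ContinueOnErrorSketch`, `Outcome`, and `mapAll` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;

class ContinueOnErrorSketch {

    // Map results that succeeded, plus the error for each item that failed.
    record Outcome<T, M>(List<M> successes, Map<T, Throwable> failures) {}

    static <T, M> Outcome<T, M> mapAll(List<T> items, Function<T, M> mapper) {
        // Fan-out on the common pool; each future completes or fails independently.
        List<CompletableFuture<M>> futures = items.stream()
                .map(item -> CompletableFuture.supplyAsync(() -> mapper.apply(item)))
                .toList();

        List<M> successes = new ArrayList<>();
        Map<T, Throwable> failures = new LinkedHashMap<>();
        for (int i = 0; i < items.size(); i++) {
            try {
                successes.add(futures.get(i).join());
            } catch (CompletionException e) {
                // join() wraps the mapper's exception; keep the original cause.
                failures.put(items.get(i), e.getCause());
            }
        }
        return new Outcome<>(successes, failures);
    }

    public static void main(String[] args) {
        // Stand-in mapper that fails for one item.
        Function<String, String> analyze = item -> {
            if (item.equals("Q2")) {
                throw new IllegalStateException("simulated failure");
            }
            return item + " analyzed";
        };
        Outcome<String, String> outcome = mapAll(List.of("Q1", "Q2", "Q3"), analyze);
        System.out.println("Reduce input: " + outcome.successes());
        System.out.println("Failed items: " + outcome.failures().keySet());
    }
}
```

Keeping the failed items keyed by their original input makes it straightforward to retry just those items or to flag the synthesis as partial.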
When to Use MapReduce vs. Parallel Workflows
Both run tasks concurrently. The difference is in the structure:
Use parallel workflows when you have a fixed set of heterogeneous tasks with known dependencies:
- “Run market analysis AND financial analysis, then produce a SWOT.”
- Different agents, different task descriptions, explicit dependency wiring.
Use MapReduce when you have a homogeneous operation over a collection:
- “Analyze each of these 10 documents the same way, then synthesize.”
- Same agent template, same task template, applied to different items.
MapReduce is the right abstraction when the word “each” appears in your requirements.
The Bigger Picture
MapReduce for agents follows the same principle as MapReduce for data: decompose, parallelize, aggregate. The difference is that each mapper is an LLM-powered agent, not a stateless function. It can reason, use tools, retry, and produce structured output.
Combined with AgentEnsemble’s type safety, observability, and error handling, this gives you a production-grade pattern for scaling agent workloads across collections of items — without writing concurrency code, without managing thread pools, and without leaving the JVM.
Get started:
- Documentation — guides, examples, and API reference
- MapReduce Guide — full API reference for MapReduce ensembles
- Getting Started — up and running in 5 minutes
- GitHub — source, issues, and contributions
AgentEnsemble is MIT-licensed and available on GitHub.