🚀 Building Dynamic Parallel Workflows in Google ADK

In real-world systems, you rarely know the exact number of tasks or agents you'll need ahead of time.

How can we dynamically assign and parallelize tasks based on runtime conditions?

In Google's Agent Development Kit (ADK),

the answer is simple but powerful:

Dynamically create a ParallelAgent inside a Custom Agent, and use session.state as your shared communication bus.

In this post, we'll cover:

✅ Why dynamic parallelism matters

✅ A minimal working example

✅ How to run and verify it

✅ Common pitfalls and best practices

✅ Extension ideas for scaling


🤔 Why Static Parallel Agents Aren't Enough

ADK provides several workflow primitives:

  • SequentialAgent (sequential execution) ⏩
  • ParallelAgent (parallel execution) 🔀
  • LoopAgent (repetitive execution) 🔄

ParallelAgent, in particular, is extremely lightweight:

it simply executes the given sub-agents concurrently.

Perfect for speeding up I/O-bound operations.

However, it has a key limitation:

✅ The list of child agents must be predefined at construction time.

That means:

  • You can't dynamically change how many workers run
  • You can't adapt to different task types at runtime

👉 To solve this, we build a fresh ParallelAgent dynamically inside a Custom Agent every time.
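For contrast, here is what the static version looks like (a minimal sketch; the two LlmAgent workers and the model name are placeholders, not part of the example that follows):

from google.adk.agents import LlmAgent, ParallelAgent

# A static ParallelAgent: the worker list is frozen at construction time.
# "researcher_a" / "researcher_b" and the model name are illustrative only.
researcher_a = LlmAgent(name="researcher_a", model="gemini-2.0-flash",
                        instruction="Research topic A.")
researcher_b = LlmAgent(name="researcher_b", model="gemini-2.0-flash",
                        instruction="Research topic B.")

static_block = ParallelAgent(
    name="static_block",
    sub_agents=[researcher_a, researcher_b],  # cannot grow or shrink at runtime
)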


πŸ—’οΈ Example: A 60-line Dynamic Fanout Pattern

Let's start with a minimal working example:

import random, secrets
from typing import ClassVar, List
from google.adk.events import Event, EventActions
from google.adk.agents import BaseAgent, ParallelAgent, SequentialAgent
from google.genai import types

class Worker(BaseAgent):
    """Simple worker that calculates nΒ²."""
    def __init__(self, *, name: str, run_id: str):
        super().__init__(name=name)
        self._run_id = run_id
    async def _run_async_impl(self, ctx):
        n = ctx.session.state.get(f"task:{self._run_id}:{self.name}", 0)
        result = n * n
        yield Event(
            author=self.name,
            content=types.Content(role=self.name,
                                  parts=[types.Part(text=f"{n}² = {result}")]),
            actions=EventActions(
                state_delta={f"result:{self._run_id}:{self.name}": result})
        )

class PlannerAndRunner(BaseAgent):
    """Distributes tasks and dynamically creates a ParallelAgent."""
    POOL: ClassVar[List[str]] = ["w0", "w1", "w2"]
    async def _run_async_impl(self, ctx):
        run_id = secrets.token_hex(2)
        picked = random.sample(self.POOL,
                               k=random.randint(1, len(self.POOL)))
        task_delta = {f"task:{run_id}:{name}": random.randint(1, 9)
                      for name in picked}
        yield Event(
            author=self.name,
            content=types.Content(role=self.name,
                   parts=[types.Part(text=f"Run {run_id} tasks {task_delta}")]),
            actions=EventActions(state_delta={"current_run": run_id, **task_delta})
        )
        parallel = ParallelAgent(
            name=f"block_{run_id}",
            sub_agents=[Worker(name=n, run_id=run_id) for n in picked]
        )
        async for ev in parallel.run_async(ctx):
            yield ev

class Aggregator(BaseAgent):
    """Aggregates results from workers."""
    async def _run_async_impl(self, ctx):
        run_id = ctx.session.state.get("current_run")
        vals = [v for k, v in ctx.session.state.items()
                if run_id and k.startswith(f"result:{run_id}:")]
        yield Event(
            author=self.name,
            content=types.Content(role=self.name,
                   parts=[types.Part(text=f"Sum = {sum(vals)}")]),
            actions=EventActions(escalate=True)
        )

root_agent = SequentialAgent(
    name="root",
    sub_agents=[PlannerAndRunner(name="planner"), Aggregator(name="collector")]
)

✅ This simple pipeline dynamically fans out tasks to random workers, runs them in parallel, and aggregates the results, all in about 60 lines!
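To make the shared-state handshake concrete, here is an illustrative snapshot of what session.state holds after one round (the numbers are examples; only the key shapes mirror the code above):

# Illustrative session.state contents after one planner/workers/aggregator round.
state_after_round = {
    "current_run": "84e9",
    "task:84e9:w0": 3,       # written by the planner
    "task:84e9:w1": 5,
    "result:84e9:w0": 9,     # written by each Worker via state_delta
    "result:84e9:w1": 25,    # the Aggregator sums every "result:<run_id>:*" key
}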


πŸƒ Running It

Launch your agent using the standard adk run command:

$ adk run .
Log setup complete: /tmp/agents_log/agent.20250427_122520.log
To access latest log: tail -F /tmp/agents_log/agent.latest.log
Running agent root, type exit to exit.

When prompted (user:), type anything (e.g., a, b) to trigger a new task round.


📒 Sample Output

Here's what you'll see:

user: a
[planner]: Run 84e9 tasks {'task:84e9:w0': 3, 'task:84e9:w1': 5}
[w1]: 5² = 25
[w0]: 3² = 9
[collector]: Sum = 34
user: b
[planner]: Run 35d1 tasks {'task:35d1:w1': 6, 'task:35d1:w0': 7, 'task:35d1:w2': 2}
[w1]: 6² = 36
[w0]: 7² = 49
[w2]: 2² = 4
[collector]: Sum = 89
user:
  • planner logs which workers received tasks
  • Each Worker logs its result
  • collector sums up the results per round

All steps are cleanly traceable thanks to ADK's event logs! 📈


🧩 Common Pitfalls and Best Practices

❌ Reusing Worker Instances

In ADK,

an agent instance can only have one parent (Multi-Agent Systems docs).

➑ Always create fresh Worker instances each time you build a new ParallelAgent.
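As a quick illustration (a sketch that reuses the Worker class from the example above), the anti-pattern and the fix look like this:

from google.adk.agents import ParallelAgent
# Assumes the Worker class defined in the example above is in scope.

shared_worker = Worker(name="w0", run_id="r1")
block_1 = ParallelAgent(name="block_1", sub_agents=[shared_worker])

# ❌ Reusing the same instance would give it a second parent and violate the rule:
# block_2 = ParallelAgent(name="block_2", sub_agents=[shared_worker])

# ✅ Build each dynamic block from fresh instances instead:
block_2 = ParallelAgent(name="block_2",
                        sub_agents=[Worker(name="w0", run_id="r2")])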

πŸ—„οΈ Designing session.state Safely

Since all agents share the same session.state,

prefix your keys (e.g., task:{run_id}:{worker_name}) to avoid collisions.
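One lightweight way to enforce that convention is to centralize the key format in tiny helpers (hypothetical functions, not part of ADK):

# Hypothetical helpers that keep the key naming convention in one place.
def task_key(run_id: str, worker: str) -> str:
    return f"task:{run_id}:{worker}"

def result_key(run_id: str, worker: str) -> str:
    return f"result:{run_id}:{worker}"

# e.g. in the planner:  state_delta={task_key(run_id, name): random.randint(1, 9)}
# e.g. in a Worker:     n = ctx.session.state.get(task_key(self._run_id, self.name), 0)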

🏷️ Declaring ClassVar Properly

Because ADK agents are Pydantic models internally,

you must use ClassVar annotations for constants like POOL.

from typing import ClassVar
class PlannerAndRunner(BaseAgent):
    POOL: ClassVar[list[str]] = ["w0", "w1", "w2"]

Otherwise, Pydantic will treat them as model fields and raise an error.


📊 Comparing to Other Approaches

Approach                   Limitations
Static ParallelAgent       Fixed set of children, no runtime flexibility
LLM transfer_to_agent()    Routing is flexible, but serialized; LLM errors can break it
Manual asyncio.gather()    Breaks ADK's observability and session state management (see sketch below)
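For comparison, here is what the same fan-out looks like with plain asyncio.gather (a minimal sketch outside ADK): the math works, but no Events are emitted, nothing is written to session.state, and nothing shows up in ADK's traces.

import asyncio

async def square(n: int) -> int:
    # Plain coroutine: no Event stream, no state_delta, invisible to ADK tooling.
    return n * n

async def main() -> None:
    results = await asyncio.gather(*(square(n) for n in [3, 5]))
    print(sum(results))  # 34

asyncio.run(main())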

✅ By dynamically building ParallelAgents,

you get the best of both worlds:

true concurrency + native ADK observability!


🌟 Extensions and Next Steps

Goal                            Idea
Support multiple task types     Pass an operation type to Workers (sketched below)
Summarize results with an LLM   Replace Aggregator with an LlmAgent
LLM-assisted task dispatching   Use an LlmAgent to select workers dynamically
Production deployment           Launch root agents with external triggers on Vertex AI
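As a starting point for the first idea in the table, here is a sketch of a worker that also reads an operation type from state (the op:<run_id>:<worker> key and the second operation are assumptions, not part of the example above):

from google.adk.agents import BaseAgent
from google.adk.events import Event, EventActions
from google.genai import types

class TypedWorker(BaseAgent):
    """Sketch: a worker whose operation is chosen per task via session.state."""
    def __init__(self, *, name: str, run_id: str):
        super().__init__(name=name)
        self._run_id = run_id

    async def _run_async_impl(self, ctx):
        n = ctx.session.state.get(f"task:{self._run_id}:{self.name}", 0)
        # Hypothetical key written by the planner alongside the task value.
        op = ctx.session.state.get(f"op:{self._run_id}:{self.name}", "square")
        result = n * n if op == "square" else n + n  # "double" as a toy second op
        yield Event(
            author=self.name,
            content=types.Content(role=self.name,
                                  parts=[types.Part(text=f"{op}({n}) = {result}")]),
            actions=EventActions(
                state_delta={f"result:{self._run_id}:{self.name}": result}),
        )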

πŸ“ Final Thoughts

Dynamic parallelism with Custom Agents

lets you build scalable, flexible, and highly performant workflows in ADK.

As long as you:

  • Handle the single-parent rule carefully
  • Design a clean session.state schema

you can achieve powerful architectures,

without losing any of ADK's built-in tracing and UI support.

If you're building dynamic, data-driven agent pipelines,

this technique belongs in your toolbox. 🛠️

