<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>Forem: Godson Ajodo</title>
    <description>The latest articles on Forem by Godson Ajodo (@godson_ajodo_ff4313d88c24).</description>
    <link>https://forem.com/godson_ajodo_ff4313d88c24</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F3638466%2F8cdf8338-d216-458d-848e-357e8235980b.png</url>
      <title>Forem: Godson Ajodo</title>
      <link>https://forem.com/godson_ajodo_ff4313d88c24</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://forem.com/feed/godson_ajodo_ff4313d88c24"/>
    <language>en</language>
    <item>
      <title>Looking for contributors: Automata_Diags (Python automata toolkit)</title>
      <dc:creator>Godson Ajodo</dc:creator>
      <pubDate>Mon, 26 Jan 2026 05:41:25 +0000</pubDate>
      <link>https://forem.com/godson_ajodo_ff4313d88c24/looking-for-contributors-automatadiags-python-automata-toolkit-k1m</link>
      <guid>https://forem.com/godson_ajodo_ff4313d88c24/looking-for-contributors-automatadiags-python-automata-toolkit-k1m</guid>
      <description>&lt;p&gt;Hi everyone! I’m looking for contributors to &lt;strong&gt;Automata_Diags&lt;/strong&gt;, an open‑source Python toolkit for learning and experimenting with automata theory (DFA, NFA, PDA, CFG, TM) with diagram generation.&lt;/p&gt;

&lt;h2&gt;What the project does&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;Build and simulate common automata types&lt;/li&gt;
&lt;li&gt;Validate alphabets, states, and transitions&lt;/li&gt;
&lt;li&gt;Generate state diagrams (Graphviz)&lt;/li&gt;
&lt;li&gt;Include example scripts for quick demos&lt;/li&gt;
&lt;/ul&gt;
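&lt;p&gt;If you haven't touched automata code before, here's the flavor of the domain in plain Python. This is a generic illustration only, not the Automata_Diags API (see the repo for the real interface):&lt;/p&gt;

```python
# Illustrative DFA sketch in plain Python; this is NOT the Automata_Diags API.
# The machine accepts binary strings containing an even number of 1s.
dfa = {
    "start": "even",
    "accept": {"even"},
    "delta": {
        ("even", "0"): "even", ("even", "1"): "odd",
        ("odd", "0"): "odd", ("odd", "1"): "even",
    },
}

def accepts(machine, string):
    """Run the DFA transition function over the input string."""
    state = machine["start"]
    for symbol in string:
        state = machine["delta"][(state, symbol)]
    return state in machine["accept"]

accepts(dfa, "1001")  # two 1s: accepted
accepts(dfa, "111")   # three 1s: rejected
```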

&lt;h2&gt;Why I’m posting&lt;/h2&gt;

&lt;p&gt;I want to make the project more useful for students and educators, improve testing, and expand examples. If you like automata theory or want a focused Python OSS project to contribute to, I’d love your help.&lt;/p&gt;

&lt;h2&gt;Where I’d love help&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;More examples and use cases (especially TM, PDA, CFG)&lt;/li&gt;
&lt;li&gt;Tests and edge‑case coverage&lt;/li&gt;
&lt;li&gt;Documentation and tutorials&lt;/li&gt;
&lt;li&gt;Bug fixes and small refactors&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;How to contribute&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;Repo: &lt;a href="https://github.com/Ajodo-Godson/automata_diags" rel="noopener noreferrer"&gt;https://github.com/Ajodo-Godson/automata_diags&lt;/a&gt;
&lt;/li&gt;
&lt;li&gt;PyPI: &lt;a href="https://pypi.org/project/automata-diags/" rel="noopener noreferrer"&gt;https://pypi.org/project/automata-diags/&lt;/a&gt;
&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Open an issue with ideas or pick a small improvement and send a PR. Clear repro steps and small, focused changes are ideal.&lt;/p&gt;

&lt;p&gt;Thanks for taking a look—feedback is welcome, even if you don’t contribute code.&lt;/p&gt;

</description>
      <category>opensource</category>
      <category>automatatheory</category>
      <category>pythonpackage</category>
      <category>github</category>
    </item>
    <item>
      <title>I Built MapReduce from Scratch</title>
      <dc:creator>Godson Ajodo</dc:creator>
      <pubDate>Fri, 12 Dec 2025 23:28:57 +0000</pubDate>
      <link>https://forem.com/godson_ajodo_ff4313d88c24/i-built-mapreduce-from-scratch-d8g</link>
      <guid>https://forem.com/godson_ajodo_ff4313d88c24/i-built-mapreduce-from-scratch-d8g</guid>
      <description>&lt;h2&gt;
  
  
  Why I did this to myself and why distributed systems are hard
&lt;/h2&gt;

&lt;p&gt;You can read the Google MapReduce paper in an afternoon. It's 13 pages. I've read it a few times now, and every time I thought I understood it—until I tried to build it.&lt;/p&gt;

&lt;p&gt;The scenario that got me started: you have a 1TB log file. You need word counts. What do you do?&lt;/p&gt;

&lt;p&gt;Option 1: You load it all into memory and your laptop catches fire.&lt;br&gt;
Option 2: Process line by line on one machine, but you graduate before it finishes.&lt;br&gt;
Option 3: Split the work across machines and combine their results at the end.&lt;/p&gt;

&lt;p&gt;I went with option 3. Google figured this out in 2004. Twenty years later, I'm in my room late at night wondering why worker 2 keeps dying.&lt;/p&gt;

&lt;p&gt;The formal problem is this: given input too large for one machine's memory or too slow for one machine's CPU, partition it into &lt;code&gt;M&lt;/code&gt; splits, process each in parallel, then combine results. MapReduce formalizes this as two functions (map and reduce) with the framework handling distribution, fault tolerance, and the shuffle phase where data moves across the network. Reading the paper took an hour. Making it work ate multiple weekends.&lt;/p&gt;
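&lt;p&gt;Stripped of the distribution, the model fits in a dozen lines. A single-process sketch of my own (not code from the repo) to fix the mental model before the hard parts:&lt;/p&gt;

```python
from collections import defaultdict

def mapreduce_local(splits, map_fn, reduce_fn):
    """Single-process model of MapReduce: map each split,
    shuffle (group values by key), then reduce each key group."""
    groups = defaultdict(list)
    for split_id, split in enumerate(splits):
        for key, value in map_fn(split_id, split):
            groups[key].append(value)      # the "shuffle", minus the network
    return {key: reduce_fn(key, values) for key, values in groups.items()}

# Word count over two "splits" of input
splits = ["the cat sat", "on the mat"]
result = mapreduce_local(
    splits,
    map_fn=lambda _, text: [(w, 1) for w in text.split()],
    reduce_fn=lambda _, counts: sum(counts),
)
# result: {"the": 2, "cat": 1, "sat": 1, "on": 1, "mat": 1}
```

Everything hard about real MapReduce is what this sketch omits: the shuffle crossing a network, and any of the three loops dying halfway through.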


&lt;h2&gt;Clone and run it yourself (5 minutes)&lt;/h2&gt;

&lt;p&gt;Before we dive into the theory, here's how to actually run this. The whole point of building it was to make distributed systems tangible.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="c"&gt;# Get the code&lt;/span&gt;
git clone https://github.com/Ajodo-Godson/MapReduce
&lt;span class="nb"&gt;cd &lt;/span&gt;MapReduce

&lt;span class="c"&gt;# Start the cluster with web dashboard&lt;/span&gt;
docker-compose &lt;span class="nt"&gt;-f&lt;/span&gt; docker-compose.benchmark.yml up &lt;span class="nt"&gt;--build&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Open your browser to &lt;strong&gt;&lt;a href="http://localhost:5000" rel="noopener noreferrer"&gt;http://localhost:5000&lt;/a&gt;&lt;/strong&gt; to see the real-time dashboard showing:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Worker status (idle, busy, or failed)&lt;/li&gt;
&lt;li&gt;Job progress through map and reduce phases&lt;/li&gt;
&lt;li&gt;Live event log as tasks are assigned and completed&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;You should see output like:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;[2025-12-05 10:30:15] Master server started on port 50051
[2025-12-05 10:30:17] Worker worker1 registered
[2025-12-05 10:30:17] Worker worker2 registered
[2025-12-05 10:30:18] Worker worker3 registered
[2025-12-05 10:30:18] Created 179 map tasks from input
[2025-12-05 10:30:18] Assigned map_0 to worker1
[2025-12-05 10:30:19] Task map_0 completed by worker1
...
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now break it (in a new terminal):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="c"&gt;# Kill a worker mid-job&lt;/span&gt;
docker stop benchmark_worker2

&lt;span class="c"&gt;# Watch recovery in the dashboard and logs&lt;/span&gt;
&lt;span class="c"&gt;# Worker2 turns "failed", its tasks get reassigned to surviving workers&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Clean up:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;docker-compose &lt;span class="nt"&gt;-f&lt;/span&gt; docker-compose.benchmark.yml down
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;Other ways to run it&lt;/h3&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="c"&gt;# Run automated benchmarks (1 worker vs 3 workers, failure recovery)&lt;/span&gt;
python3 examples/benchmark.py

&lt;span class="c"&gt;# Run DAG scheduler tests (unit tests, no Docker needed)&lt;/span&gt;
python3 examples/dag_integration_test.py &lt;span class="nt"&gt;--test&lt;/span&gt; unit

&lt;span class="c"&gt;# Run full end-to-end pipeline with real data chaining&lt;/span&gt;
python3 examples/dag_integration_test.py &lt;span class="nt"&gt;--test&lt;/span&gt; pipeline
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;If you want to run without the dashboard (basic docker-compose):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;pip &lt;span class="nb"&gt;install&lt;/span&gt; &lt;span class="nt"&gt;-r&lt;/span&gt; requirements.txt
./scripts/generate_proto.sh
docker-compose up &lt;span class="nt"&gt;-d&lt;/span&gt;
docker-compose logs &lt;span class="nt"&gt;-f&lt;/span&gt; master
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;






&lt;h2&gt;Map, Shuffle, Reduce: the part that actually makes sense&lt;/h2&gt;

&lt;p&gt;The core loop is almost embarrassingly simple. I kept telling myself "this is easy" until things started breaking.&lt;/p&gt;

&lt;p&gt;Three steps:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Map&lt;/strong&gt;: Take input key-value pairs, emit intermediate key-value pairs&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Shuffle&lt;/strong&gt;: Group all values by key across all mappers (this is where your network cries)&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Reduce&lt;/strong&gt;: For each unique key, combine all its values into a final result&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;The paper writes it like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Map    : (k1, v1)       → list(k2, v2)
Reduce : (k2, list(v2)) → list(v2)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Notice the types change. Your input pairs (k1, v1) are different from your intermediate pairs (k2, v2), and reduce consumes (k2, list(v2)). That tripped me up initially because I kept trying to make them match. But the mismatch is intentional: the map function transforms your data into a new shape that makes aggregation possible. Your input might be (filename, file_contents), but your output is (word, count). Completely different domains.&lt;/p&gt;

&lt;p&gt;The partitioning matters too. After map emits pairs, they're bucketed using &lt;code&gt;hash(key) mod R&lt;/code&gt; where R is the number of reducers. This guarantees all occurrences of the same key end up at the same reducer, which is critical for correctness.&lt;/p&gt;
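&lt;p&gt;A partitioner in that spirit is tiny. Here's a sketch of the idea (the repo's own implementation may differ); note that it uses &lt;code&gt;hashlib&lt;/code&gt; rather than Python's built-in &lt;code&gt;hash()&lt;/code&gt;, which is salted per process and would route the same key to different partitions on different workers:&lt;/p&gt;

```python
import hashlib

class HashPartitioner:
    """Route every occurrence of a key to the same reducer:
    partition = hash(key) mod R."""
    def __init__(self, num_partitions):
        self.num_partitions = num_partitions

    def get_partition(self, key):
        # hashlib, not hash(): built-in hash() is randomized per process,
        # so two workers would disagree on where "the" belongs.
        digest = hashlib.md5(key.encode("utf-8")).hexdigest()
        return int(digest, 16) % self.num_partitions

p = HashPartitioner(4)
p.get_partition("the")  # same partition on every machine, every run
```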

&lt;p&gt;Word count, the "hello world" of MapReduce:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;MAP PHASE (parallel across workers):
  Input: "the cat sat on the mat"
  Mapper emits: ("the", 1), ("cat", 1), ("sat", 1), ("on", 1), ("the", 1), ("mat", 1)

SHUFFLE PHASE (group by key):
  "the" -&amp;gt; [1, 1]
  "cat" -&amp;gt; [1]
  "sat" -&amp;gt; [1]
  "on"  -&amp;gt; [1]
  "mat" -&amp;gt; [1]

REDUCE PHASE (aggregate):
  "the" -&amp;gt; 2
  "cat" -&amp;gt; 1
  ...
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Here's the actual implementation. This is what runs in each worker:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;# src/framework/mapper.py
&lt;/span&gt;&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;word_count_map&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;filename&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;contents&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;Map function: (filename, contents) -&amp;gt; list(word, 1)&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
    &lt;span class="n"&gt;words&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;contents&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;lower&lt;/span&gt;&lt;span class="p"&gt;().&lt;/span&gt;&lt;span class="nf"&gt;split&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;word&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;words&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;word&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;''&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;join&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;c&lt;/span&gt; &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;c&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;word&lt;/span&gt; &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;c&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;isalnum&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;word&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="nf"&gt;yield &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;word&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;


&lt;span class="c1"&gt;# src/framework/reducer.py
&lt;/span&gt;&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;word_count_reduce&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;word&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;counts&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;Reduce function: (word, [1,1,1,...]) -&amp;gt; total_count&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nf"&gt;sum&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;int&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;c&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;c&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;counts&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The map phase applies the map function and partitions the output:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;# src/framework/mapper.py
&lt;/span&gt;&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;MapPhase&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;__init__&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;map_function&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;partitioner&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;map_function&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;map_function&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;partitioner&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;partitioner&lt;/span&gt;

    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;execute&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;input_key&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;input_value&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;Execute map function and partition results&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
        &lt;span class="c1"&gt;# Execute user's map function
&lt;/span&gt;        &lt;span class="n"&gt;intermediate_pairs&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;map_function&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;input_key&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;input_value&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

        &lt;span class="c1"&gt;# Partition the intermediate key-value pairs
&lt;/span&gt;        &lt;span class="n"&gt;partitions&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nf"&gt;defaultdict&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nb"&gt;list&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;_&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="nf"&gt;range&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;partitioner&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;num_partitions&lt;/span&gt;&lt;span class="p"&gt;)]&lt;/span&gt;

        &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;value&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;intermediate_pairs&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="n"&gt;partition_id&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;partitioner&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get_partition&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="n"&gt;partitions&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;partition_id&lt;/span&gt;&lt;span class="p"&gt;][&lt;/span&gt;&lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;].&lt;/span&gt;&lt;span class="nf"&gt;append&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nf"&gt;dict&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;p&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;p&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;partitions&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Notice how the partitioner decides which reducer will handle each key. All occurrences of "the" from all mappers end up in the same partition, so one reducer can sum them all.&lt;/p&gt;

&lt;p&gt;But word count is almost too simple to be interesting. The paper lists other examples that helped me understand the generality of this model:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Distributed Grep&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;You want to find all lines in a massive log file that match a regex pattern. The map function checks each line against the pattern and emits the line if it matches. The reduce function is just the identity function: it passes through the intermediate data unchanged. No aggregation needed because you're not combining anything, just filtering.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Map:    (filename, line) → if regex.match(line): emit(line, "")
Reduce: (line, [""]) → line
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This is the simplest possible MapReduce job. The "reduce" does nothing. But you still get parallelism across all your input files.&lt;/p&gt;
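&lt;p&gt;In the same Python style as the word-count functions, a grep job might look like this (my sketch with a stand-in pattern, not code from the repo):&lt;/p&gt;

```python
import re

def grep_map(filename, contents, pattern=re.compile(r"ERROR")):
    """Map: emit each matching line. The pattern here is a stand-in example."""
    for line in contents.splitlines():
        if pattern.search(line):
            yield (line, "")

def grep_reduce(line, values):
    """Reduce: identity -- nothing to aggregate, just pass the line through."""
    return line

log = "boot ok\nERROR disk full\nshutdown\nERROR net down"
matches = [key for key, _ in grep_map("log.txt", log)]
# matches: ["ERROR disk full", "ERROR net down"]
```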

&lt;p&gt;&lt;strong&gt;URL Access Frequency&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;You have terabytes of web server logs and want to know which URLs get the most traffic. Each log entry contains a URL. Map emits (URL, 1) for each request. Reduce sums all the 1s for each URL.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Map:    (log_file, entry) → emit(entry.url, 1)
Reduce: (url, [1, 1, 1, ...]) → (url, sum)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This is exactly word count, but for URLs instead of words. Same pattern, different domain.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Reverse Web-Link Graph&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Given a collection of web pages, you want to find all pages that link TO a given page. For each page, map scans for outgoing links and emits (target_url, source_url). Reduce collects all source URLs for each target.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Map:    (source_url, page_html) → for each link in page: emit(link.target, source_url)
Reduce: (target_url, [source1, source2, ...]) → (target_url, list_of_sources)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The output tells you: "These 47 pages link to google.com." This is useful for PageRank-style algorithms where you need to know the incoming link structure.&lt;/p&gt;
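&lt;p&gt;A single-process sketch of the same idea (my illustration, not repo code):&lt;/p&gt;

```python
from collections import defaultdict

def reverse_links(pages):
    """pages: source_url mapped to its list of outgoing link targets.
    Returns target_url mapped to the sorted list of sources linking to it."""
    incoming = defaultdict(set)
    for source, targets in pages.items():
        for target in targets:                 # map: emit (target, source)
            incoming[target].add(source)
    return {t: sorted(s) for t, s in incoming.items()}   # reduce: collect

pages = {
    "a.com": ["google.com", "b.com"],
    "b.com": ["google.com"],
}
who_links_to_google = reverse_links(pages)["google.com"]
# who_links_to_google: ["a.com", "b.com"]
```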

&lt;p&gt;&lt;strong&gt;Inverted Index&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;This is how search engines work. Given a collection of documents, build an index that maps each word to the list of documents containing it. Map emits (word, document_id) for each word in each document. Reduce collects all document IDs per word.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Map:    (doc_id, contents) → for each word in contents: emit(word, doc_id)
Reduce: (word, [doc1, doc2, doc3, ...]) → (word, sorted_list_of_docs)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now when someone searches for "distributed systems," you look up "distributed" and "systems" in your index, intersect the document lists, and return the results. Google processes petabytes of web pages this way.&lt;/p&gt;
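&lt;p&gt;The index build plus the query-time intersection can be sketched in a few lines (again my illustration, not code from the repo):&lt;/p&gt;

```python
from collections import defaultdict

def build_inverted_index(docs):
    """Reduce output: word mapped to the sorted doc ids containing it."""
    index = defaultdict(set)
    for doc_id, contents in docs.items():
        for word in contents.lower().split():  # map step: emit (word, doc_id)
            index[word].add(doc_id)
    return {word: sorted(ids) for word, ids in index.items()}

def search(index, query):
    """Intersect the posting lists of every query term."""
    postings = [set(index.get(word, [])) for word in query.lower().split()]
    return sorted(set.intersection(*postings)) if postings else []

docs = {
    "d1": "distributed systems are hard",
    "d2": "automata and systems",
    "d3": "distributed consensus systems",
}
index = build_inverted_index(docs)
hits = search(index, "distributed systems")
# hits: ["d1", "d3"]
```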

&lt;p&gt;The key insight: all these problems follow the same computational pattern. You write a map function and a reduce function. The framework handles distribution, fault tolerance, and shuffling. That separation of concerns is what made MapReduce revolutionary in 2004. Not the algorithm itself, but the programming model that let ordinary engineers write distributed programs without thinking about distributed systems.&lt;/p&gt;

&lt;p&gt;Okay cool. Now do that across three machines where any of them might die mid-sentence.&lt;/p&gt;




&lt;h2&gt;How I wired the boxes together&lt;/h2&gt;

&lt;p&gt;Classic master-worker setup. One boss node tells everyone what to do. Workers do the actual math.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;                    +------------------+
                    |      Master      |
                    |    (the boss)    |
                    +--------+---------+
                             |
           +-----------------+-----------------+
           |                 |                 |
           v                 v                 v
    +----------+      +----------+      +----------+
    | Worker 1 |      | Worker 2 |      | Worker 3 |
    +----------+      +----------+      +----------+
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This is a centralized architecture. The master maintains all state: which tasks exist, which workers are alive, what's pending, what's running, what's done. Workers don't coordinate with each other. They only talk to the master. Ask for work, do it, report back, repeat. This keeps the worker logic simple: no consensus protocols, no peer-to-peer negotiation, no complex state to maintain between tasks.&lt;/p&gt;
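&lt;p&gt;The master's bookkeeping boils down to tasks moving between a few collections. A toy model of the idea (heavily simplified relative to the real master):&lt;/p&gt;

```python
class MiniMaster:
    """Toy model of the master's state: tasks move pending -> running -> done."""
    def __init__(self, task_ids):
        self.pending = list(task_ids)
        self.running = {}            # task_id mapped to the worker holding it
        self.done = set()

    def request_task(self, worker_id):
        """A worker asks for work; hand out the next pending task, if any."""
        if not self.pending:
            return None
        task_id = self.pending.pop(0)
        self.running[task_id] = worker_id
        return task_id

    def report_complete(self, task_id):
        self.running.pop(task_id, None)
        self.done.add(task_id)

    def handle_worker_failure(self, worker_id):
        """Re-queue every task the dead worker was running."""
        lost = [t for t, w in self.running.items() if w == worker_id]
        for t in lost:
            del self.running[t]
            self.pending.append(t)

m = MiniMaster(["map_0", "map_1"])
t = m.request_task("worker1")        # "map_0" goes to worker1
m.handle_worker_failure("worker1")   # map_0 returns to the pending queue
```

The single source of truth is exactly these collections; the worker needs none of them.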

&lt;p&gt;The upside is simplicity. There's one source of truth: no Paxos, no Raft, no protocol for agreeing on who does what. The master decides, and that's that.&lt;/p&gt;

&lt;p&gt;The downside is obvious: if the master dies, everything stops. This is a single point of failure. The paper basically shrugs at this, says master failure is "unlikely" and that clients can just retry. A real fix would be something like Raft, where workers vote on a new leader if the master disappears. The master could checkpoint its state to persistent storage, and a standby could take over.&lt;/p&gt;

&lt;p&gt;I thought about adding it. Then I thought about my deadline and filed it under "later."&lt;br&gt;
Moving on...&lt;/p&gt;

&lt;h3&gt;gRPC: How the boxes actually talk&lt;/h3&gt;

&lt;p&gt;I hadn't touched protobuf before this project. Think JSON, but binary and way grumpier about types.&lt;/p&gt;

&lt;p&gt;The original MapReduce paper uses custom RPC mechanisms. Google in 2004 had their own infrastructure for everything. For my implementation, gRPC made sense: it handles serialization, connection management, and generates type-safe client/server stubs from a &lt;code&gt;.proto&lt;/code&gt; file.&lt;/p&gt;

&lt;p&gt;Here's the deal: gRPC lets workers call functions on the master like they're local. No manual socket code. No parsing HTTP responses. You define a contract and the tooling generates the boilerplate.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight protobuf"&gt;&lt;code&gt;&lt;span class="c1"&gt;// protos/mapreduce.proto&lt;/span&gt;

&lt;span class="kd"&gt;service&lt;/span&gt; &lt;span class="n"&gt;MasterService&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;rpc&lt;/span&gt; &lt;span class="n"&gt;RegisterWorker&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;WorkerInfo&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;returns&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;RegisterResponse&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="k"&gt;rpc&lt;/span&gt; &lt;span class="n"&gt;SendHeartbeat&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;Heartbeat&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;returns&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;HeartbeatAck&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="k"&gt;rpc&lt;/span&gt; &lt;span class="n"&gt;RequestTask&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;TaskRequest&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;returns&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;TaskAssignment&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="k"&gt;rpc&lt;/span&gt; &lt;span class="n"&gt;ReportTaskComplete&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;TaskResult&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;returns&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;TaskAck&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="k"&gt;rpc&lt;/span&gt; &lt;span class="n"&gt;ReportIntermediateFiles&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;IntermediateReport&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;returns&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;TaskAck&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="k"&gt;rpc&lt;/span&gt; &lt;span class="n"&gt;GetIntermediateLocations&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;PartitionRequest&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;returns&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;IntermediateLocations&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Six RPCs total. The first four handle core worker-master communication. The last two handle the shuffle phase (map outputs → reduce inputs).&lt;/p&gt;

&lt;p&gt;Here are the key message structures:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight protobuf"&gt;&lt;code&gt;&lt;span class="kd"&gt;message&lt;/span&gt; &lt;span class="nc"&gt;TaskAssignment&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="kt"&gt;string&lt;/span&gt; &lt;span class="na"&gt;task_id&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="kt"&gt;string&lt;/span&gt; &lt;span class="na"&gt;task_type&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;        &lt;span class="c1"&gt;// "map" or "reduce"&lt;/span&gt;
    &lt;span class="kt"&gt;string&lt;/span&gt; &lt;span class="na"&gt;input_data&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;       &lt;span class="c1"&gt;// Text chunk for map, file locations for reduce&lt;/span&gt;
    &lt;span class="kt"&gt;int32&lt;/span&gt; &lt;span class="na"&gt;partition_id&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;4&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;      &lt;span class="c1"&gt;// Which partition this task handles&lt;/span&gt;
    &lt;span class="kt"&gt;int32&lt;/span&gt; &lt;span class="na"&gt;num_reduce_tasks&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;5&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;  &lt;span class="c1"&gt;// R value for partitioning&lt;/span&gt;
    &lt;span class="kt"&gt;bool&lt;/span&gt; &lt;span class="na"&gt;has_task&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;6&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;           &lt;span class="c1"&gt;// False if no work available&lt;/span&gt;
    &lt;span class="kt"&gt;int32&lt;/span&gt; &lt;span class="na"&gt;task_sequence_number&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;7&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;  &lt;span class="c1"&gt;// For idempotency&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="kd"&gt;message&lt;/span&gt; &lt;span class="nc"&gt;TaskResult&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="kt"&gt;string&lt;/span&gt; &lt;span class="na"&gt;worker_id&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="kt"&gt;string&lt;/span&gt; &lt;span class="na"&gt;task_id&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="kt"&gt;bool&lt;/span&gt; &lt;span class="na"&gt;success&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="kt"&gt;string&lt;/span&gt; &lt;span class="na"&gt;output_data&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;4&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;      &lt;span class="c1"&gt;// JSON-encoded reduce output&lt;/span&gt;
    &lt;span class="kt"&gt;string&lt;/span&gt; &lt;span class="na"&gt;error_message&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;5&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="kt"&gt;int32&lt;/span&gt; &lt;span class="na"&gt;task_sequence_number&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;6&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The &lt;code&gt;task_sequence_number&lt;/code&gt; field is interesting. It's my defense against the "zombie worker" problem. Each task reassignment increments this number. If worker A receives task map_0 (seq=0), fails, and worker B receives the same task (seq=1), a late completion report from A (seq=0) is recognized as stale and ignored. This prevents duplicate work from corrupting results. More on this later when I talk about failure handling.&lt;/p&gt;
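&lt;p&gt;The staleness check can be sketched in a few lines of plain Python. This is an illustrative model with hypothetical names (&lt;code&gt;TaskTracker&lt;/code&gt;, &lt;code&gt;assign&lt;/code&gt;, &lt;code&gt;report_completion&lt;/code&gt;), not the project's actual code:&lt;/p&gt;

```python
# Sketch of the sequence-number staleness check (illustrative names only).
class TaskTracker:
    def __init__(self):
        self.tasks = {}  # task_id -> {'seq': int, 'status': str}

    def assign(self, task_id):
        # Every (re)assignment bumps the sequence number.
        task = self.tasks.setdefault(task_id, {'seq': -1, 'status': 'pending'})
        task['seq'] += 1
        task['status'] = 'running'
        return task['seq']

    def report_completion(self, task_id, seq):
        task = self.tasks[task_id]
        if seq != task['seq']:
            return False  # stale report from a presumed-dead worker: ignore it
        task['status'] = 'completed'
        return True

tracker = TaskTracker()
seq_a = tracker.assign('map_0')            # worker A gets seq=0
seq_b = tracker.assign('map_0')            # A presumed dead; worker B gets seq=1
tracker.report_completion('map_0', seq_a)  # A's late report is rejected
tracker.report_completion('map_0', seq_b)  # B's report is accepted
```

&lt;p&gt;The key property: only the holder of the &lt;em&gt;current&lt;/em&gt; sequence number can complete the task, so a zombie worker can never overwrite a reassigned task's result.&lt;/p&gt;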

&lt;p&gt;For the shuffle phase:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight protobuf"&gt;&lt;code&gt;&lt;span class="kd"&gt;message&lt;/span&gt; &lt;span class="nc"&gt;IntermediateReport&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="kt"&gt;string&lt;/span&gt; &lt;span class="na"&gt;task_id&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="kt"&gt;string&lt;/span&gt; &lt;span class="na"&gt;worker_id&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="k"&gt;repeated&lt;/span&gt; &lt;span class="n"&gt;FileLocation&lt;/span&gt; &lt;span class="na"&gt;locations&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;  &lt;span class="c1"&gt;// One file per partition&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="kd"&gt;message&lt;/span&gt; &lt;span class="nc"&gt;FileLocation&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="kt"&gt;int32&lt;/span&gt; &lt;span class="na"&gt;partition_id&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="kt"&gt;string&lt;/span&gt; &lt;span class="na"&gt;file_path&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="kt"&gt;int64&lt;/span&gt; &lt;span class="na"&gt;file_size&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="kd"&gt;message&lt;/span&gt; &lt;span class="nc"&gt;IntermediateLocations&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="kt"&gt;int32&lt;/span&gt; &lt;span class="na"&gt;partition_id&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="k"&gt;repeated&lt;/span&gt; &lt;span class="n"&gt;WorkerFileLocation&lt;/span&gt; &lt;span class="na"&gt;locations&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;  &lt;span class="c1"&gt;// All files for this partition&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;When a map task completes, it reports locations for R intermediate files. When a reduce task starts, it queries for all locations for its partition.&lt;/p&gt;
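&lt;p&gt;Conceptually, the master keeps an inverted index from partition to files. Here is a minimal sketch of that bookkeeping, with invented names (&lt;code&gt;IntermediateIndex&lt;/code&gt;, &lt;code&gt;report&lt;/code&gt;, &lt;code&gt;locations_for&lt;/code&gt;) standing in for the real state structures:&lt;/p&gt;

```python
# Sketch of the master's intermediate-file index (hypothetical names).
# Each map task reports one file per reduce partition; each reduce task
# then asks for every file belonging to its partition.
from collections import defaultdict

class IntermediateIndex:
    def __init__(self):
        # partition_id -> list of (worker_id, file_path)
        self._by_partition = defaultdict(list)

    def report(self, worker_id, locations):
        # locations: {partition_id: file_path}, R entries per map task
        for partition_id, path in locations.items():
            self._by_partition[partition_id].append((worker_id, path))

    def locations_for(self, partition_id):
        return list(self._by_partition[partition_id])

idx = IntermediateIndex()
idx.report('worker-a', {0: '/tmp/map_0_part_0', 1: '/tmp/map_0_part_1'})
idx.report('worker-b', {0: '/tmp/map_1_part_0', 1: '/tmp/map_1_part_1'})
# The reduce task for partition 0 pulls one file from each of the M mappers:
files = idx.locations_for(0)
```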

&lt;p&gt;Worker calls &lt;code&gt;stub.RequestTask()&lt;/code&gt;. Master responds with a task. Magic? No. But it felt like it after years of writing REST endpoints and manually serializing JSON.&lt;/p&gt;

&lt;p&gt;Protocol Buffers (and the generated gRPC stubs) buy you more than a stricter type system. The proto compiler emits strongly-typed classes and client/server stubs so you no longer write repetitive serialization and parsing code by hand — you call methods and pass objects. That reduces boring bugs and speeds development. The protobuf wire format is compact and faster to parse than JSON, which matters when you are doing M × R intermediate-file lookups and many small RPCs during the shuffle.&lt;/p&gt;

&lt;p&gt;Protobufs also make schema evolution and rolling upgrades easier: unknown fields are ignored by older clients, you can add optional fields safely, and you can reserve field numbers to avoid accidental reuse. Those properties let you evolve message shapes without bringing the whole cluster down. On the runtime side, pairing protobufs with gRPC (HTTP/2) gives useful features for this project — streaming RPCs for continuous heartbeats or long file-location fetches, built-in support for deadlines and cancellation, and multiplexed connections that reduce connection churn.&lt;/p&gt;

&lt;p&gt;There are trade-offs: the binary format is less human readable than JSON and you must manage &lt;code&gt;.proto&lt;/code&gt; changes carefully (don’t rename or reuse field numbers). You also need the toolchain (protoc) in your dev flow. Even so, for a small distributed system where correctness and clear contracts between processes matter, the schema-driven, generated-stub model removed a huge class of serialization bugs and made the master/worker surface area easy to reason about and test.&lt;/p&gt;

&lt;p&gt;A few more practical gRPC notes that mattered while building this project:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;Unary vs streaming RPCs. gRPC supports standard (unary) calls and three streaming modes (server-, client-, and bi-directional streaming). For this project I used simple unary RPCs for the core interactions (register, request task, report completion, report intermediate locations) because the semantics are easy to reason about and they matched the request/response nature of the protocol. Streaming is handy for other patterns though. For example, a server-stream could push status updates to a long-lived client connection, or a bidirectional stream could carry a heartbeat and task-control messages over a single multiplexed channel.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;HTTP/2 benefits. gRPC runs over HTTP/2, which gives connection multiplexing and lower per-call overhead compared to opening lots of short HTTP/1.1 connections. That matters during the shuffle where many small RPCs and file-location lookups happen: fewer TCP/TLS handshakes, flow control, and header compression reduce latency and CPU overhead on busy nodes.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Operational trade-offs. gRPC is a dependency and its binary format is less human-friendly than JSON logs, so you often pair it with good logging and health-check endpoints. Also, some network middleboxes and load balancers are not gRPC-aware and can interfere with streaming or HTTP/2 features.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;All told, gRPC didn't just save lines of code; it gave clear runtime primitives (streams, deadlines, status codes, and multiplexing) that matched the needs of a master/worker system and made testing and evolution far less annoying.&lt;/p&gt;

&lt;h3&gt;
  
  
  The full worker loop (this is the good stuff)
&lt;/h3&gt;

&lt;p&gt;Here's what actually runs in each container. This is the complete worker lifecycle:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;# src/worker/client.py (simplified for clarity)
&lt;/span&gt;&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;Worker&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;run&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="c1"&gt;# 1. Register with master
&lt;/span&gt;        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="ow"&gt;not&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;register&lt;/span&gt;&lt;span class="p"&gt;():&lt;/span&gt;
            &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Failed to register. Exiting.&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="k"&gt;return&lt;/span&gt;

        &lt;span class="c1"&gt;# 2. Start heartbeat thread (runs in background)
&lt;/span&gt;        &lt;span class="n"&gt;heartbeat_thread&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;threading&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;Thread&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;target&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;send_heartbeat&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;daemon&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;True&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;heartbeat_thread&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;start&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

        &lt;span class="c1"&gt;# 3. Main task loop
&lt;/span&gt;        &lt;span class="k"&gt;while&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;running&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="c1"&gt;# Request work from master
&lt;/span&gt;            &lt;span class="n"&gt;task&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;request_task&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

            &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;task&lt;/span&gt; &lt;span class="ow"&gt;and&lt;/span&gt; &lt;span class="n"&gt;task&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;task_type&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;map&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
                &lt;span class="c1"&gt;# Execute map, partition results into R files
&lt;/span&gt;                &lt;span class="n"&gt;partitions&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;executor&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;execute_map&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
                    &lt;span class="n"&gt;task_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;task&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;task_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                    &lt;span class="n"&gt;input_data&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;task&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;input_data&lt;/span&gt;
                &lt;span class="p"&gt;)&lt;/span&gt;

                &lt;span class="c1"&gt;# Write partitioned output to local disk
&lt;/span&gt;                &lt;span class="n"&gt;file_paths&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;intermediate_manager&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;write_partitioned_output&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
                    &lt;span class="n"&gt;task&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;task_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;partitions&lt;/span&gt;
                &lt;span class="p"&gt;)&lt;/span&gt;

                &lt;span class="c1"&gt;# Tell master where intermediate files are
&lt;/span&gt;                &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;report_intermediate_files&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;task&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;task_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;file_paths&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
                &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;report_result&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;task&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;task_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;success&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;True&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

            &lt;span class="k"&gt;elif&lt;/span&gt; &lt;span class="n"&gt;task&lt;/span&gt; &lt;span class="ow"&gt;and&lt;/span&gt; &lt;span class="n"&gt;task&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;task_type&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;reduce&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
                &lt;span class="c1"&gt;# SHUFFLE: Fetch intermediate data from ALL mappers for this partition
&lt;/span&gt;                &lt;span class="n"&gt;intermediate_locations&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;json&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;loads&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;task&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;input_data&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

                &lt;span class="c1"&gt;# Merge all intermediate files for this partition
&lt;/span&gt;                &lt;span class="n"&gt;grouped_data&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;executor&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;_fetch_and_group_intermediate&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
                    &lt;span class="n"&gt;intermediate_locations&lt;/span&gt;
                &lt;span class="p"&gt;)&lt;/span&gt;

                &lt;span class="c1"&gt;# Sort by key, then reduce
&lt;/span&gt;                &lt;span class="n"&gt;sorted_data&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;sorted&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;grouped_data&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;items&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;
                &lt;span class="n"&gt;result&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;executor&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;reduce_phase&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;execute&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;sorted_data&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

                &lt;span class="c1"&gt;# Report final result to master
&lt;/span&gt;                &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;report_result&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;task&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;task_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;success&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;True&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;output&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;json&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;dumps&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;result&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;

            &lt;span class="k"&gt;else&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
                &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;sleep&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;  &lt;span class="c1"&gt;# No tasks available, wait and retry
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;What makes this interesting:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Map tasks create R intermediate files (one per reduce partition)&lt;/li&gt;
&lt;li&gt;Reduce tasks fetch files from ALL map workers (M × R transfers total)&lt;/li&gt;
&lt;li&gt;Everything is asynchronous. Workers don't wait for each other&lt;/li&gt;
&lt;li&gt;The heartbeat runs in a separate thread so work doesn't block the "still alive" signal&lt;/li&gt;
&lt;/ol&gt;
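&lt;p&gt;Point 4 deserves a closer look. The heartbeat pattern can be sketched in isolation with the standard library; &lt;code&gt;HeartbeatSender&lt;/code&gt; and the injected &lt;code&gt;send&lt;/code&gt; callable are illustrative (the real worker calls the master's gRPC stub):&lt;/p&gt;

```python
# Sketch of a background heartbeat thread (hypothetical names).
import threading
import time

class HeartbeatSender:
    def __init__(self, send, interval=2.0):
        self._send = send          # callable that pings the master
        self._interval = interval
        self._stop = threading.Event()

    def start(self):
        # daemon=True so the heartbeat never keeps a dying worker alive
        threading.Thread(target=self._loop, daemon=True).start()

    def stop(self):
        self._stop.set()

    def _loop(self):
        while not self._stop.is_set():
            self._send()
            # Event.wait doubles as an interruptible sleep
            self._stop.wait(self._interval)

beats = []
hb = HeartbeatSender(lambda: beats.append(time.time()), interval=0.01)
hb.start()
time.sleep(0.05)   # the main "work" loop would run here
hb.stop()
```

&lt;p&gt;Because the heartbeat lives on its own thread, a map task that grinds on a large input for thirty seconds doesn't make the worker look dead to the master.&lt;/p&gt;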




&lt;h2&gt;
  
  
  The master's brain
&lt;/h2&gt;

&lt;h3&gt;
  
  
  What it keeps track of
&lt;/h3&gt;

&lt;p&gt;For every task (map or reduce), the master stores:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Current state: Pending, Running, Completed, or Failed&lt;/li&gt;
&lt;li&gt;Which worker has it&lt;/li&gt;
&lt;li&gt;Where the intermediate files ended up&lt;/li&gt;
&lt;li&gt;Timestamps. Attempt counts. Stuff for debugging at 1 a.m.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Here's the actual data structure:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;# src/master/state.py
&lt;/span&gt;
&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;MasterState&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;__init__&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;workers&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{}&lt;/span&gt;  &lt;span class="c1"&gt;# worker_id -&amp;gt; worker_info
&lt;/span&gt;        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;tasks&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{}&lt;/span&gt;    &lt;span class="c1"&gt;# task_id -&amp;gt; task_info
&lt;/span&gt;        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;completed_task_ids&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;set&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;  &lt;span class="c1"&gt;# For idempotency checking
&lt;/span&gt;        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;intermediate_files&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{}&lt;/span&gt;  &lt;span class="c1"&gt;# task_id -&amp;gt; {partition_id -&amp;gt; location}
&lt;/span&gt;        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;lock&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;threading&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;Lock&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;  &lt;span class="c1"&gt;# Thread safety for concurrent access
&lt;/span&gt;        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;events&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[]&lt;/span&gt;  &lt;span class="c1"&gt;# Recent events for visualization
&lt;/span&gt;
    &lt;span class="c1"&gt;# Worker tracking
&lt;/span&gt;    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;add_worker&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;worker_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;info&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="k"&gt;with&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;lock&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;workers&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;worker_id&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
                &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;host&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;info&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;host&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
                &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;port&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;info&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;port&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
                &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;status&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;idle&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;last_heartbeat&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;time&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt;
                &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;current_tasks&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;[],&lt;/span&gt;
                &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;tasks_completed&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;
            &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="c1"&gt;# Task management
&lt;/span&gt;    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;add_task&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;task_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;task_info&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="k"&gt;with&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;lock&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;tasks&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;task_id&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
                &lt;span class="o"&gt;**&lt;/span&gt;&lt;span class="n"&gt;task_info&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;status&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;pending&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;created_at&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;time&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt;
                &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;assigned_worker&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;assigned_at&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;completed_at&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;attempts&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;sequence_number&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;  &lt;span class="c1"&gt;# Incremented on each assignment
&lt;/span&gt;                &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;duplicate_completions&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;[]&lt;/span&gt;  &lt;span class="c1"&gt;# Track late arrivals
&lt;/span&gt;            &lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Each task tracks its full lifecycle. The &lt;code&gt;sequence_number&lt;/code&gt; is particularly important: it's incremented every time a task is reassigned. This lets the master distinguish between a legitimate completion and a stale completion from a worker that was presumed dead but actually just slow.&lt;/p&gt;

&lt;h3&gt;
  
  
  Thread safety matters
&lt;/h3&gt;

&lt;p&gt;The master handles concurrent requests from multiple workers. Worker 1 might be reporting a task complete while worker 2 is requesting a new task and worker 3 is sending a heartbeat. Without proper locking, you get race conditions:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;# BAD: Race condition
&lt;/span&gt;&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;assign_task&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;task_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;worker_id&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="n"&gt;task&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;tasks&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;task_id&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
    &lt;span class="n"&gt;task&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;status&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;running&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;  &lt;span class="c1"&gt;# Worker 1 reads 'pending' here
&lt;/span&gt;    &lt;span class="n"&gt;task&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;assigned_worker&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;worker_id&lt;/span&gt;  &lt;span class="c1"&gt;# Worker 2 also reads 'pending', gets same task
&lt;/span&gt;
&lt;span class="c1"&gt;# GOOD: Lock protects the critical section
&lt;/span&gt;&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;assign_task&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;task_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;worker_id&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="k"&gt;with&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;lock&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;task&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;tasks&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;task_id&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;task&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;status&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;pending&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="bp"&gt;False&lt;/span&gt;  &lt;span class="c1"&gt;# Task already assigned
&lt;/span&gt;        &lt;span class="n"&gt;task&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;status&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;running&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;
        &lt;span class="n"&gt;task&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;assigned_worker&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;worker_id&lt;/span&gt;
        &lt;span class="n"&gt;task&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;sequence_number&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;+=&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="bp"&gt;True&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;I use a single coarse-grained lock for simplicity. A production system might use finer-grained locking or lock-free data structures for better concurrency, but for this scale, a single lock works fine.&lt;/p&gt;

&lt;h3&gt;
  
  
  How a task lives and dies
&lt;/h3&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;PENDING --&amp;gt; RUNNING --&amp;gt; COMPLETED
              |
              +--&amp;gt; FAILED --&amp;gt; PENDING (try again, buddy)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Worker fails? Its tasks go back to pending. Some other worker picks them up. This is the entire fault tolerance story. Everything else is details.&lt;/p&gt;
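&lt;p&gt;The lifecycle above is small enough to pin down as an explicit transition table. Here's a standalone sketch (the enum and helper are mine, not the repo's):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;from enum import Enum

class TaskState(Enum):
    PENDING = 'pending'
    RUNNING = 'running'
    COMPLETED = 'completed'
    FAILED = 'failed'

# Legal moves, straight from the diagram above
TRANSITIONS = {
    TaskState.PENDING: {TaskState.RUNNING},
    TaskState.RUNNING: {TaskState.COMPLETED, TaskState.FAILED},
    TaskState.FAILED: {TaskState.PENDING},   # try again, buddy
    TaskState.COMPLETED: set(),              # terminal state
}

def can_transition(current, target):
    return target in TRANSITIONS[current]
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Rejecting anything not in the table makes "task went from completed back to running" bugs impossible by construction.&lt;/p&gt;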




&lt;h2&gt;
  
  
  When workers disappear (and they will)
&lt;/h2&gt;

&lt;p&gt;Your cluster &lt;em&gt;will&lt;/em&gt; eat dirt. Usually when you're not looking. The question isn't "will something fail" but "how badly will I handle it when it does."&lt;/p&gt;

&lt;p&gt;In distributed systems, we talk about failure models. The simplest is crash-stop: a node either works correctly or stops entirely. No weird half-states, no corrupted messages. MapReduce assumes crash-stop failures for workers. If a worker dies, it just disappears. The master detects this absence and reacts.&lt;/p&gt;

&lt;p&gt;The harder failure model is Byzantine: nodes might behave arbitrarily, including maliciously. MapReduce doesn't handle this. If a worker starts returning garbage data, you've got problems. The assumption is that you control your cluster and your machines aren't actively lying to you.&lt;/p&gt;

&lt;h3&gt;
  
  
  Heartbeats: Are You Still Alive?
&lt;/h3&gt;

&lt;p&gt;Workers ping the master every few seconds. "Hey, still here." If the master stops hearing from a worker, it assumes the worst.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;check_worker_health&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="n"&gt;current_time&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;time&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="n"&gt;dead_workers&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[]&lt;/span&gt;

    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;worker_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;last_heartbeat&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;worker_heartbeats&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;items&lt;/span&gt;&lt;span class="p"&gt;():&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;current_time&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="n"&gt;last_heartbeat&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;heartbeat_timeout&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="n"&gt;dead_workers&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;append&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;worker_id&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;worker_id&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;dead_workers&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;handle_worker_failure&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;worker_id&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The timeout is a tuning parameter. Too short and you get false positives. A worker pauses for GC and suddenly it's "dead." Too long and actual failures take forever to detect, leaving tasks stuck.&lt;/p&gt;

&lt;p&gt;I set mine to 10 seconds with heartbeats every 3 seconds. That gives a worker three chances to check in before the master gives up on it. In production, you'd tune this based on your network latency and acceptable detection time.&lt;/p&gt;
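&lt;p&gt;The worker side of this is just a loop in a background thread. A minimal sketch under those settings (the function and names are mine, not the repo's):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;import threading
import time

def heartbeat_loop(send_heartbeat, stop_event, interval=3.0):
    """Ping the master every interval seconds until told to stop."""
    while not stop_event.is_set():
        send_heartbeat()
        stop_event.wait(interval)  # wakes up early if stop_event is set

# Demo wiring with a fast interval (real settings: 3s beats, 10s timeout)
beats = []
stop = threading.Event()
worker = threading.Thread(target=heartbeat_loop,
                          args=(lambda: beats.append(time.time()), stop, 0.01),
                          daemon=True)
worker.start()
time.sleep(0.05)
stop.set()
worker.join()
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Using &lt;code&gt;stop_event.wait(interval)&lt;/code&gt; instead of &lt;code&gt;time.sleep(interval)&lt;/code&gt; lets the thread shut down promptly instead of finishing its nap first.&lt;/p&gt;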

&lt;p&gt;This part nearly broke me. Threading and concurrency in Python are... not fun. I had a bug where the code would correctly identify a dead worker but then just... not reassign its tasks. The failure detection worked. The recovery didn't.&lt;/p&gt;

&lt;p&gt;The culprit was a deadlock. Here's what the broken code looked like:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;# BEFORE (broken) - causes deadlock
&lt;/span&gt;&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;handle_worker_failure&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;worker_id&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="k"&gt;with&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;lock&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;workers&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;worker_id&lt;/span&gt;&lt;span class="p"&gt;][&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;status&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;failed&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;reassign_worker_tasks&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;worker_id&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;  &lt;span class="c1"&gt;# Tries to acquire same lock!
&lt;/span&gt;
&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;reassign_worker_tasks&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;worker_id&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="k"&gt;with&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;lock&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;  &lt;span class="c1"&gt;# DEADLOCK: already holding this lock from above
&lt;/span&gt;        &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;task_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;task&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;tasks&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;items&lt;/span&gt;&lt;span class="p"&gt;():&lt;/span&gt;
            &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;task&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;assigned_worker&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="n"&gt;worker_id&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
                &lt;span class="n"&gt;task&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;status&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;pending&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The monitor thread acquired the lock, detected the dead worker, then tried to reassign tasks, but &lt;code&gt;reassign_worker_tasks&lt;/code&gt; also tried to acquire the same lock. Classic deadlock. The code just... hung. No error, no progress, nothing.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;# AFTER (fixed) - call reassignment outside the lock
&lt;/span&gt;&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;handle_worker_failure&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;worker_id&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="k"&gt;with&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;lock&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;workers&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;worker_id&lt;/span&gt;&lt;span class="p"&gt;][&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;status&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;failed&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;
    &lt;span class="c1"&gt;# Call OUTSIDE the lock to avoid deadlock
&lt;/span&gt;    &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;reassign_worker_tasks&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;worker_id&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;I eventually worked out why Python's threading module didn't just throw an error: a plain &lt;code&gt;threading.Lock&lt;/code&gt; isn't reentrant and doesn't detect self-deadlock, so a second acquire from the same thread simply blocks forever. A &lt;code&gt;threading.RLock&lt;/code&gt; would have allowed the nested acquire, but releasing the lock before reassignment is the cleaner fix anyway.&lt;/p&gt;
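&lt;p&gt;The difference is easy to demonstrate in isolation (standalone demo, not from the repo). A plain &lt;code&gt;Lock&lt;/code&gt; refuses a second acquire from the thread that already holds it; an &lt;code&gt;RLock&lt;/code&gt; tracks its owner and allows nesting:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;import threading

lock = threading.Lock()
lock.acquire()
# A blocking second acquire here would hang forever, silently.
# A non-blocking acquire shows the state instead of deadlocking:
second = lock.acquire(blocking=False)   # False: already held
lock.release()

rlock = threading.RLock()
rlock.acquire()
nested = rlock.acquire(blocking=False)  # True: same thread may re-enter
rlock.release()
rlock.release()  # must release once per acquire
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;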

&lt;h3&gt;
  
  
  The slow worker problem
&lt;/h3&gt;

&lt;p&gt;Here's a fun scenario. Worker A grabs a task. Worker A gets slow (bad network, garbage collection, who knows). Worker A misses the heartbeat timeout. Master marks A as dead and gives the task to Worker B. Now &lt;em&gt;both&lt;/em&gt; workers might finish the same task.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;t=0:   Master assigns map_0 to Worker A
t=3:   Worker A starts (but it's being slow about it)
t=10:  Worker A misses heartbeat deadline
t=11:  Master: "A is dead." Assigns map_0 to Worker B
t=12:  Worker B starts map_0
t=15:  Worker A finishes, sends completion
t=16:  Worker B finishes, sends completion  &amp;lt;-- uh oh
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This is the classic problem with failure detection in distributed systems. You cannot distinguish between "dead" and "slow." The network could be partitioned. The worker could be swapping to disk. It might recover and finish its work after you've already reassigned it.&lt;/p&gt;

&lt;p&gt;&lt;del&gt;I thought this would break everything.&lt;/del&gt; Turns out it's fine, as long as you track completions properly. Here's the actual implementation:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;# src/master/state.py
&lt;/span&gt;&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;MasterState&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;Manages the state of workers and tasks with idempotency support.

    The scenario: Worker A takes task, goes slow, misses heartbeat, gets marked failed.
    Task is reassigned to Worker B. Now both A and B might complete the same task.

    Solution: Track task completion by task_id. First completion wins,
    later completions are logged but ignored.
    &lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;

    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;__init__&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;workers&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{}&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;tasks&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{}&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;completed_task_ids&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;set&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;  &lt;span class="c1"&gt;# The key to idempotency
&lt;/span&gt;        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;lock&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;threading&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;Lock&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;complete_task&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;task_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;sequence_number&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;output_data&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;worker_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;None&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;Mark task as completed (idempotent).

        Returns: True if this was a duplicate completion, False otherwise
        &lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
        &lt;span class="k"&gt;with&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;lock&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="c1"&gt;# Check if task already completed (primary idempotency check)
&lt;/span&gt;            &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;task_id&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;completed_task_ids&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
                &lt;span class="c1"&gt;# This is a duplicate completion - log it but don't fail
&lt;/span&gt;                &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;tasks&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;task_id&lt;/span&gt;&lt;span class="p"&gt;][&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;duplicate_completions&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;].&lt;/span&gt;&lt;span class="nf"&gt;append&lt;/span&gt;&lt;span class="p"&gt;({&lt;/span&gt;
                    &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;worker_id&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;worker_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                    &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;timestamp&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;time&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt;
                    &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;sequence_number&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;sequence_number&lt;/span&gt;
                &lt;span class="p"&gt;})&lt;/span&gt;
                &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;[&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;datetime&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;now&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;] DUPLICATE completion for task &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;task_id&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt; &lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
                      &lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;from worker &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;worker_id&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt; (already completed by &lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
                      &lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;tasks&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;task_id&lt;/span&gt;&lt;span class="p"&gt;].&lt;/span&gt;&lt;span class="nf"&gt;get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;completion_worker&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;)&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
                &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="bp"&gt;True&lt;/span&gt;  &lt;span class="c1"&gt;# Is duplicate
&lt;/span&gt;
            &lt;span class="c1"&gt;# First completion - record it
&lt;/span&gt;            &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;tasks&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;task_id&lt;/span&gt;&lt;span class="p"&gt;][&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;status&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;completed&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;
            &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;tasks&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;task_id&lt;/span&gt;&lt;span class="p"&gt;][&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;completed_at&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;time&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
            &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;tasks&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;task_id&lt;/span&gt;&lt;span class="p"&gt;][&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;output_data&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;output_data&lt;/span&gt;
            &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;tasks&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;task_id&lt;/span&gt;&lt;span class="p"&gt;][&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;completion_worker&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;worker_id&lt;/span&gt;

            &lt;span class="c1"&gt;# Mark task as completed (idempotency tracking)
&lt;/span&gt;            &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;completed_task_ids&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;add&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;task_id&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

            &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="bp"&gt;False&lt;/span&gt;  &lt;span class="c1"&gt;# Not a duplicate
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;First one to finish wins. Later arrivals get logged and ignored. This is idempotency: applying the same operation multiple times has the same effect as applying it once.&lt;/p&gt;

&lt;p&gt;This works because map and reduce are deterministic. Given the same input, they produce the same output. So if Worker A and Worker B both complete map_0, they write the same intermediate files. Doesn't matter who "wins." The result is identical. Running a task twice doesn't corrupt anything. It just wastes some CPU.&lt;/p&gt;

&lt;h3&gt;
  
  
  Something from the paper that finally clicked
&lt;/h3&gt;

&lt;p&gt;The original paper says:&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;"Any map tasks completed by the worker are reset back to their initial idle state."&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;The reasoning: outputs live on the failed worker's local disk, so they're gone.&lt;/p&gt;

&lt;p&gt;This confused me for a while. When a task completes, doesn't the worker report back to the master with the file locations? If it already reported, why re-run?&lt;/p&gt;

&lt;p&gt;After implementing it myself, I finally understood: "completed" does not mean "durable."&lt;/p&gt;

&lt;p&gt;Here's the distinction. In Google's implementation, map outputs live on the worker's local disk. The worker finishes, reports the file location to the master, and the master records it. But that data isn't replicated anywhere. It's sitting on one machine's disk. If that machine dies before reducers pull the data, the data is gone. The master knows &lt;em&gt;where&lt;/em&gt; the data was, but it can't get to it anymore.&lt;/p&gt;

&lt;p&gt;So "completed" means the computation finished. But the result isn't durable until it's been consumed by the reduce phase or copied to stable storage. This is the difference between:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Computed: The work is done&lt;/li&gt;
&lt;li&gt;Durable: The result will survive failures&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;In my implementation, I track intermediate file locations centrally and assume shared storage (or that the files were copied). If a worker dies after reporting completion, the files are still accessible. So I decided: if the master already knows about the output and can still access it, don't re-run. If it doesn't, re-run.&lt;/p&gt;
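&lt;p&gt;The decision rule is small. A sketch with illustrative field names (not the project's actual schema):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;import os

def needs_rerun(task):
    """Re-run a completed map task only if its output is no longer reachable.

    Captures the completed-vs-durable distinction: finishing the computation
    only counts for as long as the result is still accessible.
    """
    if task.get('status') != 'completed':
        return False  # non-completed tasks go through normal reassignment
    paths = task.get('output_files', [])
    return not all(os.path.exists(p) for p in paths)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;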

&lt;p&gt;This might be a simplification compared to Google's implementation, but it taught me something important: in distributed systems, you have to think carefully about when a write "counts." Completing a computation is not the same as persisting the result.&lt;/p&gt;




&lt;h2&gt;
  
  
  The shuffle: where your network bill comes from
&lt;/h2&gt;

&lt;p&gt;After map finishes, you've got key-value pairs scattered across workers. The shuffle gets all values for a given key to the same reducer. This is the most expensive phase of MapReduce, and understanding why helps you write better jobs.&lt;/p&gt;

&lt;h3&gt;
  
  
  The all-to-all problem
&lt;/h3&gt;

&lt;p&gt;Consider what happens with M mappers and R reducers. Each mapper produces R output files (one per reducer partition). Each reducer needs to fetch data from all M mappers. That's M * R file transfers.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Mappers              Reducers
   M1  ----+----+      R1
           |    |
   M2  ----+----+      R2
           |    |
   M3  ----+----+      R3
        (M * R connections)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;If you have 1000 mappers and 100 reducers, that's 100,000 network transfers. This is why the paper talks about bandwidth being a scarce resource. In a typical data center, machines have much more disk bandwidth than network bandwidth. The shuffle can easily become the bottleneck.&lt;/p&gt;

&lt;p&gt;Google's original implementation tried to mitigate this with data locality. The master tries to schedule map tasks on machines that already have the input data (or on machines in the same network rack). This way, map reads are local or nearly-local. The shuffle still requires network transfer, but at least you're not paying network costs twice.&lt;/p&gt;

&lt;p&gt;In my Docker-based setup, locality doesn't matter much since everything's on the same machine. But in a real cluster, this optimization is critical.&lt;/p&gt;

&lt;h3&gt;
  
  
  Partitioning (aka "Which Reducer Gets This Key?")
&lt;/h3&gt;

&lt;p&gt;Here's the actual implementation:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;# src/utils/partitioner.py
&lt;/span&gt;&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;hashlib&lt;/span&gt;

&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;Partitioner&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;Hash-based partitioning for intermediate keys&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;

    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;__init__&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;num_partitions&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;num_partitions&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;num_partitions&lt;/span&gt;

    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;get_partition&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;Get partition ID for a key using hash function&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
        &lt;span class="n"&gt;key_str&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;str&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

        &lt;span class="c1"&gt;# Use MD5 for consistent hashing across all Python processes
&lt;/span&gt;        &lt;span class="n"&gt;hash_value&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;int&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;hashlib&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;md5&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;key_str&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;encode&lt;/span&gt;&lt;span class="p"&gt;()).&lt;/span&gt;&lt;span class="nf"&gt;hexdigest&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt; &lt;span class="mi"&gt;16&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;hash_value&lt;/span&gt; &lt;span class="o"&gt;%&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;num_partitions&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Same key, same partition, every time. Doesn't matter which mapper produced it.&lt;/p&gt;

&lt;p&gt;The bug that cost me 4 hours:&lt;/p&gt;

&lt;p&gt;My first version looked like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;# BROKEN - Don't do this
&lt;/span&gt;&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;get_partition&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nf"&gt;hash&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;%&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;num_partitions&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Python's built-in &lt;code&gt;hash()&lt;/code&gt; is randomized across processes by default (for security reasons, to prevent hash collision attacks). Worker 1 would send "hello" to partition 0. Worker 2 would send "hello" to partition 2. Word counts were wrong and I couldn't figure out why.&lt;/p&gt;

&lt;p&gt;The symptom: "the" was being counted separately on three different reducers instead of once. The fix: &lt;code&gt;hashlib.md5()&lt;/code&gt;, which is deterministic across all processes. Ask me how I know.&lt;/p&gt;
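&lt;p&gt;You can reproduce the cross-process behavior directly. This standalone demo pins &lt;code&gt;PYTHONHASHSEED&lt;/code&gt; to two different values to stand in for two workers:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;import hashlib
import os
import subprocess
import sys

def builtin_hash_of_hello(seed):
    """Run hash('hello') in a fresh interpreter with a fixed hash seed."""
    env = dict(os.environ, PYTHONHASHSEED=str(seed))
    out = subprocess.run([sys.executable, '-c', "print(hash('hello'))"],
                         capture_output=True, text=True, env=env)
    return int(out.stdout)

# Two interpreters, two answers for the same key...
a = builtin_hash_of_hello(1)
b = builtin_hash_of_hello(2)

# ...while md5 lands the key on the same partition in every process.
partition = int(hashlib.md5('hello'.encode()).hexdigest(), 16) % 3
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;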

&lt;h3&gt;
  
  
  Why fine-grained tasks matter
&lt;/h3&gt;

&lt;p&gt;Google recommends M (map tasks) and R (reduce tasks) much larger than your worker count. The paper suggests M = 200,000 and R = 5,000 for a cluster of 2,000 workers. That's 100 map tasks per worker on average.&lt;/p&gt;

&lt;p&gt;Two reasons:&lt;/p&gt;

&lt;p&gt;First, load balancing. Tasks don't all take the same time. Some input splits are bigger. Some keys are more common (the "hot key" problem). If you have one task per worker, a slow task holds everything up. With 100 tasks per worker, a fast worker finishes its share and steals more work from the pending queue. The load naturally balances.&lt;/p&gt;

&lt;p&gt;Second, fault tolerance. Worker dies? You lose one tiny task, not 30% of your data. Reassigning a small task is cheap. Reassigning a massive task wastes all the work that was already done.&lt;/p&gt;

&lt;p&gt;I went with M = number of input chunks and R = 3 (configurable). Works fine for demo purposes. Production would want way more. The paper notes that having too many tasks also has overhead (the master has to track state for each one), so there's a sweet spot.&lt;/p&gt;




&lt;h2&gt;
  
  
  Chaining jobs together (the DAG scheduler)
&lt;/h2&gt;

&lt;p&gt;After getting basic MapReduce working, I ran into a real problem: my word count worked, but what if I wanted to do something with the results?&lt;/p&gt;

&lt;p&gt;Say I'm analyzing server logs. First job: count error types. Second job: find the top 10 errors. Third job: correlate with timestamps. Each job depends on the previous one's output. I could run them manually, one at a time, checking completion and copying outputs. That's tedious and error-prone.&lt;/p&gt;

&lt;p&gt;Real pipelines aren't one job. You've got:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Preprocess logs&lt;/li&gt;
&lt;li&gt;Preprocess events&lt;/li&gt;
&lt;li&gt;Combine both outputs (has to wait for first two)&lt;/li&gt;
&lt;li&gt;Generate report (has to wait for the combine)&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;   preprocess_logs ----+
                       +--&amp;gt; combine_data --&amp;gt; final_report
   preprocess_events --+
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;That's a DAG. Directed Acyclic Graph. Directed because arrows have direction (an edge from A to B means A must finish before B). Acyclic because no cycles are allowed.&lt;/p&gt;

&lt;p&gt;This is where my implementation goes beyond the original MapReduce paper. Google's paper describes single jobs. But real systems like Apache Airflow, Apache Oozie, and Google's own internal tools handle multi-job workflows. I wanted to understand how that orchestration layer works, and building it taught me a lot about dependency management and deadlock prevention.&lt;/p&gt;

&lt;p&gt;The DAG scheduler added roughly 400 lines of code on top of the core MapReduce implementation. Not trivial, but manageable once the foundation was solid.&lt;/p&gt;

&lt;h3&gt;
  
  
  Why DAGs?
&lt;/h3&gt;

&lt;p&gt;A DAG represents dependencies. Node = job. Edge = "must complete before." The structure guarantees that if you execute jobs in the right order, every job's inputs are ready when it starts.&lt;/p&gt;

&lt;p&gt;The key property is that a DAG has at least one topological ordering: a sequence where every job appears after all its dependencies. This is what lets us execute without deadlock.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;Job&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;__init__&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;job_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;input_data&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;num_reduce_tasks&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;job_id&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;job_id&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;status&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;JobStatus&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;PENDING&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;dependencies&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[]&lt;/span&gt;    &lt;span class="c1"&gt;# Jobs that must complete before this one
&lt;/span&gt;        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;dependents&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[]&lt;/span&gt;      &lt;span class="c1"&gt;# Jobs waiting for this one
&lt;/span&gt;        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;input_data&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;input_data&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;output_data&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  No cycles. Seriously.
&lt;/h3&gt;

&lt;p&gt;If Job A depends on B, B depends on C, and C depends on A, you've got a deadlock. Everyone's waiting for everyone else. Nothing runs. Ever.&lt;/p&gt;

&lt;p&gt;This is a fundamental property: cycles in a dependency graph create circular wait, one of the four conditions for deadlock (along with mutual exclusion, hold-and-wait, and no preemption). If you prevent any one of these conditions, you prevent deadlock. DAGs prevent circular wait by definition.&lt;/p&gt;

&lt;p&gt;The scheduler rejects cycles at submission time. Before adding a job with dependencies, I run a cycle detection check:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;_would_create_cycle&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;new_job_id&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;dependencies&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;List&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nb"&gt;bool&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;Check if adding these dependencies would create a cycle.&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
    &lt;span class="n"&gt;visited&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;set&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;dfs&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;job_id&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;job_id&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="n"&gt;new_job_id&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="bp"&gt;True&lt;/span&gt;  &lt;span class="c1"&gt;# Found a path back to the new job
&lt;/span&gt;        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;job_id&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;visited&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="bp"&gt;False&lt;/span&gt;
        &lt;span class="n"&gt;visited&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;add&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;job_id&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

        &lt;span class="n"&gt;job&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;jobs&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;job_id&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;job&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;dep_id&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;job&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;dependents&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
                &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="nf"&gt;dfs&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;dep_id&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
                    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="bp"&gt;True&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="bp"&gt;False&lt;/span&gt;

    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;dep_id&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;dependencies&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="nf"&gt;dfs&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;dep_id&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
            &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="bp"&gt;True&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="bp"&gt;False&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;If you try to create a cycle, the scheduler says no and throws an error. Better to fail fast at job submission than to discover a deadlock at runtime.&lt;/p&gt;
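&lt;p&gt;From the outside it looks roughly like this. This is a minimal re-derivation of the same idea, not the project's actual API; the &lt;code&gt;CycleError&lt;/code&gt; name is mine:&lt;/p&gt;

```python
class CycleError(ValueError):
    """Raised when a submitted job would close a dependency loop."""

class MiniScheduler:
    def __init__(self):
        self.deps = {}  # job_id -> list of dependency ids

    def add_job(self, job_id, dependencies=()):
        if self._would_create_cycle(job_id, dependencies):
            raise CycleError(f"adding {job_id!r} would create a cycle")
        self.deps[job_id] = list(dependencies)

    def _would_create_cycle(self, new_job_id, dependencies):
        # A cycle appears if some dependency already depends,
        # directly or transitively, on the new job.
        def depends_on(job_id, target, seen):
            if job_id == target:
                return True
            if job_id in seen:
                return False
            seen.add(job_id)
            return any(depends_on(d, target, seen) for d in self.deps.get(job_id, []))
        return any(depends_on(dep, new_job_id, set()) for dep in dependencies)

s = MiniScheduler()
s.add_job("A")
s.add_job("B", dependencies=["A"])
try:
    s.add_job("A", dependencies=["B"])   # would close the loop A -> B -> A
except CycleError as e:
    print("rejected:", e)
```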

&lt;h3&gt;
  
  
  Running jobs in order
&lt;/h3&gt;

&lt;p&gt;Topological sort. Fancy name, simple idea: order jobs so every job comes after its dependencies. I used Kahn's algorithm because it naturally handles parallel jobs at the same "level."&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;get_topological_order&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;List&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;]:&lt;/span&gt;
    &lt;span class="n"&gt;in_degree&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="n"&gt;job_id&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nf"&gt;len&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;job&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;dependencies&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;job_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;job&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;jobs&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;items&lt;/span&gt;&lt;span class="p"&gt;()}&lt;/span&gt;
    &lt;span class="n"&gt;queue&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;deque&lt;/span&gt;&lt;span class="p"&gt;([&lt;/span&gt;&lt;span class="n"&gt;job_id&lt;/span&gt; &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;job_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;deg&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;in_degree&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;items&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;deg&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;
    &lt;span class="n"&gt;order&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[]&lt;/span&gt;

    &lt;span class="k"&gt;while&lt;/span&gt; &lt;span class="n"&gt;queue&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;job_id&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;queue&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;popleft&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
        &lt;span class="n"&gt;order&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;append&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;job_id&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

        &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;dependent_id&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;jobs&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;job_id&lt;/span&gt;&lt;span class="p"&gt;].&lt;/span&gt;&lt;span class="n"&gt;dependents&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="n"&gt;in_degree&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;dependent_id&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;-=&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;
            &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;in_degree&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;dependent_id&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
                &lt;span class="n"&gt;queue&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;append&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;dependent_id&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;order&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The algorithm starts with jobs that have no dependencies (in-degree zero). As each job "completes" (gets added to the order), it decrements the in-degree of its dependents. When a dependent's in-degree hits zero, all its dependencies are satisfied and it can run.&lt;/p&gt;

&lt;p&gt;If the output order is shorter than the number of jobs, you have a cycle. (This is another way to detect cycles, but I check earlier at submission time.)&lt;/p&gt;
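&lt;p&gt;That second detection method is easy to see in a standalone version of Kahn's algorithm (a self-contained sketch, not the project's code):&lt;/p&gt;

```python
from collections import deque

def topo_order(deps):
    """deps maps each job to the list of jobs it depends on."""
    in_degree = {job: len(d) for job, d in deps.items()}
    dependents = {job: [] for job in deps}
    for job, d in deps.items():
        for dep in d:
            dependents[dep].append(job)
    queue = deque([job for job, deg in in_degree.items() if deg == 0])
    order = []
    while queue:
        job = queue.popleft()
        order.append(job)
        for dependent in dependents[job]:
            in_degree[dependent] -= 1
            if in_degree[dependent] == 0:
                queue.append(dependent)
    return order

acyclic = {"a": [], "b": ["a"], "c": ["a", "b"]}
cyclic = {"x": ["y"], "y": ["x"]}

print(topo_order(acyclic))       # ['a', 'b', 'c']
print(len(topo_order(cyclic)))   # 0: shorter than 2 jobs, so there's a cycle
```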

&lt;h3&gt;
  
  
  Propagating completion
&lt;/h3&gt;

&lt;p&gt;At runtime, job completion triggers dependency checks:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;_propagate_completion&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;job_id&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="n"&gt;job&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;jobs&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;job_id&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;dependent_id&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;job&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;dependents&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;_check_dependencies_met&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;dependent_id&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
            &lt;span class="n"&gt;dependent&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;jobs&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;dependent_id&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
            &lt;span class="n"&gt;dependent&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;status&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;JobStatus&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;READY&lt;/span&gt;
            &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ready_queue&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;append&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;dependent_id&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;When a job finishes, we look at everything that was waiting for it. For each dependent, we check: are &lt;em&gt;all&lt;/em&gt; of its dependencies now complete? If yes, mark it ready and add it to the execution queue.&lt;/p&gt;

&lt;p&gt;This cascades through the DAG. If A and B both finish, and C depends on both, then C becomes ready after the second one completes. Parallelism where possible, dependencies respected. That's the whole game.&lt;/p&gt;
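&lt;p&gt;The cascade itself fits in a few lines (a sketch of mine, mirroring the &lt;code&gt;ready_queue&lt;/code&gt; idea):&lt;/p&gt;

```python
completed = set()
waiting = {"C": {"A", "B"}}   # C depends on both A and B
ready_queue = []

def mark_completed(job_id):
    completed.add(job_id)
    for dependent, needed in waiting.items():
        if needed <= completed and dependent not in ready_queue:
            ready_queue.append(dependent)   # all deps met: dependent is ready

mark_completed("A")
print(ready_queue)   # []: C is still waiting on B
mark_completed("B")
print(ready_queue)   # ['C']: the second completion unblocks C
```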

&lt;h3&gt;
  
  
  Job chaining: output becomes input
&lt;/h3&gt;

&lt;p&gt;One nice feature: the output of one job can automatically become the input of the next.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;scheduler&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;add_job&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;preprocess&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;input_data&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;raw_files&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;scheduler&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;add_job&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;aggregate&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; 
                  &lt;span class="n"&gt;dependencies&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;preprocess&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
                  &lt;span class="n"&gt;use_dependency_output&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;True&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;  &lt;span class="c1"&gt;# &amp;lt;-- magic
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;When "preprocess" completes, its output location gets passed to "aggregate" as input. No manual wiring. This makes it easy to build pipelines where each stage transforms the previous stage's output.&lt;/p&gt;

&lt;h3&gt;
  
  
  Example: setting up a job DAG
&lt;/h3&gt;

&lt;p&gt;Here's how you'd set up a multi-stage pipeline with the scheduler:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;src.master.job_scheduler&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;DAGJobScheduler&lt;/span&gt;

&lt;span class="n"&gt;scheduler&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;DAGJobScheduler&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

&lt;span class="c1"&gt;# Stage 1: Two independent jobs (can run in parallel)
&lt;/span&gt;&lt;span class="n"&gt;scheduler&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;add_job&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;preprocess_logs&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;input_data&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;log1.txt&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;log2.txt&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;
&lt;span class="n"&gt;scheduler&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;add_job&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;preprocess_events&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;input_data&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;events.json&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;

&lt;span class="c1"&gt;# Stage 2: Job that depends on both (waits for both to finish)
&lt;/span&gt;&lt;span class="n"&gt;scheduler&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;add_job&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;combine_data&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;dependencies&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;preprocess_logs&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;preprocess_events&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
    &lt;span class="n"&gt;use_dependency_output&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;True&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;# Stage 3: Final job
&lt;/span&gt;&lt;span class="n"&gt;scheduler&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;add_job&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;final_report&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;dependencies&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;combine_data&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;

&lt;span class="c1"&gt;# Check execution order (topological sort)
&lt;/span&gt;&lt;span class="n"&gt;order&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;scheduler&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get_topological_order&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;span class="c1"&gt;# Returns: ['preprocess_logs', 'preprocess_events', 'combine_data', 'final_report']
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The scheduler exposes methods like &lt;code&gt;get_next_ready_job()&lt;/code&gt;, &lt;code&gt;mark_job_started()&lt;/code&gt;, and &lt;code&gt;mark_job_completed()&lt;/code&gt; that the master uses to drive execution. When a job completes, &lt;code&gt;_propagate_completion()&lt;/code&gt; checks if any dependent jobs are now ready to run. The actual MapReduce execution still happens through the task scheduler; the DAG scheduler just controls the ordering.&lt;/p&gt;

&lt;p&gt;There's a comprehensive demo script at &lt;code&gt;examples/dag_scheduler_demo.py&lt;/code&gt; that shows:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Parallel execution of independent jobs&lt;/li&gt;
&lt;li&gt;Failure cascading through the DAG&lt;/li&gt;
&lt;li&gt;Output chaining between stages&lt;/li&gt;
&lt;li&gt;The ASCII visualization of job status&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Run it with &lt;code&gt;python3 examples/dag_scheduler_demo.py&lt;/code&gt; to see the scheduler in action.&lt;/p&gt;

&lt;h3&gt;
  
  
  End-to-end pipeline test
&lt;/h3&gt;

&lt;p&gt;The real proof is running actual MapReduce jobs with data chaining. The integration test at &lt;code&gt;examples/dag_integration_test.py&lt;/code&gt; runs a three-stage pipeline:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;word_count → filter_common → top_words
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Each job's output feeds into the next:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;word_count&lt;/strong&gt;: Counts all words in the input file&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;filter_common&lt;/strong&gt;: Filters to words appearing more than 5 times&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;top_words&lt;/strong&gt;: Computes statistics on the filtered results&lt;/li&gt;
&lt;/ol&gt;
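&lt;p&gt;Stripped of the distribution machinery, the three stages reduce to something like this (a sketch of the logic only; the real jobs run as map/reduce tasks, and the threshold of 5 matches the pipeline):&lt;/p&gt;

```python
from collections import Counter

def word_count(lines):
    counts = Counter()
    for line in lines:
        counts.update(line.split())
    return dict(counts)

def filter_common(counts, threshold=5):
    return {w: c for w, c in counts.items() if c > threshold}

def top_words(counts, n=10):
    return sorted(counts.items(), key=lambda kv: kv[1], reverse=True)[:n]

lines = ["a b a"] * 4             # 'a' appears 8 times, 'b' 4 times
kept = filter_common(word_count(lines))
print(kept)                       # {'a': 8}: only counts above 5 survive
print(top_words(kept))            # [('a', 8)]
```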

&lt;p&gt;Here's actual output from running the pipeline:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Pipeline: word_count → filter_common → top_words

Job 1 (word_count):    8,332 unique words, 2.9M total occurrences
Job 2 (filter_common): 1,309 words kept (15.7% of input)
Job 3 (top_words):     Top 10 computed

Top 10 words: ut (57,988), in (50,649), et (47,689), vestibulum (47,654)...

Total Pipeline Time: 8.70s
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The output files are saved to &lt;code&gt;output/&lt;/code&gt;:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;word_count_output.json&lt;/code&gt; (134KB of word counts)&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;filter_common_output.json&lt;/code&gt; (filtered words)&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;top_words_output.json&lt;/code&gt; (statistics)&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;pipeline_results.json&lt;/code&gt; (complete run metadata)&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Run it yourself:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="c"&gt;# Unit tests (no Docker needed)&lt;/span&gt;
python3 examples/dag_integration_test.py &lt;span class="nt"&gt;--test&lt;/span&gt; unit

&lt;span class="c"&gt;# Full end-to-end pipeline with real data&lt;/span&gt;
python3 examples/dag_integration_test.py &lt;span class="nt"&gt;--test&lt;/span&gt; pipeline
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;






&lt;h2&gt;
  
  
  Actually running this thing
&lt;/h2&gt;

&lt;p&gt;I don't have a thousand machines like Google did in 2004. (Theirs were dual-processor x86 boxes with 2-4GB RAM. Mine is a laptop that heats up during Zoom calls.)&lt;/p&gt;

&lt;p&gt;So: Docker. Each container pretends to be a machine. gRPC handles the network stuff.&lt;/p&gt;

&lt;h3&gt;
  
  
  Three Docker configurations
&lt;/h3&gt;

&lt;p&gt;The system has separate Docker Compose files for different use cases:&lt;/p&gt;

&lt;div class="table-wrapper-paragraph"&gt;&lt;table&gt;
&lt;thead&gt;
&lt;tr&gt;
&lt;th&gt;File&lt;/th&gt;
&lt;th&gt;Purpose&lt;/th&gt;
&lt;th&gt;Command&lt;/th&gt;
&lt;/tr&gt;
&lt;/thead&gt;
&lt;tbody&gt;
&lt;tr&gt;
&lt;td&gt;&lt;code&gt;docker-compose.yml&lt;/code&gt;&lt;/td&gt;
&lt;td&gt;Normal runs with sample data&lt;/td&gt;
&lt;td&gt;&lt;code&gt;docker-compose up&lt;/code&gt;&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;&lt;code&gt;docker-compose.benchmark.yml&lt;/code&gt;&lt;/td&gt;
&lt;td&gt;Performance benchmarking (1M words)&lt;/td&gt;
&lt;td&gt;&lt;code&gt;docker-compose -f docker-compose.benchmark.yml up&lt;/code&gt;&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;&lt;code&gt;docker-compose.dag.yml&lt;/code&gt;&lt;/td&gt;
&lt;td&gt;DAG pipeline execution&lt;/td&gt;
&lt;td&gt;&lt;code&gt;docker-compose -f docker-compose.dag.yml up&lt;/code&gt;&lt;/td&gt;
&lt;/tr&gt;
&lt;/tbody&gt;
&lt;/table&gt;&lt;/div&gt;

&lt;p&gt;All configurations use the same unified &lt;code&gt;server.py&lt;/code&gt; with different flags:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="c"&gt;# Single-job mode (default)&lt;/span&gt;
python3 &lt;span class="nt"&gt;-m&lt;/span&gt; src.master.server &lt;span class="nt"&gt;--input&lt;/span&gt; data/input/sample.txt

&lt;span class="c"&gt;# Pipeline mode (DAG execution)&lt;/span&gt;
python3 &lt;span class="nt"&gt;-m&lt;/span&gt; src.master.server &lt;span class="nt"&gt;--pipeline&lt;/span&gt; config/pipeline.json
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The worker count is tunable. The master dynamically accepts any number of workers that connect:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="c"&gt;# Run with 1 worker&lt;/span&gt;
docker-compose up master worker1

&lt;span class="c"&gt;# Run with 3 workers&lt;/span&gt;
docker-compose up master worker1 worker2 worker3

&lt;span class="c"&gt;# Run with 5 workers (add more to docker-compose.yml first)&lt;/span&gt;
docker-compose up master worker1 worker2 worker3 worker4 worker5
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Quick start
&lt;/h3&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="c"&gt;# spin up the cluster&lt;/span&gt;
docker-compose up &lt;span class="nt"&gt;-d&lt;/span&gt;

&lt;span class="c"&gt;# watch the master logs&lt;/span&gt;
docker-compose logs &lt;span class="nt"&gt;-f&lt;/span&gt; master

&lt;span class="c"&gt;# kill a worker mid-job&lt;/span&gt;
docker-compose stop worker2
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;There's also a web dashboard at &lt;strong&gt;&lt;a href="http://localhost:5000" rel="noopener noreferrer"&gt;http://localhost:5000&lt;/a&gt;&lt;/strong&gt; (when using &lt;code&gt;docker-compose.benchmark.yml&lt;/code&gt;) that shows real-time worker status and job progress. It's read-only, just polling the master's HTTP status endpoint every second. No buttons, no control. Just a live view of what's happening.&lt;/p&gt;
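&lt;p&gt;The dashboard's job is just formatting that polled state. Here's a sketch of the read-only view; the field names are assumptions, not the master's actual status schema:&lt;/p&gt;

```python
def render(status):
    """Format one snapshot of the master's status for display."""
    done, total = status["completed_tasks"], status["total_tasks"]
    workers = ", ".join(f"{w}:{s}" for w, s in status["workers"].items())
    return f"[{done}/{total}] workers: {workers}"

# Stubbed snapshot so the sketch runs without a live cluster:
snapshot = {"completed_tasks": 120, "total_tasks": 178,
            "workers": {"worker1": "BUSY", "worker2": "FAILED", "worker3": "BUSY"}}
print(render(snapshot))   # [120/178] workers: worker1:BUSY, worker2:FAILED, worker3:BUSY
```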

&lt;h3&gt;
  
  
  Things to try
&lt;/h3&gt;

&lt;p&gt;If you want to see fault tolerance in action, here are some experiments:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Start the cluster: &lt;code&gt;docker-compose -f docker-compose.benchmark.yml up --build&lt;/code&gt;
&lt;/li&gt;
&lt;li&gt;Open &lt;a href="http://localhost:5000" rel="noopener noreferrer"&gt;http://localhost:5000&lt;/a&gt; to watch progress&lt;/li&gt;
&lt;li&gt;While it's running, kill a worker: &lt;code&gt;docker stop benchmark_worker2&lt;/code&gt;
&lt;/li&gt;
&lt;li&gt;Watch the dashboard. Worker2 turns "failed", and its tasks get reassigned to surviving workers&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;The interesting case is when you restart the killed worker while the job is still running. If the original task was already reassigned and completed by another worker, the restarted worker might try to complete it again. The master should ignore the duplicate completion. This is idempotency in practice.&lt;/p&gt;
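&lt;p&gt;The duplicate-completion guard is small. A sketch of the idea (the structure is mine, not lifted from the project):&lt;/p&gt;

```python
completed_tasks = {}   # task_id -> (worker_id, result); first completion wins

def mark_task_completed(task_id, worker_id, result):
    """Record a completion; ignore duplicates from restarted workers."""
    if task_id in completed_tasks:
        return False                          # already done: drop the duplicate
    completed_tasks[task_id] = (worker_id, result)
    return True

print(mark_task_completed("map-7", "worker1", "out/part-7"))       # True
# worker2 restarts and re-reports the same task:
print(mark_task_completed("map-7", "worker2", "out/part-7-dup"))   # False
print(completed_tasks["map-7"])   # ('worker1', 'out/part-7')
```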




&lt;h2&gt;
  
  
  Some numbers (and the problem of being too fast)
&lt;/h2&gt;

&lt;p&gt;Here's an embarrassing problem I didn't expect: the system was too fast to benchmark properly.&lt;/p&gt;

&lt;h3&gt;
  
  
  The benchmarking challenge
&lt;/h3&gt;

&lt;p&gt;In short, I was suffering from success.&lt;br&gt;
My first attempt at benchmarking used a simple word count on &lt;code&gt;sample.txt&lt;/code&gt; (the default input file). The job completed in under 100 milliseconds. Workers would register, grab all the tasks, and finish everything before I could even see progress in the logs. The Docker container startup time (3+ seconds) was longer than the actual computation.&lt;/p&gt;

&lt;p&gt;This is actually a good sign for the implementation. The gRPC overhead is minimal, task scheduling is fast, and the Python code isn't doing anything stupid. But it made benchmarking impossible. You can't measure a 1.5x speedup when your baseline is "too fast to measure."&lt;/p&gt;
&lt;h3&gt;
  
  
  Creating a meaningful workload
&lt;/h3&gt;

&lt;p&gt;I had to engineer a bigger input file. Here's what I ended up with:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="c"&gt;# big_sample.txt stats:&lt;/span&gt;
&lt;span class="c"&gt;# - 6.6 MB file size&lt;/span&gt;
&lt;span class="c"&gt;# - 89,155 lines  &lt;/span&gt;
&lt;span class="c"&gt;# - 1,000,000 words (exactly 1M for nice round numbers)&lt;/span&gt;
&lt;span class="c"&gt;# - Lorem ipsum text repeated and varied&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The benchmark configuration uses &lt;code&gt;--chunk-size 500&lt;/code&gt;, which splits the 89K lines into 178 map tasks (89155 / 500 ≈ 178). Each map task processes 500 lines (~5,600 words). This creates enough parallelism for the workers to stay busy and for timing differences to be measurable.&lt;/p&gt;
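&lt;p&gt;As a sanity check on that arithmetic, here's a minimal chunking sketch (the real splitter lives in the project's master code). One nitpick: ceiling division of 89,155 lines by 500 actually yields 179 chunks, 178 full 500-line chunks plus one 155-line remainder.&lt;/p&gt;

```python
# Minimal sketch of line-based chunking (illustrative, not the project's code).
def make_chunks(lines, chunk_size):
    return [lines[i:i + chunk_size] for i in range(0, len(lines), chunk_size)]

lines = [f"line {i}" for i in range(89_155)]
chunks = make_chunks(lines, 500)
print(len(chunks))        # 179: 178 full chunks plus one 155-line remainder
print(len(chunks[-1]))    # 155
```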

&lt;h3&gt;
  
  
  The numbers
&lt;/h3&gt;

&lt;div class="table-wrapper-paragraph"&gt;&lt;table&gt;
&lt;thead&gt;
&lt;tr&gt;
&lt;th&gt;Configuration&lt;/th&gt;
&lt;th&gt;Time&lt;/th&gt;
&lt;th&gt;Notes&lt;/th&gt;
&lt;/tr&gt;
&lt;/thead&gt;
&lt;tbody&gt;
&lt;tr&gt;
&lt;td&gt;1 worker&lt;/td&gt;
&lt;td&gt;2.07s&lt;/td&gt;
&lt;td&gt;Sequential baseline&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;3 workers&lt;/td&gt;
&lt;td&gt;1.36s&lt;/td&gt;
&lt;td&gt;1.52x speedup&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;3 workers (1 killed mid-job)&lt;/td&gt;
&lt;td&gt;1.22s&lt;/td&gt;
&lt;td&gt;Fault tolerance overhead minimal&lt;/td&gt;
&lt;/tr&gt;
&lt;/tbody&gt;
&lt;/table&gt;&lt;/div&gt;

&lt;h3&gt;
  
  
  Why isn't it 3x faster with 3 workers?
&lt;/h3&gt;

&lt;p&gt;The theoretical maximum speedup with 3 workers is 3x. We got 1.52x. Here's where the time goes:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;1. Amdahl's Law in action&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Not everything parallelizes. The reduce phase has only 3 tasks (R=3), and they can't start until all 178 map tasks complete. If one mapper is slightly slower, everyone waits. The sequential portion (reduce phase startup, final aggregation) limits speedup.&lt;/p&gt;
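&lt;p&gt;Plugging the measured numbers into Amdahl's law gives a rough sense of how much of this job is actually parallel. This is a back-of-envelope estimate, not a profile:&lt;/p&gt;

```python
# Amdahl's law: S(n) = 1 / ((1 - p) + p / n), where p is the parallel
# fraction and n the number of workers. Solve for p from the measured speedup.
n = 3
speedup = 2.07 / 1.36                    # measured: ~1.52x
p = (1 - 1 / speedup) * n / (n - 1)      # implied parallel fraction
ceiling = 1 / (1 - p)                    # limit as n grows without bound
print(f"implied parallel fraction ~{p:.2f}, speedup ceiling ~{ceiling:.1f}x")
```

By this estimate only about half the job parallelizes, so even infinitely many workers would cap out around 2x.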

&lt;p&gt;&lt;strong&gt;2. Task granularity overhead&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Each of the 178 map tasks requires:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;gRPC call to master to get the task (~1-2ms)&lt;/li&gt;
&lt;li&gt;JSON serialization of input data&lt;/li&gt;
&lt;li&gt;JSON serialization of output (intermediate files)&lt;/li&gt;
&lt;li&gt;gRPC call to report completion (~1-2ms)&lt;/li&gt;
&lt;li&gt;gRPC call to report intermediate file locations (~1-2ms)&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;That's roughly 5-6ms of overhead per task. With 178 tasks, that's almost a full second of just coordination overhead. The actual word counting is probably faster than the RPC calls.&lt;/p&gt;
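&lt;p&gt;The "almost a full second" claim is easy to check with the per-task estimates above:&lt;/p&gt;

```python
# Back-of-envelope total coordination cost from the per-task estimates above.
tasks = 178
per_task_overhead_ms = 5.5    # midpoint of the ~5-6ms estimate
total_s = tasks * per_task_overhead_ms / 1000
print(f"~{total_s:.2f}s spent on RPC round-trips alone")   # ~0.98s
```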

&lt;p&gt;&lt;strong&gt;3. Shared filesystem bottleneck&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;In the Docker setup, all workers write intermediate files to the same mounted volume. The shuffle phase (reduce workers reading map outputs) hits this shared resource. In a real cluster with distributed storage (like GFS), this would be parallelized.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;4. Python GIL considerations&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Each worker runs in its own container, so the GIL isn't a direct issue. But within each worker, the heartbeat thread and task execution thread share the GIL. Heavy computation could delay heartbeats, though in practice word counting is I/O-bound, not CPU-bound.&lt;/p&gt;

&lt;h3&gt;
  
  
  Task execution internals
&lt;/h3&gt;

&lt;p&gt;Here's what actually happens when a worker executes a map task:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;# From src/worker/executor.py - execute_map()
&lt;/span&gt;
&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;execute_map&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;task_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;input_data&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;input_key&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;None&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;Execute a map task.

    1. Apply user&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;s map function to input (word_count_map)
    2. Partition output into R buckets using hash(key) mod R
    3. Write partitioned data to local intermediate files (JSON)
    4. Return file locations to report to master
    &lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
    &lt;span class="n"&gt;input_key&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;input_key&lt;/span&gt; &lt;span class="ow"&gt;or&lt;/span&gt; &lt;span class="n"&gt;task_id&lt;/span&gt;

    &lt;span class="c1"&gt;# Execute map phase: tokenize text, emit (word, 1) pairs
&lt;/span&gt;    &lt;span class="c1"&gt;# Then partition by hash(word) % num_reduce_tasks
&lt;/span&gt;    &lt;span class="n"&gt;partitions&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;map_phase&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;execute&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;input_key&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;input_data&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="c1"&gt;# Write R intermediate files (one per reduce partition)
&lt;/span&gt;    &lt;span class="c1"&gt;# File format: {word: [1, 1, 1, ...], word2: [1], ...}
&lt;/span&gt;    &lt;span class="n"&gt;file_paths&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;intermediate_manager&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;write_partitioned_output&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="n"&gt;task_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; 
        &lt;span class="n"&gt;partitions&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;file_paths&lt;/span&gt;  &lt;span class="c1"&gt;# {0: "map_5_partition_0.json", 1: "...", 2: "..."}
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The intermediate file format groups values by key within each partition:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="err"&gt;//&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt;intermediate/worker&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="err"&gt;/map_&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="err"&gt;_partition_&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="err"&gt;.json&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"the"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"quick"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"brown"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="err"&gt;...&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This pre-grouping at the mapper (sometimes called a combiner effect) reduces the work the reducer has to do. Instead of seeing 50,000 individual &lt;code&gt;("the", 1)&lt;/code&gt; pairs, the reducer sees &lt;code&gt;("the", [1, 1, 1, ...])&lt;/code&gt; already grouped.&lt;/p&gt;
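&lt;p&gt;An illustrative sketch of mapper-side partitioning with this pre-grouping (my own toy version, not the project's code). One caveat worth knowing if you implement this yourself: Python's built-in &lt;code&gt;hash()&lt;/code&gt; is randomized per process for strings, so a multi-process setup needs a stable hash to route the same word to the same partition on every worker.&lt;/p&gt;

```python
import hashlib
from collections import defaultdict

# Stable hash: md5 gives the same partition for a word in every process,
# unlike Python's per-process-randomized built-in hash() for strings.
def stable_partition(word, num_reduce):
    digest = hashlib.md5(word.encode()).digest()
    return int.from_bytes(digest[:4], "big") % num_reduce

def map_and_partition(text, num_reduce=3):
    # Emit (word, 1) pairs, pre-grouped by word within each partition.
    partitions = [defaultdict(list) for _ in range(num_reduce)]
    for word in text.lower().split():
        partitions[stable_partition(word, num_reduce)][word].append(1)
    return partitions

parts = map_and_partition("the quick brown fox jumps over the lazy dog")
```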

&lt;h3&gt;
  
  
  Reduce task execution
&lt;/h3&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;# From src/worker/executor.py - execute_reduce()
&lt;/span&gt;
&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;execute_reduce&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;task_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;partition_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;intermediate_locations&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;Execute a reduce task.

    1. Fetch intermediate files from ALL map workers for this partition
    2. Merge the pre-grouped data (shuffle phase)
    3. Sort by key
    4. Apply reduce function to each (key, values) group
    5. Return final output
    &lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
    &lt;span class="c1"&gt;# Fetch from all mappers: each contributed one file for this partition
&lt;/span&gt;    &lt;span class="c1"&gt;# intermediate_locations = [
&lt;/span&gt;    &lt;span class="c1"&gt;#   {"file_path": "intermediate/worker1/map_0_partition_0.json"},
&lt;/span&gt;    &lt;span class="c1"&gt;#   {"file_path": "intermediate/worker2/map_1_partition_0.json"},
&lt;/span&gt;    &lt;span class="c1"&gt;#   ...
&lt;/span&gt;    &lt;span class="c1"&gt;# ]
&lt;/span&gt;
    &lt;span class="n"&gt;grouped_data&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;_fetch_and_group_intermediate&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;intermediate_locations&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="c1"&gt;# grouped_data = {"the": [1,1,1,...,1,1], "cat": [1,1], ...}
&lt;/span&gt;
    &lt;span class="c1"&gt;# Sort by key for consistent output ordering
&lt;/span&gt;    &lt;span class="n"&gt;sorted_data&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;sorted&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;grouped_data&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;items&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;

    &lt;span class="c1"&gt;# Apply reduce function: sum all the 1s
&lt;/span&gt;    &lt;span class="n"&gt;result&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;reduce_phase&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;execute&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;sorted_data&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="c1"&gt;# result = {"the": 50000, "cat": 200, ...}
&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;result&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Master task scheduling
&lt;/h3&gt;

&lt;p&gt;The scheduler implements a strict phase ordering:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;# From src/master/scheduler.py - get_next_task()
&lt;/span&gt;
&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;get_next_task&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;worker_id&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;Scheduling policy:
    1. During map phase: assign pending map tasks (FIFO)
    2. After ALL maps complete: create reduce tasks  
    3. During reduce phase: assign pending reduce tasks
    &lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
    &lt;span class="c1"&gt;# Check phase transition
&lt;/span&gt;    &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;check_map_phase_complete&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;  &lt;span class="c1"&gt;# Creates reduce tasks if ready
&lt;/span&gt;
    &lt;span class="n"&gt;pending_tasks&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;state&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get_pending_tasks&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="ow"&gt;not&lt;/span&gt; &lt;span class="n"&gt;pending_tasks&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;check_job_complete&lt;/span&gt;&lt;span class="p"&gt;():&lt;/span&gt;
            &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt; MapReduce job complete!&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;

    &lt;span class="c1"&gt;# Prioritize: map tasks first, then reduce
&lt;/span&gt;    &lt;span class="n"&gt;map_tasks&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;t&lt;/span&gt; &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;t&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;pending_tasks&lt;/span&gt; &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;t&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;task_type&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;map&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
    &lt;span class="n"&gt;reduce_tasks&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;t&lt;/span&gt; &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;t&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;pending_tasks&lt;/span&gt; &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;t&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;task_type&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;reduce&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;

    &lt;span class="n"&gt;task&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;map_tasks&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;map_tasks&lt;/span&gt; &lt;span class="k"&gt;else&lt;/span&gt; &lt;span class="n"&gt;reduce_tasks&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
    &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;state&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;assign_task&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;task&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;task_id&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt; &lt;span class="n"&gt;worker_id&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;task&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The key insight: reduce tasks aren't even created until all map tasks complete. This ensures reducers have all the intermediate data they need. If we created reduce tasks earlier, reducers would have incomplete data.&lt;/p&gt;
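&lt;p&gt;That barrier fits in a few lines. A sketch with hypothetical structure (the post's scheduler does the equivalent inside &lt;code&gt;check_map_phase_complete()&lt;/code&gt;):&lt;/p&gt;

```python
# Sketch of the map-to-reduce barrier: reduce tasks are only created once
# every map task has completed (hypothetical structure, not the real code).
def maybe_create_reduce_tasks(tasks, num_reduce):
    maps = [t for t in tasks.values() if t["type"] == "map"]
    have_reduces = any(t["type"] == "reduce" for t in tasks.values())
    if maps and not have_reduces and all(t["status"] == "COMPLETED" for t in maps):
        for r in range(num_reduce):
            tasks[f"reduce_{r}"] = {"type": "reduce", "status": "PENDING"}

tasks = {
    "map_0": {"type": "map", "status": "COMPLETED"},
    "map_1": {"type": "map", "status": "RUNNING"},
}
maybe_create_reduce_tasks(tasks, 3)    # no-op: map_1 still running
tasks["map_1"]["status"] = "COMPLETED"
maybe_create_reduce_tasks(tasks, 3)    # now creates reduce_0..reduce_2
```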

&lt;h3&gt;
  
  
  Intermediate file management
&lt;/h3&gt;

&lt;p&gt;Each worker maintains its own intermediate directory:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;intermediate/
├── worker1/
│   ├── map_0_partition_0.json   # Keys hashing to partition 0
│   ├── map_0_partition_1.json   # Keys hashing to partition 1
│   ├── map_0_partition_2.json   # Keys hashing to partition 2
│   ├── map_3_partition_0.json   # Worker 1 also did map task 3
│   └── ...
├── worker2/
│   ├── map_1_partition_0.json
│   └── ...
└── worker3/
    └── ...
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;With M=178 map tasks and R=3 reduce tasks, we create 178 × 3 = 534 intermediate files total. Each file is typically 10-50KB for this workload.&lt;/p&gt;

&lt;h3&gt;
  
  
  Timing breakdown (rough estimates for 1-worker baseline)
&lt;/h3&gt;

&lt;div class="table-wrapper-paragraph"&gt;&lt;table&gt;
&lt;thead&gt;
&lt;tr&gt;
&lt;th&gt;Phase&lt;/th&gt;
&lt;th&gt;Time&lt;/th&gt;
&lt;th&gt;Notes&lt;/th&gt;
&lt;/tr&gt;
&lt;/thead&gt;
&lt;tbody&gt;
&lt;tr&gt;
&lt;td&gt;Worker startup &amp;amp; registration&lt;/td&gt;
&lt;td&gt;~3s&lt;/td&gt;
&lt;td&gt;Docker container + gRPC handshake&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;Map phase (178 tasks)&lt;/td&gt;
&lt;td&gt;~1.5s&lt;/td&gt;
&lt;td&gt;178 × (parse + hash + write JSON)&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;Shuffle (reduce fetching files)&lt;/td&gt;
&lt;td&gt;~0.3s&lt;/td&gt;
&lt;td&gt;3 reducers × 178 files each&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;Reduce phase (3 tasks)&lt;/td&gt;
&lt;td&gt;~0.2s&lt;/td&gt;
&lt;td&gt;3 × (merge + sum + write result)&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;&lt;strong&gt;Total job time&lt;/strong&gt;&lt;/td&gt;
&lt;td&gt;&lt;strong&gt;~2.0s&lt;/strong&gt;&lt;/td&gt;
&lt;td&gt;Excluding container startup&lt;/td&gt;
&lt;/tr&gt;
&lt;/tbody&gt;
&lt;/table&gt;&lt;/div&gt;

&lt;p&gt;The 3-worker case parallelizes the map phase but not much else. Workers split the 178 map tasks roughly evenly (~60 each), cutting that 1.5s down to ~0.5s. But shuffle and reduce don't scale as well, hence 1.52x rather than 3x.&lt;/p&gt;

&lt;p&gt;You can run the benchmark yourself:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;python3 examples/benchmark.py
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This starts the Docker cluster, runs the job with different worker counts, and reports timing. Results are saved to &lt;code&gt;benchmark_results.txt&lt;/code&gt;.&lt;/p&gt;




&lt;h2&gt;
  
  
  Connecting the dots to class
&lt;/h2&gt;

&lt;p&gt;Building this system gave me concrete experience with distributed systems concepts that previously felt abstract. Here's how they showed up in practice:&lt;/p&gt;

&lt;h3&gt;
  
  
  Failure models and detection
&lt;/h3&gt;

&lt;p&gt;The system assumes crash-stop failures: workers either work correctly or stop entirely. This is the simplest failure model and the easiest to handle. Byzantine failures (where nodes lie or behave arbitrarily) would require much more complex protocols, such as Byzantine fault tolerant consensus, which needs 3f+1 nodes to tolerate f Byzantine failures.&lt;/p&gt;

&lt;p&gt;Failure detection in distributed systems is fundamentally imperfect. You cannot distinguish between "crashed" and "slow." This connects to the FLP impossibility result (Fischer, Lynch, and Paterson, 1985): in an asynchronous system where message delays are unbounded, no deterministic consensus protocol can guarantee both safety (never making a wrong decision) and liveness (eventually making progress) with even one faulty process.&lt;/p&gt;

&lt;p&gt;What does this mean practically? It means you have to choose. MapReduce sidesteps the impossibility by:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Not requiring consensus: The master makes all decisions unilaterally. No need for workers to agree on anything.&lt;/li&gt;
&lt;li&gt;Accepting imperfect failure detection: Timeouts might incorrectly mark a slow worker as dead. That's okay.&lt;/li&gt;
&lt;li&gt;Making failures recoverable through retry: If we mistakenly reassign a task, the duplicate execution doesn't corrupt anything.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;The FLP result doesn't say distributed systems are impossible. It says you can't have &lt;em&gt;everything&lt;/em&gt;. MapReduce works because it gives up on perfect failure detection and compensates with idempotent operations.&lt;/p&gt;

&lt;h3&gt;
  
  
  Consistency and idempotency
&lt;/h3&gt;

&lt;p&gt;MapReduce achieves eventual consistency through idempotent operations. The same task might run multiple times, but the result is always the same. This is a form of at-least-once semantics: we guarantee every task runs at least once, but it might run more than once.&lt;/p&gt;

&lt;p&gt;The alternative would be exactly-once semantics, which is much harder to achieve in distributed systems. It typically requires two-phase commit or similar coordination protocols, which add latency and complexity. MapReduce avoids this by making duplicate execution safe.&lt;/p&gt;

&lt;h3&gt;
  
  
  Coordination patterns
&lt;/h3&gt;

&lt;p&gt;The master implements centralized coordination. All decisions flow through one node. This is simple but creates a bottleneck and single point of failure.&lt;/p&gt;

&lt;p&gt;The alternative is decentralized coordination, where nodes coordinate peer-to-peer. Systems like Paxos and Raft do this for consensus. The trade-off: decentralized systems are more fault-tolerant but harder to reason about and typically slower (multiple rounds of communication to agree on anything).&lt;/p&gt;

&lt;p&gt;For MapReduce, centralized coordination makes sense. The master is a coordination bottleneck, not a data bottleneck. It makes decisions but doesn't process terabytes of data. This asymmetry makes centralization acceptable.&lt;/p&gt;

&lt;h3&gt;
  
  
  Data partitioning
&lt;/h3&gt;

&lt;p&gt;Hash partitioning ensures keys are evenly distributed across reducers. It's simple and works well when keys are uniformly distributed.&lt;/p&gt;

&lt;p&gt;The downside is that it doesn't preserve ordering. If you need sorted output, you'd use range partitioning instead: keys 0-99 go to reducer 0, keys 100-199 go to reducer 1, etc. This preserves order but risks imbalanced load if keys aren't uniformly distributed.&lt;/p&gt;
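&lt;p&gt;The two strategies side by side, in sketch form (toy stable hash, made-up boundaries):&lt;/p&gt;

```python
# Hash partitioning: even spread, but no ordering guarantee across reducers.
def hash_partition(key, num_reduce):
    return sum(key.encode()) % num_reduce    # toy stable hash

# Range partitioning: reducer i owns keys below boundaries[i], so
# concatenating reducer outputs in order yields globally sorted keys.
def range_partition(key, boundaries):
    for i, bound in enumerate(boundaries):
        if key < bound:
            return i
    return len(boundaries)

boundaries = ["h", "p"]    # reducer 0: a-g, reducer 1: h-o, reducer 2: p-z
print(range_partition("cat", boundaries))   # 0
print(range_partition("lazy", boundaries))  # 1
print(range_partition("the", boundaries))   # 2
```

If most keys start with "t" (as real text skews toward common prefixes), reducer 2 ends up with most of the work; picking good boundaries usually requires sampling the key distribution first.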

&lt;h3&gt;
  
  
  State machine thinking
&lt;/h3&gt;

&lt;p&gt;I started thinking of tasks as state machines:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;PENDING -&amp;gt; RUNNING -&amp;gt; COMPLETED
              |
              v
           FAILED -&amp;gt; PENDING (retry)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This made the code cleaner. Each state transition has clear triggers and effects. The master maintains the current state; workers trigger transitions by requesting tasks and reporting completion.&lt;/p&gt;
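&lt;p&gt;The lifecycle above can be encoded as an explicit transition table, which rejects nonsense transitions outright (a sketch; the project encodes the same states, but the names here are illustrative):&lt;/p&gt;

```python
# Allowed task state transitions, mirroring the diagram above.
ALLOWED = {
    ("PENDING", "RUNNING"),      # master assigns the task to a worker
    ("RUNNING", "COMPLETED"),    # worker reports success
    ("RUNNING", "FAILED"),       # worker crashes or heartbeat times out
    ("FAILED", "PENDING"),       # master re-queues the task for retry
}

def transition(task, new_status):
    if (task["status"], new_status) not in ALLOWED:
        raise ValueError(f"illegal transition: {task['status']} to {new_status}")
    task["status"] = new_status

task = {"status": "PENDING"}
for step in ("RUNNING", "FAILED", "PENDING", "RUNNING", "COMPLETED"):
    transition(task, step)       # walks the retry path, then succeeds
```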

&lt;h3&gt;
  
  
  CAP theorem trade-offs
&lt;/h3&gt;

&lt;p&gt;The CAP theorem says you can have at most two of: Consistency, Availability, Partition tolerance. Where does MapReduce land?&lt;/p&gt;

&lt;p&gt;In practice, MapReduce assumes partitions won't happen, or are rare enough to ignore. Within a datacenter, network partitions are uncommon (though not impossible). Given this assumption, MapReduce provides both consistency and availability. It's a CA system that assumes reliable networking.&lt;/p&gt;

&lt;p&gt;But what happens when the assumption breaks? If the master becomes partitioned from workers, the system stops making progress. Workers can't get tasks. Tasks can't be reported complete. MapReduce chooses to stop rather than return potentially wrong results. In CAP terms, when forced to choose, it picks consistency over availability.&lt;/p&gt;

&lt;p&gt;This is similar to traditional databases. They often assume reliable networks within a datacenter and provide CA semantics. The CAP trade-off really bites when you cross datacenters or have unreliable networks. That's when you need to make hard choices about CP vs AP.&lt;/p&gt;

&lt;p&gt;A true CP system (like one using Raft for master election) would continue making progress during partitions by electing a new leader in the majority partition. MapReduce doesn't do this. It just waits for the partition to heal. That's a pragmatic choice for batch processing where jobs can be restarted.&lt;/p&gt;




&lt;h2&gt;
  
  
  What I skipped (and why)
&lt;/h2&gt;

&lt;p&gt;Speculative execution. The paper describes running backup copies of slow tasks. This requires tracking task timing statistics and deciding when a task is "too slow." It's an optimization that matters at scale but adds significant complexity. I'd need to track p50/p99 task durations, decide on backup thresholds, and handle the case where both original and backup complete. Valuable, but not core to understanding MapReduce.&lt;/p&gt;

&lt;p&gt;Combiner functions. If your reduce function is associative (like sum or count), you can run a "mini-reduce" on the mapper before shuffling. This reduces network traffic significantly. Instead of sending [(word, 1), (word, 1), (word, 1)], you send [(word, 3)]. I understood the concept but prioritized other features.&lt;/p&gt;
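&lt;p&gt;For word count, the combiner is essentially one line. A sketch of the idea (not something the project implements):&lt;/p&gt;

```python
from collections import Counter

# Combiner sketch: run a mini-reduce on the mapper before the shuffle.
# Legal for word count because the reduce function (sum) is associative
# and commutative, so partial sums can safely be summed again downstream.
def map_with_combiner(text):
    return Counter(text.lower().split())    # each word crosses the network once

pairs = map_with_combiner("the cat sat on the mat and the dog")
print(pairs["the"])    # 3: shipped as ("the", 3) instead of three ("the", 1)s
```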

&lt;p&gt;Master failover. This would require implementing Raft or Paxos for leader election, which is essentially a whole separate project. The master checkpoints its state, so recovery is possible, but automated failover would need consensus among standby nodes. Maybe next semester.&lt;/p&gt;

&lt;p&gt;Data locality optimization. Google's scheduler tries to place map tasks on machines that already have the input data. In my Docker setup, all containers share the same filesystem, so locality is meaningless. Real clusters would benefit hugely from this.&lt;/p&gt;




&lt;h2&gt;
  
  
  What I actually learned
&lt;/h2&gt;

&lt;p&gt;The paper is 13 pages. The algorithm fits in your head. The hard part is everything the paper doesn't say: the edge cases, the debugging, the moments where you have to make a design decision and the paper just moves on.&lt;/p&gt;

&lt;p&gt;I spent more time debugging threading issues than writing the actual map/reduce logic. The distinction between "completed" and "durable" didn't click until I had workers dying at inconvenient times and had to think through what should happen to their outputs.&lt;/p&gt;

&lt;p&gt;Building this changed how I read distributed systems papers. I now notice the gaps, the places where the authors say "and then we do X" without explaining how X actually works when messages get delayed or nodes crash at the wrong moment. Those gaps are where the real learning happens.&lt;/p&gt;




&lt;h2&gt;
  
  
  Links
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;My implementation: &lt;a href="https://github.com/Ajodo-Godson/MapReduce" rel="noopener noreferrer"&gt;github.com/Ajodo-Godson/MapReduce&lt;/a&gt;. Clone it, break it, learn from it.&lt;/li&gt;
&lt;li&gt;The original paper: &lt;a href="https://static.googleusercontent.com/media/research.google.com/en//archive/mapreduce-osdi04.pdf" rel="noopener noreferrer"&gt;MapReduce: Simplified Data Processing on Large Clusters&lt;/a&gt;. Pages 3-5 cover the execution model, page 7 covers fault tolerance. Both are worth re-reading after you've implemented something.&lt;/li&gt;
&lt;li&gt;&lt;a href="https://grpc.io/docs/what-is-grpc/introduction/" rel="noopener noreferrer"&gt;gRPC introduction&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://protobuf.dev/" rel="noopener noreferrer"&gt;Protocol Buffers documentation&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;
&lt;a href="https://groups.csail.mit.edu/tds/papers/Lynch/jacm85.pdf" rel="noopener noreferrer"&gt;FLP Impossibility Paper&lt;/a&gt;. Dense but foundational. The result is simple; the proof is not.&lt;/li&gt;
&lt;/ul&gt;




&lt;p&gt;&lt;em&gt;Fall 2025. Distributed Systems.&lt;/em&gt;&lt;/p&gt;

</description>
      <category>systemdesign</category>
      <category>computerscience</category>
      <category>learning</category>
      <category>showdev</category>
    </item>
  </channel>
</rss>
