Day 443: Join Algorithms in Distributed Engines

Database Engine Internals and Implementation · Lesson 042 · 30 min · Advanced

The core idea: In a distributed engine, join planning is mostly a decision about data movement. The local join operator matters, but the expensive question is which rows have to move, which side can stay put, and how much skew or spill the cluster can survive.

Today's "Aha!" Moment

The Harbor Point compliance desk has moved past the routing problem from 040.md and the stage-boundary problem from 041.md. Its morning report now needs to join today's reservations with issuer_limits so the desk can flag open trades that are already above the approved limit. The SQL looks ordinary:

SELECT r.issuer_id, SUM(r.notional_usd) AS open_notional, l.limit_usd
FROM reservations r
JOIN issuer_limits l ON r.issuer_id = l.issuer_id
WHERE r.trading_day = CURRENT_DATE
  AND r.status = 'open'
  AND l.active = true
GROUP BY r.issuer_id, l.limit_usd;

What decides whether this query feels instant or melts the cluster is not the word JOIN by itself. It is whether issuer_limits is small enough to broadcast, whether both tables are already partitioned by issuer_id, or whether the engine has to repartition both sides and pay a full shuffle. The algorithm name, such as hash join or merge join, only becomes meaningful after that distribution choice is made.

That is the misconception to correct. Engineers often ask, "Should the engine use a hash join or a merge join?" In a single-node database that can be a good first question. In a distributed engine it is usually the second question. The first is, "How do matching rows meet at all?" Once you see a join as a combination of exchange strategy plus local matching strategy, the performance pathologies become much easier to predict.

Why This Matters

Harbor Point stores reservations as a large sharded fact table. issuer_limits is much smaller and changes only a few times per day. Later in the same workflow, the desk also joins reservations to fills, which is another large table distributed independently because it is written by a different ingestion path. Those two joins look similar in SQL and behave very differently in production.

If the engine broadcasts issuer_limits, each worker can keep scanning its local shard of reservations and join against an in-memory copy of the limit table. That is cheap because only the small side moves. If the engine applies the same broadcast strategy to the reservations-to-fills join, every worker suddenly needs a copy of a large relation and the query can blow through memory before it returns the first row.

Distributed joins therefore determine more than raw speed. They shape network traffic, exchange spill volume, retry cost, and the amount of skew the system can tolerate. When a cluster becomes slower after more nodes are added, a bad distributed join is one of the first places to look because extra nodes also mean extra network paths, extra copies of broadcast data, and more opportunities for one hot key to dominate a partition.

Learning Objectives

By the end of this session, you will be able to:

  1. Explain why join planning starts with data placement - Describe how broadcast, repartition, and colocated joins decide where matching rows can meet.
  2. Compare local join algorithms inside a distributed stage - Analyze when a worker should use hash, merge, or lookup-style execution after the distribution choice is fixed.
  3. Diagnose production join failures - Trace how skew, stale statistics, spill, and late-arriving filters turn a plausible join plan into a bad one.

Core Concepts Explained

Concept 1: The first distributed join decision is who moves

The previous lesson introduced exchange operators as the stage boundaries where rows change ownership. Distributed joins are where those exchanges stop being abstract. Before Harbor Point's engine can run any local join code, it has to choose one of three broad shapes.

The cheapest case is a colocated join. If reservations and issuer_limits are both partitioned by issuer_id with the same hash function and compatible partition counts, then each worker already holds the matching rows it needs. The plan can stay local:

Worker 1: reservations[issuer_id hash 1] join issuer_limits[issuer_id hash 1]
Worker 2: reservations[issuer_id hash 2] join issuer_limits[issuer_id hash 2]
...

This is ideal because there is no extra shuffle. The trade-off is that colocated joins are fragile. The moment one table is repartitioned differently, filtered through a stage that changes distribution, or bucketed on another key, the optimizer loses that free win.

The next-best case for Harbor Point's limit check is a broadcast join. Because issuer_limits is small, the coordinator can send a copy of it to every worker that scans reservations. Each worker then performs the join locally against the broadcast copy. Broadcast is attractive because it avoids shuffling the large fact table, but the small side is replicated N times. A table that looks harmless at 50 MB becomes a 1 GB memory commitment on a 20-worker cluster before the hash table overhead is counted.
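
To make the replication cost concrete, here is a back-of-the-envelope check in Python. The 50 MB size, the 20-worker count, and the 2x hash-table overhead factor are illustrative assumptions, not measurements from any particular engine.

build_side_bytes = 50 * 1024 * 1024      # issuer_limits, decoded in memory
workers = 20
hash_table_overhead = 2.0                # buckets, pointers, padding on top of raw rows

cluster_commitment = build_side_bytes * workers * hash_table_overhead
print(f"{cluster_commitment / 1024**3:.1f} GiB committed across the cluster")  # ~2.0 GiB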

When neither colocated nor broadcast is safe, the engine falls back to a repartitioned join. Both inputs pass through a repartition exchange on the join key so that all rows for one key land in the same downstream partition:

reservations -- hash(issuer_id) --> partition 0..P-1
issuer_limits -- hash(issuer_id) --> partition 0..P-1

This is the general solution because it works even when both sides are large and initially misaligned. It is also the most expensive one because both sides pay network, buffering, and possible spill. The optimizer's first real job is to avoid this plan unless the other two options are worse or impossible.
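
A minimal sketch of that three-way choice, assuming a simple heuristic: prefer colocated, then broadcast, then fall back to repartition. TableStats, its field names, and BROADCAST_LIMIT_BYTES are illustrative assumptions, not a real optimizer's cost model.

from dataclasses import dataclass
from typing import Optional

BROADCAST_LIMIT_BYTES = 100 * 1024 * 1024   # assumed per-worker memory budget

@dataclass
class TableStats:
    estimated_bytes: int
    partition_key: Optional[str]    # key the table is hash-partitioned on, if any
    partition_count: int

def choose_distribution(left: TableStats, right: TableStats, join_key: str) -> str:
    # Colocated: both sides already hash-partitioned on the join key the same way.
    if (left.partition_key == join_key == right.partition_key
            and left.partition_count == right.partition_count):
        return "colocated"
    # Broadcast: replicate the smaller side if it fits the per-worker budget.
    smaller = min(left, right, key=lambda t: t.estimated_bytes)
    if smaller.estimated_bytes <= BROADCAST_LIMIT_BYTES:
        return "broadcast"
    # Otherwise both sides pay the full shuffle.
    return "repartition"

reservations = TableStats(800 * 1024**3, partition_key=None, partition_count=64)
issuer_limits = TableStats(50 * 1024**2, partition_key=None, partition_count=1)
print(choose_distribution(reservations, issuer_limits, "issuer_id"))  # broadcast

Note that the real decision also depends on join type and on whether the estimates can be trusted, which is exactly where Concept 3 picks up.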

Concept 2: After rows meet, each worker still needs the right local join algorithm

Once Harbor Point's engine has decided how rows arrive at a worker, the local join algorithm takes over. In distributed analytical engines, the default is usually a hash join. One side becomes the build side: the worker reads that side first, constructs a hash table keyed by issuer_id, and then probes it with rows from the other side. Broadcast joins almost always turn the replicated input into the build side because it is intentionally the smaller relation.

Hash join is popular because it handles unsorted inputs and can be extremely fast when the build side fits in memory. The danger is memory pressure. If the optimizer underestimates the build side, the worker spills partitions to disk, repartitions internally, or fails outright. In a distributed plan that means the join is now paying both cluster-level shuffle cost and node-level spill cost.
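
Here is a minimal per-worker hash join sketch matching that description, assuming the broadcast copy of issuer_limits is the build side and the local shard of reservations is the probe side. Spill handling is deliberately left out.

from collections import defaultdict

def hash_join(build_rows, probe_rows, key):
    # Build phase: hash the smaller side by the join key.
    hash_table = defaultdict(list)
    for row in build_rows:
        hash_table[row[key]].append(row)
    # Probe phase: stream the large side and emit matched pairs.
    for row in probe_rows:
        for match in hash_table.get(row[key], []):
            yield {**match, **row}

issuer_limits = [{"issuer_id": 7, "limit_usd": 5_000_000}]
reservations = [{"issuer_id": 7, "notional_usd": 1_250_000, "status": "open"}]
print(list(hash_join(issuer_limits, reservations, "issuer_id")))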

Merge join becomes attractive when both inputs are already sorted or can be streamed in key order cheaply. Imagine Harbor Point's later reconciliation query joining reservations and fills after both have passed through an upstream sort on reservation_id. A merge join can walk both ordered streams once, consuming much less random-access memory than a large hash table. The trade-off is that sorting is expensive; if the engine has to sort both sides only for the join, the merge join often loses to a hash join.
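
A sketch of the merge idea for that reconciliation case: both inputs arrive already sorted on reservation_id, so each side is walked exactly once. For brevity this assumes unique keys on the left side; real implementations also handle duplicates on both sides.

def merge_join(left_sorted, right_sorted, key):
    li, ri = 0, 0
    while li < len(left_sorted) and ri < len(right_sorted):
        lk, rk = left_sorted[li][key], right_sorted[ri][key]
        if lk == rk:
            yield {**left_sorted[li], **right_sorted[ri]}
            ri += 1          # keep matching further right-side rows with this key
        elif lk < rk:
            li += 1
        else:
            ri += 1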

Some distributed SQL engines also support lookup joins, which are essentially controlled nested-loop joins with indexing. Suppose the compliance desk starts with 200 flagged reservation IDs from a rules engine and needs to fetch matching rows from a partitioned fills table keyed by reservation_id. Instead of shuffling the whole fills table, the engine can send targeted point lookups to the owning partitions. That is excellent for a tiny outer input and catastrophic for a large one because it turns into thousands or millions of remote index probes.
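
The lookup-join shape looks roughly like the sketch below: one indexed probe per flagged ID, routed to the partition that owns it, instead of shuffling the fills table. The partitioning function and fetch_fills(partition, reservation_id) are hypothetical stand-ins for the engine's storage RPC.

NUM_PARTITIONS = 16

def owning_partition(reservation_id: int) -> int:
    return hash(reservation_id) % NUM_PARTITIONS

def lookup_join(flagged_ids, fetch_fills):
    for rid in flagged_ids:
        # One remote probe per outer row: fine for 200 IDs, disastrous for millions.
        for fill in fetch_fills(owning_partition(rid), rid):
            yield rid, fill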

The important point is that "distributed join algorithm" is a two-layer decision. The outer layer decides distribution. The inner layer decides how each worker matches rows once the distribution is fixed.

Concept 3: Real systems fail on skew, bad estimates, and missed pruning opportunities

Harbor Point's unpleasant join incidents rarely come from a plan that is obviously absurd. They come from plans that were reasonable under the wrong assumptions. The classic example is stale statistics. If issuer_limits unexpectedly grows because the risk team adds intraday overrides per desk and region, yesterday's "small dimension table" may no longer fit comfortably as a broadcast build side. The optimizer can still choose broadcast if its row-count estimate is stale, and the query then dies under per-worker memory pressure.

Skew is the other common failure mode. Suppose one issuer is responsible for 40 percent of today's open notional because of a market event. In a repartitioned hash join, all rows for that issuer land in one downstream partition. Most workers finish quickly while one worker builds an oversized hash table, spills heavily, and keeps the query open. From the user's perspective the cluster is slow; from the engine's perspective one partition is doing almost all the work.
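
A quick illustration of why that happens: with plain hash partitioning, every row for the hot issuer lands in one downstream partition. The row counts below are made up to mirror the "one issuer drives 40 percent" scenario.

from collections import Counter

NUM_PARTITIONS = 8
issuer_ids = [42] * 400_000 + list(range(600_000))   # 1M join keys, 40% on issuer 42

rows_per_partition = Counter(hash(i) % NUM_PARTITIONS for i in issuer_ids)
print(rows_per_partition.most_common(2))
# One partition ends up with roughly 475k rows while the rest stay near 75k.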

Modern engines try to reduce this pain with runtime filters, bloom filters, or dynamic filtering. If the issuer_limits side first applies active = true and reduces the key set to a few hundred issuers, the engine can push that information back toward the reservations scan and avoid reading or shuffling rows for irrelevant issuers. This optimization is powerful because it cuts work before the join happens. It is also timing-sensitive. If the dynamic filter is produced after the large scan already ran, the theoretical win arrives too late to matter.
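
A sketch of the dynamic-filter idea, assuming the keys that survive active = true are collected first and pushed into the reservations scan so irrelevant rows are dropped before the shuffle. Real engines usually push an approximate structure such as a Bloom filter rather than an exact set.

def build_runtime_filter(issuer_limits):
    return {row["issuer_id"] for row in issuer_limits if row["active"]}

def scan_reservations(reservations, key_filter):
    for row in reservations:
        # Rows for issuers with no active limit never reach the exchange.
        if row["issuer_id"] in key_filter:
            yield row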

Fault tolerance adds one more trade-off. A repartitioned join creates large exchange outputs that may need to be retried if one worker fails. Materializing those exchange partitions makes the query more resilient, but it adds I/O and storage churn. Keeping everything in memory minimizes latency, but a single worker loss can force upstream stages to rerun. The right choice depends on whether Harbor Point is serving a fast interactive check or a long compliance report that is expensive to restart.

Troubleshooting

Issue: The optimizer picks a broadcast join and workers run out of memory.

Why it happens / is confusing: The size of the supposedly small side came from stale statistics, or the table is small on disk but expands significantly once decoded and hashed in memory.

Clarification / Fix: Check estimated versus actual row counts on the build side, then force repartition or refresh statistics if the broadcast threshold is no longer safe. Memory sizing for a broadcast join must include one copy per worker plus hash-table overhead.

Issue: Adding nodes makes a large join slower instead of faster.

Why it happens / is confusing: More nodes increase parallel scan capacity, but they also increase the number of exchange buffers, network transfers, and copies of any broadcast input. If one hot key dominates, the extra workers do not help the overloaded partition.

Clarification / Fix: Inspect exchange-level row counts, skew ratio, and spill volume. If the problem is skew, use adaptive partition splitting or key salting where the engine supports it. If the problem is broadcast replication cost, switch to repartition and let each worker own only one key range.
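
A sketch of the salting idea mentioned above: rows for the hot key on the large side are spread across several salted sub-keys, and the small side's matching rows are replicated once per salt so every salted partition can still find its match. HOT_KEYS and SALT_FACTOR are assumed to come from runtime skew statistics.

import random

SALT_FACTOR = 8
HOT_KEYS = {42}

def salted_key_large_side(issuer_id):
    salt = random.randrange(SALT_FACTOR) if issuer_id in HOT_KEYS else 0
    return (issuer_id, salt)

def salted_keys_small_side(issuer_id):
    salts = range(SALT_FACTOR) if issuer_id in HOT_KEYS else [0]
    return [(issuer_id, s) for s in salts]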

Issue: A lookup join looks elegant in the plan but pounds the storage layer.

Why it happens / is confusing: The outer side is larger than expected, so what looked like a few indexed remote probes has become a high-fanout RPC pattern with poor locality.

Clarification / Fix: Cap lookup joins to genuinely small outer relations or to cases where the outer side is already narrowed by a selective predicate. Once probe count grows, a repartitioned hash join is often cheaper than thousands of random remote reads.

Advanced Connections

Connection 1: 041.md defines the stage boundaries that joins exploit

The exchange operators from the previous lesson are the mechanism that makes distributed joins possible. Broadcast join means inserting an exchange that copies one side to many consumers. Repartitioned join means inserting matching hash-partition exchanges on both sides. Colocated join is interesting precisely because it avoids those exchanges.

Connection 2: 043.md asks what happens inside the worker after the join shape is chosen

Once the optimizer chooses broadcast, repartition, or colocated execution, the next performance question is no longer "which node gets which rows?" It is "how efficiently does each worker process its batches?" The next lesson picks up there by looking at vectorized execution, cache-friendly layouts, and why a good join plan can still waste CPU if the local executor processes rows one at a time.

Key Insights

  1. Distributed joins begin with row placement - Broadcast, repartition, and colocated execution decide whether the cluster moves one input, both inputs, or neither before any local join logic runs.
  2. The local join algorithm is only half the story - Hash, merge, and lookup joins are worker-level strategies that inherit the costs created by the distribution plan above them.
  3. Most production pain comes from wrong assumptions - Stale statistics, skewed keys, late runtime filters, and retry policy choices are what turn an acceptable join plan into an expensive one.