Day 482: Cross-Shard Queries and Fan-Out Costs

Data Architecture and Platforms · Lesson 024 · 30 min · Advanced

The core idea: Once data is partitioned, an apparently simple read becomes a distributed workflow whose real cost is set by target selection, snapshot semantics, and the slowest shard in the fan-out.

Today's "Aha!" Moment

After 023.md, PayLedger stopped routing every Northwind Payroll write to one hot shard. Payout holds now live on the shard that owns a specific funding account, which fixed the month-end hotspot. The next page load exposed the new pressure point. The merchant-operations dashboard still promises a single summary card: total available funds, pending holds, failed payouts in the last hour, and the next settlement cut. Before repartitioning, that card came from one shard. Now it requires twelve shard-local reads, a coordinator merge, and a decision about whether "current" means one shared snapshot or merely twelve recent answers.

The easy misconception is that parallelism makes this nearly free. It does not. If eleven shards answer in 9 ms and one answers in 52 ms, the coordinator still waits around 52 ms before it can return an exact total. If each shard call succeeds 99.9% of the time and the query needs 20 shards, the best-case full-success rate is only about 98%. A scatter-gather path amplifies small per-shard costs into user-visible tail latency and availability loss.
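The arithmetic above is worth checking by hand. A short sketch (with the illustrative numbers from the text, not measurements from a real cluster):

```python
def coordinator_wait_ms(shard_latencies_ms: list[float]) -> float:
    """An exact total cannot be returned until the slowest shard answers."""
    return max(shard_latencies_ms)

def fanout_success(per_shard_success: float, shards: int) -> float:
    """Probability that every shard call in the fan-out succeeds."""
    return per_shard_success ** shards

# Eleven shards answer in 9 ms; one straggler takes 52 ms.
print(coordinator_wait_ms([9.0] * 11 + [52.0]))   # 52.0
# 99.9% per-shard success across a 20-shard fan-out.
print(round(fanout_success(0.999, 20), 3))        # 0.98
```

The second number is the punchline: three nines per shard quietly becomes two nines end to end once twenty shards must all answer.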

That is the shift this lesson needs to make concrete. A cross-shard query is not "a normal query, but across more machines." It is a small distributed system: it needs routing metadata, a policy for partial failure, a merge plan, and an explicit answer to whether the combined result must represent one moment in time or whether a slightly stale derived view is acceptable. Once those contracts are clear, fan-out can be an intentional trade-off. Without them, it becomes an unbounded tax on every high-level read.

Why This Matters

Production teams usually notice cross-shard query problems late because shard-local benchmarks look fine. Each storage node can report healthy p50 query time, acceptable CPU, and modest scan counts, while the user-facing endpoint degrades anyway. The coordinator is doing extra work to discover target shards, open connections, buffer partials, sort or aggregate them, and decide what to do when one shard times out. Meanwhile the product team still expects merchant-summary to behave like a single-record lookup.

PayLedger makes the trade-off concrete. The hotspot mitigation from 023.md was correct because the invariant lives at the funding-account level, not at the merchant level. But that better write distribution means merchant-wide reads are now derived data. If the support workflow needs an exact answer before approving a manual payout override, the system may need a shared read timestamp or a primary-only read path. If the page is operational telemetry for humans, a precomputed summary that is 15 seconds stale may be the better product contract. Cross-shard queries force that distinction into the open, which is why they sit at the boundary between schema design, query planning, and production reliability.

Core Walkthrough

Part 1: Grounded Situation

PayLedger stores authoritative balance state by (merchant_id, funding_account_id). Each shard owns the WAL, in-memory balance ledger, and idempotency window for its accounts. The merchant dashboard does not care about any single account in isolation. It needs a roll-up of total available funds, holds waiting to settle, payments that failed in the last hour, and any funding account whose replica lag makes failover unsafe.

That sounds like one query, but the authoritative facts live on different shard owners. The first design mistake is broadcasting the dashboard request to every shard in the fleet. That creates predictable waste: quiet shards do no useful work, network traffic grows with cluster size rather than with merchant size, and every request becomes more fragile than the data model actually requires.

The correct starting point is a directory that maps merchant_id to the specific funding accounts and shard owners that can answer the question. In PayLedger, the router already maintains this directory because writes need it during resharding. The read coordinator reuses the same metadata so a 64-shard cluster might only touch 5 shards for one merchant summary.

client
  -> summary-api coordinator
      -> account-directory: merchant 1842 -> shards [03, 11, 18, 27, 41]
      -> shard 03: local aggregate for accounts [A1, A4]
      -> shard 11: local aggregate for accounts [A2, A9]
      -> shard 18: local aggregate for accounts [A3, A5, A6]
      -> shard 27: local aggregate for accounts [A7, A8]
      -> shard 41: local aggregate for accounts [A10, A11, A12]
  <- merged merchant summary

This makes fan-out proportional to the actual ownership graph, not to the size of the cluster. That distinction matters because the expensive version of cross-shard querying is not "touching more than one shard." It is touching more shards than the access pattern truly requires.
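A minimal directory along these lines can be sketched as follows. The class name, methods, and the epoch-free design are illustrative, not PayLedger's actual router:

```python
from collections import defaultdict

class AccountDirectory:
    """Maps (merchant_id, funding_account_id) to the shard that owns it."""

    def __init__(self):
        self._owner = {}  # (merchant_id, account_id) -> shard_id

    def assign(self, merchant_id, account_id, shard_id):
        self._owner[(merchant_id, account_id)] = shard_id

    def lookup(self, merchant_id):
        """Return only the shards that own this merchant's accounts."""
        shards = defaultdict(list)
        for (m, account_id), shard_id in self._owner.items():
            if m == merchant_id:
                shards[shard_id].append(account_id)
        return dict(shards)

d = AccountDirectory()
d.assign(1842, "A1", 3)
d.assign(1842, "A4", 3)
d.assign(1842, "A2", 11)
print(d.lookup(1842))  # {3: ['A1', 'A4'], 11: ['A2']}
```

The lookup result is the whole point: the fan-out target set is derived from ownership, so it grows with the merchant's accounts, not with the cluster.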

Part 2: Mechanism

Once the coordinator knows the target set, the mechanics are straightforward but unforgiving. It issues shard-local work that is as small as possible, waits for required answers, and merges partials using an explicit consistency rule. The request path usually looks like this:

async def merchant_summary(merchant_id, as_of):
    # Resolve only the shards that own this merchant's funding accounts.
    route = account_directory.lookup(merchant_id)
    query = LocalSummaryQuery(merchant_id=merchant_id, as_of=as_of)

    # Each target computes a shard-local aggregate; the coordinator waits
    # at most 75 ms for the full set before deciding how to respond.
    partials = await scatter_gather(
        targets=route.shards,
        request=query,
        timeout_ms=75,
    )

    # Fail closed: an exact summary cannot be served from a partial fan-out.
    if partials.missing_required_shards:
        raise IncompleteSummary("exact summary unavailable")

    return merge_summary(partials)

The important detail is not the async code. It is what as_of and missing_required_shards mean. There are three common contracts:

  1. Exact and current. Every shard must answer at one shared snapshot boundary, or the query fails. This is appropriate for financial approval paths and expensive for interactive dashboards.
  2. Exact at a bounded staleness. The system chooses a read timestamp or a derived snapshot that is, say, no more than 15 seconds old. This gives consistent totals with better latency if the product can tolerate lag.
  3. Best-effort operational view. Partial results are allowed as long as the response marks itself incomplete. This can work for internal observability pages but is dangerous for business logic.
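One possible shape of a scatter-gather helper that supports these contracts is sketched below. It takes an explicit call function so the sketch is self-contained; the names, the fake shard call, and the timing are all illustrative assumptions, not the signature used above:

```python
import asyncio
from dataclasses import dataclass, field

@dataclass
class Partials:
    results: dict = field(default_factory=dict)   # shard_id -> partial summary
    missing_required_shards: list = field(default_factory=list)

async def scatter_gather(targets, request, timeout_ms, call):
    """Issue shard-local queries in parallel; keep whatever answers in time."""
    async def one(shard):
        return shard, await call(shard, request)

    tasks = [asyncio.create_task(one(s)) for s in targets]
    done, pending = await asyncio.wait(tasks, timeout=timeout_ms / 1000)

    partials = Partials()
    for t in done:
        shard, result = t.result()
        partials.results[shard] = result
    for t in pending:
        t.cancel()  # slow shards are abandoned, not waited on
    partials.missing_required_shards = [
        s for s in targets if s not in partials.results
    ]
    return partials

# A fake shard call in which shard 41 is the straggler.
async def fake_call(shard, request):
    await asyncio.sleep(0.2 if shard == 41 else 0.01)
    return {"available": 100}

partials = asyncio.run(
    scatter_gather([3, 11, 41], "summary", timeout_ms=75, call=fake_call)
)
print(sorted(partials.results))           # [3, 11]
print(partials.missing_required_shards)   # [41]
```

Contract 1 would raise on a non-empty missing_required_shards list; contract 3 would return the partials with an explicit incomplete flag. The policy lives in the caller, not in the fan-out machinery.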

Fan-out cost comes from more than the number of shards. A useful mental accounting model is:

query cost ~= routing lookup
           + slowest required shard
           + merge/sort/aggregate work
           + bytes moved across the network
           + failure handling when one shard is slow or unavailable

Because the shard calls run in parallel, engineers often assume the total cost is approximately one shard read. In practice, it trends toward the slowest shard plus coordinator overhead, and the probability of hitting one slow or missing shard rises with every extra target. That is why local pushdown matters so much. Each shard should return a compact aggregate, top-k result, or filtered slice, not a raw row set that the coordinator has to re-scan in memory.
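When pushdown works, the coordinator's merge step degenerates into simple addition over compact per-shard aggregates. A minimal sketch, assuming each shard returns the same small summary record:

```python
def merge_summary(partials: dict) -> dict:
    """Sum compact per-shard aggregates; no raw rows reach the coordinator."""
    totals = {"available": 0, "pending_holds": 0, "failed_last_hour": 0}
    for shard_result in partials.values():
        for key in totals:
            totals[key] += shard_result[key]
    return totals

print(merge_summary({
    3:  {"available": 1200, "pending_holds": 2, "failed_last_hour": 0},
    11: {"available":  800, "pending_holds": 1, "failed_last_hour": 3},
}))  # {'available': 2000, 'pending_holds': 3, 'failed_last_hour': 3}
```

The contrast case is the anti-pattern: if each shard shipped its raw hold rows instead, merge cost and bytes moved would scale with data volume rather than with the fixed size of the summary.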

Resharding adds another correctness edge. If the directory says merchant 1842 lives on shards [03,11,18,27,41] but account A9 has just moved from 11 to 44, the coordinator can double count or miss data unless the directory lookup and shard ownership epoch are part of the query contract. Cross-shard reads are often blamed on query planners when the real bug is stale routing metadata.
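One way to make ownership epochs part of the query contract is to have every shard stamp its response and have the coordinator refuse to merge across a mismatch. A sketch under that assumption (the exception name and response shape are hypothetical):

```python
class StaleRouting(Exception):
    """Raised when routing metadata disagrees with shard ownership."""

def check_epochs(route_epoch: int, shard_responses: dict) -> None:
    """Reject the merge if any shard answered under a different epoch."""
    for shard_id, response in shard_responses.items():
        if response["ownership_epoch"] != route_epoch:
            raise StaleRouting(
                f"shard {shard_id} answered at epoch "
                f"{response['ownership_epoch']}, router expected "
                f"{route_epoch}; an account may have moved"
            )

# Account A9 just moved off shard 11: the old owner answers at epoch 6.
responses = {
    11: {"ownership_epoch": 6, "accounts": ["A2"]},
    44: {"ownership_epoch": 7, "accounts": ["A9"]},
}
try:
    check_epochs(7, responses)
except StaleRouting as err:
    print("retry with fresh directory:", err)
```

Failing and retrying with a fresh directory read is crude but safe; silently merging across epochs is how double counts sneak into financial totals.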

Part 3: Implications and Trade-offs

For PayLedger, the merchant summary endpoint ended up with two modes. The manual payout override flow uses a small exact read against the specific funding accounts named in the request. It touches few shards, runs against primary-owned state, and fails closed if any required shard cannot provide the chosen snapshot. The support dashboard takes a different path: it reads a merchant_summary view refreshed from shard change streams every few seconds. That view is deliberately stale, but it removes repeated fan-out from the human-facing page and avoids turning every refresh into five cross-shard RPCs.

That split is the real trade-off. Exact interactive fan-out keeps the data closest to its source of truth, but it pays in tail latency, coordinator complexity, and lower end-to-end availability. Precomputed or incrementally maintained summaries make the request path cheap, but they move work into background pipelines and introduce freshness lag that the product must surface honestly. Neither path is universally correct. The right answer depends on whether the caller is making a money-moving decision, browsing an operations dashboard, or exporting a report.

This is also where cross-shard query design starts leaning into the next part of the track. Once teams realize that repeated distributed reads are too expensive on the request path, they often build batch or incremental pipelines to materialize the expensive views ahead of time. That is not an admission of failure. It is the normal consequence of making fan-out costs explicit, and it is the bridge into 025.md.

