How we made WINDOW JOIN parallel and vectorized

Consider a workload that comes up constantly on a trading desk: for every executed trade, attach the average bid and ask over a window running from 1 second before to 1 second after the trade. Without a dedicated operator this takes two joins, an ASOF JOIN for the carry-forward quote at the window start plus a range join for the rows inside the window, stitched together with UNION ALL and folded with a GROUP BY:

```sql
-- QuestDB timestamps are microseconds, so 1_000_000 is 1 second.
WITH prevailing AS (
  -- ASOF-match against the window start (trade timestamp - 1 s),
  -- not the trade timestamp itself.
  SELECT t.orig_ts ts, t.symbol, p.bid, p.ask
  FROM (
    (SELECT (timestamp - 1000000) AS ts, symbol,
            timestamp AS orig_ts
     FROM trades) TIMESTAMP(ts)
  ) t
  ASOF JOIN prices p ON p.sym = t.symbol
),
in_window AS (
  SELECT t.timestamp ts, t.symbol, p.bid, p.ask
  FROM trades t
  JOIN prices p ON p.sym = t.symbol
  WHERE p.ts > t.timestamp - 1000000
    AND p.ts <= t.timestamp + 1000000
)
SELECT ts, symbol, avg(bid) avg_bid, avg(ask) avg_ask
FROM (SELECT * FROM prevailing UNION ALL SELECT * FROM in_window)
GROUP BY ts, symbol;
```

This works, but it is a lot of SQL for a simple operation. The ASOF JOIN and the range JOIN walk the prices table independently even though they answer two halves of the same question, and the range JOIN forces the planner to hash on sym and then re-filter every matched pair against the timestamp range predicate. The outer GROUP BY over ts is a hash aggregation that has to materialize a row per (ts, symbol) pair, which works out to 50 million groups in our test data. There is nothing here for the optimizer to fuse, parallelize cleanly, or vectorize.
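To make the cost of "hash on sym, then re-filter" concrete, here is a minimal Python sketch of a hash equi-join on the symbol followed by a timestamp range filter. It is a toy model, not QuestDB's planner: the function name `hash_join_then_filter`, the tuple layouts, and the inclusive upper bound are assumptions for illustration. It counts how many (trade, price) pairs are examined versus how many survive the range check.

```python
from collections import defaultdict

WINDOW_US = 1_000_000  # 1 second in microseconds, as in the SQL above


def hash_join_then_filter(trades, prices, w=WINDOW_US):
    """Toy model of a hash equi-join on symbol followed by a range filter.

    trades: list of (ts, symbol); prices: list of (ts, sym, bid, ask).
    Returns (pairs_examined, pairs_kept).
    """
    # Build side: bucket every price row by its symbol key.
    buckets = defaultdict(list)
    for p in prices:
        buckets[p[1]].append(p)

    examined = kept = 0
    for t_ts, sym in trades:
        # Probe side: every price with the same symbol becomes a candidate pair...
        for p_ts, _, _, _ in buckets.get(sym, ()):
            examined += 1
            # ...and only afterwards is the timestamp range checked
            # (lower bound exclusive, upper bound assumed inclusive).
            if t_ts - w < p_ts <= t_ts + w:
                kept += 1
    return examined, kept
```

With two trades and ten one-per-second quotes for a single symbol, all 20 same-symbol pairs are examined but only 4 fall inside a window; in this model the examined count grows with the product of per-symbol row counts, however narrow the window is.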

WINDOW JOIN is QuestDB's dedicated syntax for aggregating one table over a time window around each row of another. Here is the same query written with the dedicated operator:

```sql
SELECT t.*, avg(p.bid) avg_bid, avg(p.ask) avg_ask
FROM trades t
WINDOW JOIN prices p
  ON p.sym = t.symbol
  RANGE BETWEEN 1 second PRECEDING AND 1 second FOLLOWING;
```

Now the operator knows what it is doing: for every row on the left-hand side of the join (the LHS, trades here), find rows on the right-hand side (the RHS, prices) whose timestamp falls inside a [lo, hi] window around the LHS timestamp, restrict them to matching symbol keys, and reduce them with a batch of aggregate functions.
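As a point of reference, the semantics just described fit in a deliberately naive Python sketch: a nested loop, one aggregate (avg), one value column. The name `window_join_avg` and the `(ts, key)` / `(ts, key, value)` tuple layouts are hypothetical, and the sketch follows the [lo, hi] description literally, so it does not model the carry-forward quote that the two-join rewrite adds. Any faster plan has to produce the same answers as this loop.

```python
def window_join_avg(lhs, rhs, w_lo, w_hi):
    """Naive reference semantics for a WINDOW JOIN computing avg().

    lhs: list of (ts, key); rhs: list of (ts, key, value).
    For each LHS row, average the RHS values whose key matches and whose
    timestamp lies in [ts - w_lo, ts + w_hi]. Empty windows yield None.
    """
    out = []
    for ts, key in lhs:
        # Scan the whole RHS for every LHS row: O(|LHS| * |RHS|).
        vals = [v for r_ts, r_key, v in rhs
                if r_key == key and ts - w_lo <= r_ts <= ts + w_hi]
        out.append(sum(vals) / len(vals) if vals else None)
    return out
```

For example, with `lhs = [(10, "A"), (10, "B"), (20, "A"), (30, "A")]`, a few RHS rows near those timestamps, and `w_lo = w_hi = 1`, each output slot is the average of that row's matches, or `None` when the window is empty.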

Making that fast comes down to two pieces: data-level parallelism over the LHS, plus a low-cardinality fast path that copies values into contiguous buffers so that the SIMD aggregation kernels we already ship for SAMPLE BY run on window slices unchanged. Benchmarked against Timescale, DuckDB, and ClickHouse on a 50M-row trades table joined to a 150M-row prices table, the parallel + SIMD path runs 5.0x faster than QuestDB's own single-threaded fallback and 25x faster than ClickHouse's best rewrite.
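The contiguous-buffer idea can be sketched as follows, assuming one value column and a single key. The helper names are hypothetical, and Python's `sum` is of course not a SIMD kernel; what the sketch shows is the data layout. Once a key's values sit in one dense typed buffer, every window becomes a plain `[lo, hi)` slice, which is the shape an existing vectorized sum or average kernel already consumes.

```python
from array import array


def gather_key_columns(rhs, key):
    """Copy one key's timestamps and values into contiguous typed buffers.

    rhs: list of (ts, key, value) sorted by ts, so the buffers stay sorted too.
    Returns (ts_buf, val_buf).
    """
    ts_buf = array("q")   # 64-bit signed ints, like microsecond timestamps
    val_buf = array("d")  # 64-bit doubles
    for ts, k, v in rhs:
        if k == key:
            ts_buf.append(ts)
            val_buf.append(v)
    return ts_buf, val_buf


def slice_avg(val_buf, lo, hi):
    """Average val_buf[lo:hi], or None for an empty slice.

    In a native engine this dense, branch-free range is what a SIMD
    reduction would process; here it is plain Python.
    """
    n = hi - lo
    if n <= 0:
        return None
    return sum(val_buf[lo:hi]) / n
```

The copy is paid once per key, which is why the sketch assumes the low-cardinality case: with few keys, the gather is cheap relative to the many window aggregates that reuse each buffer.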

Data-level parallelism

QuestDB stores data in append-only column files, partitioned by time. The query engine reads them as a sequence of page frames: contiguous, columnar slabs of memory that map directly onto file pages. Filtering and aggregation both work at this granularity: a page frame is the unit of dispatch to a worker thread.
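A minimal sketch of frame-at-a-time dispatch, assuming a fixed frame size in rows (real page frames follow file and partition boundaries, which this ignores). The helper names `page_frames` and `run_per_frame` are hypothetical, and CPython threads illustrate the scheduling pattern rather than real parallel speedup.

```python
from concurrent.futures import ThreadPoolExecutor


def page_frames(n_rows, frame_rows):
    """Split row indices [0, n_rows) into contiguous (start, end) frames."""
    return [(s, min(s + frame_rows, n_rows)) for s in range(0, n_rows, frame_rows)]


def run_per_frame(n_rows, frame_rows, work, workers=4):
    """Hand frames to a shared thread pool, one frame per task.

    work(start, end) must return the results for rows [start, end).
    Results are concatenated in frame order, so output order matches row order.
    """
    frames = page_frames(n_rows, frame_rows)
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # pool.map yields results in input order even if frames finish out of order.
        parts = pool.map(lambda f: work(*f), frames)
        return [x for part in parts for x in part]
```

For instance, `run_per_frame(10, 4, lambda s, e: [i * i for i in range(s, e)])` processes frames `(0, 4)`, `(4, 8)` and `(8, 10)` independently and still returns the squares in row order.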

WINDOW JOIN follows the same model. The LHS table is sliced into page frames; each worker takes a frame and is responsible for producing the aggregate result for every LHS row in that frame. To do that it needs the RHS rows that fall inside the union of all windows the frame covers.
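Why is one RHS range per frame enough? The sketch below (hypothetical names, run sequentially for clarity) gives each LHS frame only the RHS rows between its first timestamp minus `w_lo` and its last timestamp plus `w_hi`, then checks that the per-frame answers match a single global pass. Because the LHS is sorted by timestamp, every row's window lies inside that range, so frames never need each other's data and can go to independent workers.

```python
def naive_avg(lhs, rhs, w_lo, w_hi):
    """Global reference: average of same-key RHS values in [ts - w_lo, ts + w_hi]."""
    out = []
    for ts, key in lhs:
        vals = [v for r_ts, r_key, v in rhs
                if r_key == key and ts - w_lo <= r_ts <= ts + w_hi]
        out.append(sum(vals) / len(vals) if vals else None)
    return out


def by_frames(lhs, rhs, w_lo, w_hi, frame_rows):
    """Process the LHS in frames; each frame sees only RHS rows in its range.

    lhs must be sorted by timestamp so frame[0] and frame[-1] give tLo and tHi.
    """
    out = []
    for start in range(0, len(lhs), frame_rows):
        frame = lhs[start:start + frame_rows]
        t_lo, t_hi = frame[0][0], frame[-1][0]
        # Range covering every window in this frame: [tLo - w_lo, tHi + w_hi].
        needed = [r for r in rhs if t_lo - w_lo <= r[0] <= t_hi + w_hi]
        # In a parallel plan, this call would run on its own worker.
        out.extend(naive_avg(frame, needed, w_lo, w_hi))
    return out
```

Any frame size gives the same output as the global pass, which is the property that makes frame-level parallelism safe.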

Concretely, for a frame whose LHS timestamps run from tLo to tHi with a [-w_lo, +w_hi] window, the worker needs RHS rows in [tLo - w_lo, tHi + w_hi]. Locating that slice cheaply is what makes the parallel plan viable, and the enabler is QuestDB's storage layout: rows in both tables are kept in designated timestamp order on disk, so the RHS slice for any time range collapses to a single binary search per worker rather than a scan per LHS row.
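Because the RHS timestamps are sorted, locating [tLo - w_lo, tHi + w_hi] is a matter of binary search. A sketch using Python's `bisect` module, with a hypothetical function name, that searches once for each end of the range and returns a half-open index range; the cost is per frame, not per LHS row.

```python
from bisect import bisect_left, bisect_right


def rhs_range_for_frame(lhs_ts, rhs_ts, w_lo, w_hi):
    """Locate the RHS rows a frame needs.

    lhs_ts: sorted LHS timestamps of one frame; rhs_ts: sorted RHS timestamps.
    Returns [start, end) such that rhs_ts[start:end] are exactly the
    timestamps in [tLo - w_lo, tHi + w_hi].
    """
    t_lo, t_hi = lhs_ts[0], lhs_ts[-1]          # sorted, so first and last bound the frame
    start = bisect_left(rhs_ts, t_lo - w_lo)    # first RHS ts >= tLo - w_lo
    end = bisect_right(rhs_ts, t_hi + w_hi)     # first RHS ts >  tHi + w_hi
    return start, end
```

For a frame with timestamps `[10, 12, 15]`, `w_lo = 1`, `w_hi = 2`, and RHS timestamps `[0, 8, 9, 10, 14, 16, 17, 20]`, the needed range is [9, 17], i.e. indices 2 through 6.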

Then, for the join keys present in the LHS frame, the worker builds a small in-memory index from the RHS slice: a per-key list of RHS timestamps, plus per-key arrays of the values to aggregate. Once that index is built, the inner loop over LHS rows is just two binary searches per row, one for the window's low bound and one for its high, followed by an aggregate over the resulting contiguous range. Both binary searches walk forward monotonically, so they amortize across rows in the same frame.

Roughly:

```text
    LHS page frames
    ┌─────┬─────┬─────┬─────┬─────┐
    │ F0  │ F1  │ F2  │ F3  │ ... │
    └──┬──┴──┬──┴──┬──┴──┬──┴─────┘
       │     │     │     │
┌──────┴┐ ┌──┴────┐│     │
│worker0│ │worker1│ ...  │   workers pulled from a shared pool
└───┬───┘ └──┬────┘      │   one frame at a time
    │        │           │
┌───────┴───────┐│...
```
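The per-key index and the forward-walking binary searches described above can be sketched like this. It is an illustration under assumptions (one value column, avg only, Python lists instead of native buffers, a hypothetical function name), not QuestDB's code. Each key keeps its own pair of cursors: since LHS timestamps only grow, a key's window bounds only move forward, so each search can start where the previous one for that key stopped.

```python
from bisect import bisect_left, bisect_right


def frame_window_avg(lhs_frame, rhs_slice, w_lo, w_hi):
    """Per-frame WINDOW JOIN average using a per-key index and forward-only searches.

    lhs_frame: list of (ts, key) sorted by ts.
    rhs_slice: list of (ts, key, value) sorted by ts, typically already narrowed
               to the frame's [tLo - w_lo, tHi + w_hi] range (correctness does
               not depend on that narrowing).
    """
    keys = {k for _, k in lhs_frame}
    # Index: per key, parallel lists of RHS timestamps and values, both sorted
    # by timestamp because rhs_slice is.
    index = {k: ([], []) for k in keys}
    for ts, k, v in rhs_slice:
        if k in index:  # skip keys that never appear in this LHS frame
            index[k][0].append(ts)
            index[k][1].append(v)

    # Per-key [lo, hi] cursors; each search resumes from the last position.
    cursors = {k: [0, 0] for k in keys}
    out = []
    for ts, k in lhs_frame:
        ts_list, vals = index[k]
        cur = cursors[k]
        cur[0] = bisect_left(ts_list, ts - w_lo, cur[0])   # first ts >= ts - w_lo
        cur[1] = bisect_right(ts_list, ts + w_hi, cur[1])  # first ts >  ts + w_hi
        lo, hi = cur
        # Aggregate over the contiguous range vals[lo:hi].
        out.append(sum(vals[lo:hi]) / (hi - lo) if hi > lo else None)
    return out
```

Keeping cursors per key rather than one global pair matters because rows for different symbols interleave in the frame; monotonicity holds within each key, not across them.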
