Scaling As-of Joins
Daft now offers native ASOF joins, an operation essential for aligning time series data, such as sensor streams that tick at different rates. In this post, I'll walk through how we implemented ASOF joins, made them 6x faster, and cut memory usage in half. We'll dive into the three optimizations behind the wins (hash grouping, binary search, and multi-threaded parallelism) and show how the same architecture scales out to a distributed cluster.
But first, what is an ASOF join?
ASOF joins link two datasets by matching each timestamp in one table with the most recent timestamp at or before it in another table. In the example below, the data collected in the left and right tables does not align perfectly, and an ASOF join simply answers the question "for each video frame, what was the latest joint angle & gripper state of our robot?"
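To make the matching rule concrete, here is a deliberately naive pure-Python sketch of a backward ASOF match. It is only a reference definition, not how Daft implements the join; the function name `asof_backward` and the sample values are hypothetical.

```python
from typing import Optional

def asof_backward(left_ts: list[int], right: list[tuple[int, str]]) -> list[Optional[str]]:
    """For each left timestamp, return the value of the latest right row
    whose timestamp is <= the left timestamp (None if there is none)."""
    matches = []
    for t in left_ts:
        # Every right row at or before the left timestamp is a candidate.
        candidates = [(ts, value) for ts, value in right if ts <= t]
        # The candidate with the largest timestamp is the most recent one.
        matches.append(max(candidates, key=lambda r: r[0])[1] if candidates else None)
    return matches

# Hypothetical sample values: frame timestamps and gripper readings.
frame_ts = [2, 5, 8]
gripper_readings = [(1, "open"), (4, "closed"), (9, "open")]
print(asof_backward(frame_ts, gripper_readings))  # ['open', 'closed', 'closed']
```

This scans the whole right side for every left row, so it is quadratic; it is useful only as a specification to check faster versions against.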
How to perform an ASOF join
To perform an ASOF join in Daft, use the .join_asof() method.
on: the column to match on (usually a timestamp).
by: the column to partition by, so rows only match within the same entity (e.g. same robot_id).
import daft

frames = daft.from_pydict({
    "ts": [2, 5, 8],
    "robot_id": ["arm_001", "arm_001", "arm_002"],
    "frame_id": [1, 2, 3],
})

telemetry = daft.from_pydict({
    "ts": [1, 4, 8],
    "robot_id": ["arm_001", "arm_001", "arm_002"],
    "joint_angle": [10.0, 20.0, 30.0],
    "gripper": ["open", "closed", "open"],
})

# For each frame, take the latest telemetry row at or before its timestamp, per robot.
result = frames.join_asof(telemetry, on="ts", by="robot_id", strategy="backward")
result.show()
ts  robot_id  frame_id  joint_angle  gripper
2   arm_001   1         10           open
5   arm_001   2         20           closed
8   arm_002   3         30           open
Building V1: Sort + two-pointers
Our first implementation of an ASOF join was a sorted two-pointer search.
Sort the left and right tables by a composite (by, on) key. This groups rows by entity and orders them by timestamp within each entity.
Walk both tables with two pointers. For each left row, advance the right pointer through rows of the same by key as long as right.ts <= left.ts. The last such right row is the match.
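The two steps above can be sketched in plain Python. This is an illustrative sketch under simplifying assumptions, not Daft's actual implementation: rows are modeled as hypothetical (by_key, ts, payload) tuples, there is a single "by" column, and the function name `asof_join_sorted` is made up for this example.

```python
def asof_join_sorted(left, right):
    """Backward ASOF join: sort both sides by (by, on), then walk two pointers.

    left and right are lists of (by_key, ts, payload) tuples.
    Returns (by_key, ts, left_payload, right_payload_or_None) tuples,
    ordered by (by_key, ts).
    """
    left = sorted(left, key=lambda r: (r[0], r[1]))
    right = sorted(right, key=lambda r: (r[0], r[1]))
    out = []
    j = 0        # right pointer; it only ever moves forward
    last = None  # most recent right row consumed so far
    for by, ts, payload in left:
        # Consume right rows whose composite key is <= the left row's key.
        while j < len(right) and (right[j][0], right[j][1]) <= (by, ts):
            last = right[j]
            j += 1
        # The last consumed row may belong to an earlier entity, so check the by key.
        matched = last[2] if last is not None and last[0] == by else None
        out.append((by, ts, payload, matched))
    return out

# Same data as the Daft example: payloads are frame_id and joint_angle.
frames = [("arm_001", 2, 1), ("arm_001", 5, 2), ("arm_002", 8, 3)]
telemetry = [("arm_001", 1, 10.0), ("arm_001", 4, 20.0), ("arm_002", 8, 30.0)]
for row in asof_join_sorted(frames, telemetry):
    print(row)
```

Note that every comparison, both in the sort and in the walk, looks at the "by" string before it ever reaches the timestamp.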
Benchmarking V1
We benchmarked this initial implementation against pandas on a synthetic workload (10M left rows, 100M right rows) and discovered that we were 3.75x slower.
system            scale   status  med_time_s  med_peak_gb
pandas            medium  ok      35.53       9.08
daft_native (V1)  medium  ok      133.38      10.28
Profiling the run revealed that string comparisons were the bottleneck. While an integer comparison takes roughly a single CPU cycle, comparing two strings means walking through memory character by character. Furthermore, ASOF joins allow users to specify multiple "by" keys, each requiring a computationally expensive string-matching operation.
This broke our assumption that a simple sort and two-pointer approach would be fast enough, even though it was O(M log M + N log N) (where M and N are the number of rows in the left and right tables respectively). Big O tells you how your algorithm scales, but not how much each operation costs.
Building V2: Parallel hash bucketing + two pointers
Given that the bottleneck was caused by a multi-key string comparison during sorting, we wanted our new implementation to elide this completely. Here's what we came up with:
Hash the left & right tables by their "by" keys to group rows belonging to the same entity together
Parallelize the sorting of each entity's left and right table by their "on" keys
Do the same two-pointer approach within each entity
Previously, every comparison made while sorting included expensive string columns, potentially several of them. Switching to hash grouping gave us a much cheaper way to partition the data. Within each group, we now only needed to sort by the "on" key, a single (typically integer) column. Not only was each comparison cheaper, but because the tables were already grouped by their "by" keys, we could also sort the groups in parallel across cores. Together, these changes cut the median runtime from 133.38 s to 36.53 s on the same benchmark.
system            scale   status  med_time_s  med_peak_gb
pandas            medium  ok      35.53       9.08
daft_native (V1)  medium  ok      133.38      10.28
daft_native (V2)  medium  ok      36.53       10.58
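The V2 steps (hash grouping, a per-group sort on the "on" key, then the two-pointer walk) can be sketched as follows. This is a minimal sketch, not Daft's implementation: rows are hypothetical (by_key, ts, payload) tuples, the function names are invented for illustration, and Python threads stand in for the engine's worker threads purely to show the one-task-per-group structure (in CPython the GIL would prevent a real speedup here).

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor

def group_by_key(rows):
    """Hash-group (by_key, ts, payload) rows into {by_key: [(ts, payload), ...]}."""
    groups = defaultdict(list)
    for by, ts, payload in rows:
        groups[by].append((ts, payload))
    return groups

def join_one_group(by, left_rows, right_rows):
    """Sort one entity's rows by ts only, then run the two-pointer walk."""
    left_rows = sorted(left_rows, key=lambda r: r[0])
    right_rows = sorted(right_rows, key=lambda r: r[0])
    out, j, last = [], 0, None
    for ts, payload in left_rows:
        # No by-key comparison is needed: every row here shares the same key.
        while j < len(right_rows) and right_rows[j][0] <= ts:
            last = right_rows[j][1]
            j += 1
        out.append((by, ts, payload, last))
    return out

def asof_join_hashed(left, right, max_workers=4):
    """Backward ASOF join with one independent task per by-key group."""
    left_groups = group_by_key(left)
    right_groups = group_by_key(right)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [
            pool.submit(join_one_group, by, rows, right_groups.get(by, []))
            for by, rows in left_groups.items()
        ]
        # Collect results in the order the groups were first seen on the left.
        return [row for f in futures for row in f.result()]

frames = [("arm_001", 2, 1), ("arm_001", 5, 2), ("arm_002", 8, 3)]
telemetry = [("arm_001", 1, 10.0), ("arm_001", 4, 20.0), ("arm_002", 8, 30.0)]
for row in asof_join_hashed(frames, telemetry):
    print(row)
```

The number of tasks submitted equals the number of distinct "by" key values on the left side, so the available parallelism in this structure is bounded by the data's cardinality.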
Building V3: Streaming ASOF Joins
Still, V2's parallelism model had a fundamental flaw. It was based on the number of "by" key groups, which meant that parallelism was dependent on your data cardinality. If you had a 32-core machine but only 2 distinct "by" key values, you'd only ever use 2 cores while the other 30 sat idle.
Furthermore, our previous approach required us to materialize both the left and right table before doing any computation. We weren't leveraging the streaming nature of the Daft engine, which would have allowed us to process batches of the right table incrementally. We decided to take a different approach for V3:
The Build Phase
Materialize all rows from the left table
Group rows of the same "by" key together and sort each group by their "on" key
Build a hash map over the groups, enabling O(1) lookups
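As a rough illustration of the build phase described above, the sketch below groups the left rows by their "by" key, sorts each group by the "on" key, and stores the groups in a dict for O(1) lookup. The row layout (by_key, ts, payload), the function name `build_left_index`, and the choice to store timestamps and payloads as two parallel lists are assumptions of this sketch, not details taken from Daft.

```python
from collections import defaultdict

def build_left_index(left):
    """Group (by_key, ts, payload) rows by by_key and sort each group by ts.

    Returns {by_key: (sorted_ts_list, payloads_in_the_same_order)}.
    """
    groups = defaultdict(list)
    for by, ts, payload in left:
        groups[by].append((ts, payload))
    index = {}
    for by, rows in groups.items():
        rows.sort(key=lambda r: r[0])  # sort on the "on" key only
        index[by] = ([ts for ts, _ in rows], [p for _, p in rows])
    return index

# Left rows may arrive in any order; payloads here are frame ids.
frames = [("arm_001", 5, 2), ("arm_002", 8, 3), ("arm_001", 2, 1)]
index = build_left_index(frames)
print(index["arm_001"])  # ([2, 5], [1, 2])
```

Keeping each group's timestamps in a sorted list is one way to make later lookups within a group cheap, since a sorted list can be searched without scanning it.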
The Probe Phase
The right table is streamed into the Daft engine in equal-sized batches. Each batch is treated...