Real-Time Capital Flow Analysis Using Window Aggregation
DolphinDB
4 min read · Mar 13, 2026
In equity markets, understanding who is buying and how much they're spending is just as important as knowing the price. Capital flow analysis, which tracks the volume and value of buy and sell orders segmented by order size, is a widely used technique for gauging market sentiment and identifying potential price movements driven by institutional versus retail activity.

The challenge is doing this in real time, at scale, across hundreds of securities simultaneously. Batch processing end-of-day trade data is straightforward, but actionable insights require live computation as trades flow in tick by tick.

This post walks through a concrete implementation of real-time 1-minute capital flow calculation using DolphinDB's time-series engine. We'll define the capital flow metrics, implement the aggregation logic, and show how to configure the engine for three different windowing modes: rolling (non-overlapping) windows, sliding (overlapping) windows, and system-time-based windows.

Defining Capital Flow Metrics

Given stock trade data with fields like tradeTime, securityID, price, qty, buyNo, and sellNo, we define the following metrics for measuring capital flow:
buySmallAmount: total trade amount (qty × price) of buyer-initiated small orders
buyBigAmount: total trade amount of buyer-initiated large orders
sellSmallAmount: total trade amount of seller-initiated small orders
sellBigAmount: total trade amount of seller-initiated large orders
Order size classification uses a threshold of 50,000 units:
Small order: qty ≤ 50,000
Large order: qty > 50,000

Buy/sell direction is determined by comparing buyNo and sellNo: if buyNo > sellNo, the trade is buyer-initiated (B); otherwise it's seller-initiated (S).

Implementing the Aggregation Function

The capital flow calculation is encapsulated in a user-defined aggregate function, calCapitalFlow, which can be plugged directly into DolphinDB's time-series engine as a window metric:

    defg calCapitalFlow(buyNo, sellNo, qty, price){
        smallBigBoundary = 50000
        // Tag each trade with its direction (BSFlag) and its initiating order
        // number (orderNo, the larger of buyNo and sellNo)
        tempTable1 = select buyNo, sellNo, qty, price,
                iif(buyNo>sellNo, `B, `S) as BSFlag, iif(buyNo>sellNo, buyNo, sellNo) as orderNo
            from table(buyNo as `buyNo, sellNo as `sellNo, qty as `qty, price as `price)
        // Total quantity and traded amount per order and direction
        tempTable2 = select sum(qty) as qty, sum(qty*price) as tradeAmount
            from tempTable1 group by orderNo, BSFlag
        // Split the amounts by direction and by order size
        buySmallAmount = exec sum(tradeAmount)
            from tempTable2 where qty<=smallBigBoundary && BSFlag==`B
        buyBigAmount = exec sum(tradeAmount)
            from tempTable2 where qty>smallBigBoundary && BSFlag==`B
        sellSmallAmount = exec sum(tradeAmount)
            from tempTable2 where qty<=smallBigBoundary && BSFlag==`S
        sellBigAmount = exec sum(tradeAmount)
            from tempTable2 where qty>smallBigBoundary && BSFlag==`S
        return nullFill([buySmallAmount, buyBigAmount, sellSmallAmount, sellBigAmount], 0)
    }

The function processes all trades of a security within a given time window: it tags each trade with a direction, sums quantity and amount per order, classifies each order as small or large, and returns the four metrics as a vector.
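To sanity-check the logic on a handful of trades, here is a minimal Python sketch of the same calculation for a single window. It is not DolphinDB code and is not part of the pipeline: the name cal_capital_flow and the sample trades are made up for illustration, and it assumes that trades are grouped by the initiating order number (the larger of buyNo and sellNo).

```python
from collections import defaultdict

SMALL_BIG_BOUNDARY = 50_000  # same threshold as smallBigBoundary in the article


def cal_capital_flow(buy_no, sell_no, qty, price):
    """Return [buySmallAmount, buyBigAmount, sellSmallAmount, sellBigAmount]
    for the trades of one security in one window."""
    # Sum quantity and amount per (initiating order, direction).
    per_order = defaultdict(lambda: [0, 0.0])  # key -> [total qty, total amount]
    for b, s, q, p in zip(buy_no, sell_no, qty, price):
        flag = "B" if b > s else "S"   # buyer- or seller-initiated
        order_no = b if b > s else s   # assumption: larger number = initiating order
        per_order[(order_no, flag)][0] += q
        per_order[(order_no, flag)][1] += q * p

    # Start every bucket at zero so a missing category yields 0, not a null.
    totals = {("B", "small"): 0.0, ("B", "big"): 0.0,
              ("S", "small"): 0.0, ("S", "big"): 0.0}
    for (_, flag), (total_qty, amount) in per_order.items():
        size = "small" if total_qty <= SMALL_BIG_BOUNDARY else "big"
        totals[(flag, size)] += amount

    return [totals[("B", "small")], totals[("B", "big")],
            totals[("S", "small")], totals[("S", "big")]]


# Two fills of buy order 7 (60,000 units in total) and one seller-initiated trade.
result = cal_capital_flow(
    buy_no=[7, 7, 2],
    sell_no=[3, 4, 9],
    qty=[30_000, 30_000, 1_000],
    price=[10.0, 10.0, 20.0],
)
print(result)  # [0.0, 600000.0, 20000.0, 0.0]
```

The two fills of order 7 add up to 60,000 units, so their combined 600,000 lands in the large-buy bucket even though each fill on its own is below the threshold. The single 1,000-unit sell goes to the small-sell bucket, and the two empty buckets come back as zero.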
nullFill ensures that a metric with no matching trades in the window (e.g., no buy activity) is returned as zero rather than null.

Setting Up the Streaming Engine

The pipeline consists of three components: an input stream table (trade feed), an output stream table (capital flow results), and a time-series engine that connects the two.

Step 1: Create Stream Tables

    // Create input and output stream tables
    share(table=streamTable(1:0, `tradeTime`securityID`price`qty`buyNo`sellNo,
        [TIMESTAMP, SYMBOL, DOUBLE, LONG, LONG, LONG]), sharedName=`trade)
    share(table=streamTable(1:0,
        `tradeTime`securityID`buySmallAmount`buyBigAmount`sellSmallAmount`sellBigAmount,
        [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE]), sharedName=`capitalFlow)
    go

Step 2: Create the Time-Series Engine

DolphinDB's createTimeSeriesEngine supports three windowing configurations, each suited to different use cases. All three examples below use the same engine name, tradeTSAggr, so create only one of them at a time.

Option A: Rolling Windows Based on Event Time

A non-overlapping 1-minute window that closes and outputs results every 60 seconds of trade time. This is the standard choice for end-of-minute bar calculations:

    createTimeSeriesEngine(
        name="tradeTSAggr",
        windowSize=60000, // 60,000 ms = 1 minute (tradeTime is a TIMESTAMP)
        step=60000, // step == windowSize → rolling (non-overlapping)
        metrics=[<calCapitalFlow(buyNo, sellNo, qty, price) as `buySmallAmount`buyBigAmount`sellSmallAmount`sellBigAmount>],
        dummyTable=trade,
        outputTable=capitalFlow,
        timeColumn="tradeTime",
        useSystemTime=false,
        keyColumn=`securityID
    )

Option B: Sliding Windows Based on Event Time

A 1-minute window that advances every 30 seconds, producing overlapping results. Useful for smoother, more responsive capital flow signals:

    createTimeSeriesEngine(
        name="tradeTSAggr",
        windowSize=60000,
        step=30000, // step < windowSize → sliding (overlapping)
        metrics=[<calCapitalFlow(buyNo, sellNo, qty, price) as `buySmallAmount`buyBigAmount`sellSmallAmount`sellBigAmount>],
        dummyTable=trade,
        outputTable=capitalFlow,
        timeColumn="tradeTime",
        useSystemTime=false,
        keyColumn=`securityID
    )

Option C: Rolling Windows Based on System Time

Windows are triggered by wall-clock time rather than trade timestamps.
This is particularly useful when the incoming data stream may be delayed or irregular, because results are output at fixed real-world intervals regardless of event time:

    createTimeSeriesEngine(
        name="tradeTSAggr",
        windowSize=60000,
        step=60000,
        metrics=[<calCapitalFlow(buyNo, sellNo, qty, price) as `buySmallAmount`buyBigAmount`sellSmallAmount`sellBigAmount>],
        dummyTable=trade,
        outputTable=capitalFlow,
        timeColumn="tradeTime",
        useSystemTime=true, // triggers on system clock, not event time
        keyColumn=`securityID
    )

Step 3: Subscribe to...