Stop Waiting for Insights: Building Streaming Analytics with DolphinDB | by DolphinDB | MediumSitemapOpen in appSign up<br>Sign in
Medium Logo
Get app<br>Write
Search
Sign up<br>Sign in
Stop Waiting for Insights: Building Streaming Analytics with DolphinDB
DolphinDB
10 min read·<br>Jan 27, 2026
Listen
Share
Press enter or click to view image in full size
Real-time data processing lies at the heart of modern trading systems, IoT platforms, and operational intelligence pipelines. DolphinDB addresses these needs through its Streaming Engines — encapsulated computation units designed for low-latency analytics over continuous data flows.<br>In this post, we explore DolphinDB’s streaming engine architecture and walk through practical examples, including:<br>Window aggregation for capital flow analysis<br>Stateful calculations such as price rate of change (ROC)<br>Cross-sectional rankings<br>Streaming pipelines<br>Real-time joins<br>Complex event processing for trading strategies<br>What Are Streaming Engines?<br>Streaming engines are independent computing components that subscribe to stream tables, perform real-time calculations, and publish results to downstream tables or engines.<br>DolphinDB provides more than a dozen built-in engines, enabling flexible real-time processing patterns across financial markets, energy systems, and IoT workloads.
Computing engines are further divided into in-group and cross-sectional computation:<br>In-group time series computing: Data is grouped and processed in one of three ways:<br>Row-by-row computation<br>Aggregation over windows<br>Anomaly detection within each group<br>Cross-sectional computing: Selects the latest record per group for cross-sectional computation.<br>Joining engines, similar to SQL JOIN operations, are designed for real-time correlation between two tables. The left table is always a stream table, while the right table can be either a stream or static table:<br>Stream + stream join: Values are matched exactly or inexactly based on time series relationships.<br>Stream + static join: The stream table is dynamically joined with a snapshot of the right table, which can be either a stream or static table.<br>Complex event processing engines are tailored for advanced processing over complex events:<br>Order Book Engine: Maintains and updates order book with predefined rules from trade and order data.<br>CEP Engine: Monitors events from multiple sources and processes events with specified patterns to extract information.<br>The following sections demonstrate these streaming features with use cases.<br>Window Aggregation: 1-Minute Capital Flow<br>Given stock trade data with fields like tradeTime, securityID, price, and qty, we define the following metrics for measuring capital flow:<br>Press enter or click to view image in full size
A trade is classified as small if quantity ≤ 50,000 and big otherwise.<br>These factors can be expressed as follows in DolphinDB:<br>defg calCapitalFlow(buyNo, sellNo, qty, price){<br>smallBigBoundary = 50000<br>tempTable1 = select buyNo, sellNo, qty, price,<br>iif(buyNo>sellNo, `B, `S) as BSFlag, iif(buyNo>sellNo, `B, `S) as orderNo<br>from table(buyNo as `buyNo, sellNo as `sellNo, qty as `qty, price as `price)<br>tempTable2 = select sum(qty) as qty, sum(qty*price) as tradeAmount<br>from tempTable1 group by orderNo, BSFlag<br>buySmallAmount = exec sum(tradeAmount)<br>from tempTable2 where qtysmallBigBoundary && BSFlag==`B<br>sellSmallAmount = exec sum(tradeAmount)<br>from tempTable2 where qtysmallBigBoundary && BSFlag==`S<br>return nullFill([buySmallAmount, buyBigAmount, sellSmallAmount, sellBigAmount], 0)<br>}This function becomes the metric inside a time-series engine with rolling or sliding windows:<br>// Create input and output stream tables<br>share(table=streamTable(1:0, `tradeTime`securityID`price`qty`buyNo`sellNo,<br>[TIMESTAMP, SYMBOL, DOUBLE, LONG, LONG, LONG]), sharedName=`trade)<br>share(table=streamTable(1:0,<br>`tradeTime`securityID`buySmallAmount`buyBigAmount`sellSmallAmount`sellBigAmount,<br>[TIMESTAMP,SYMBOL,DOUBLE,DOUBLE,DOUBLE,DOUBLE]), sharedName=`capitalFlow)<br>go<br>// Create time-series engine<br>createTimeSeriesEngine(name="tradeTSAggr", windowSize=60000, step=60000,<br>metrics=[], dummyTable=trade,<br>outputTable=capitalFlow, timeColumn="tradeTime", useSystemTime=false, keyColumn=`securityID)<br>// Subscribe to table trade<br>subscribeTable(tableName="trade", actionName="tradeTSAggr", offset=-1,<br>handler=getStreamEngine("tradeTSAggr"), msgAsTable=true)Modify the engine definition for calculation with sliding windows based on tradeTime:<br>createTimeSeriesEngine(name="tradeTSAggr", windowSize=60000, step=30000,<br>metrics=[], dummyTable=trade,<br>outputTable=capitalFlow, timeColumn="tradeTime", useSystemTime=false, keyColumn=`securityID)Modify the engine definition for calculation with sliding windows based on system time:<br>createTimeSeriesEngine(name="tradeTSAggr", windowSize=60000, step=60000,<br>metrics=[], dummyTable=trade,<br>outputTable=capitalFlow, timeColumn="tradeTime", useSystemTime=true, keyColumn=`securityID)Stateful Calculation: Price Rate of Change (ROC)<br>Many...