The disArray-ed File Problem: Building Incremental Data Pipelines for Out-of-Order Scientific Data · bisArray
Introduction
We at bisArray hate to admit it, but the real world can be rather disArray-ed.
The general use case we would like to discuss in this article is telemetry: something in the lab or on the factory floor produces data as a function of time, and we need to capture it and provide insights. Maybe this is a lab sensor, a glove box, a battery cycler, a production tool, or the readout from an antibody fermenter. Straightforward enough, but what if there is a lot of data from a lot of channels, like 1,000 channels and 100M rows a day? What if we need to derive global values from this data, such as the number of full equivalent cycles a battery under test has completed? And the real kicker: what if that data arrives out of order, i.e. it is disArray-ed? Say we are integrating the running total of parts in and out of spec for a production tool that produces a summary file every 4 hours. The tool was offline for two days in the middle of the month, and a worker uploads the files to our pipeline weeks out of sequence. We need to be able to handle that.
We see this problem in many of the instrument-heavy systems we help build and modernize: out-of-order arrival, at-least-once delivery, and scale that makes "just reprocess everything" impossible. On one acquisition system, roughly 38% of files arrived out of event-time order under normal operation, with a long tail of stragglers landing up to 40 minutes after the data they contained was measured. At a few thousand files an hour and billions of resulting rows per month, the pipeline cannot re-derive those global metrics from scratch; it has to repair itself incrementally.
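To make "repair itself incrementally" concrete, here is a minimal sketch of a running total that accepts records in any arrival order and recomputes only the totals at or after the late record's event time. The `RunningTotal` class, its in-memory lists, and the integer event times are all assumptions made for illustration; they stand in for whatever storage a real pipeline uses and are not the implementation described in this article.

```python
import bisect


class RunningTotal:
    """Cumulative sum keyed by event time that tolerates late arrivals.

    Hypothetical illustration: plain lists stand in for a real store.
    """

    def __init__(self):
        self.times = []   # event times, kept sorted
        self.values = []  # per-record values, aligned with self.times
        self.totals = []  # cumulative totals, aligned with self.times

    def add(self, event_time, value):
        """Insert one record and repair only the affected suffix.

        Returns the number of cumulative totals that had to be rewritten.
        """
        # Find where the record belongs in event-time order.
        i = bisect.bisect_right(self.times, event_time)
        self.times.insert(i, event_time)
        self.values.insert(i, value)

        # Everything before position i is untouched; restart from its total.
        running = self.totals[i - 1] if i > 0 else 0
        del self.totals[i:]
        for v in self.values[i:]:
            running += v
            self.totals.append(running)
        return len(self.values) - i


rt = RunningTotal()
rt.add(0, 10)    # summary file covering hour 0
rt.add(4, 20)    # hour 4
rt.add(12, 30)   # hour 12 arrives before hour 8
rewritten = rt.add(8, 5)  # the straggler for hour 8 finally lands
```

In this sketch an in-order record rewrites a single total, while a straggler rewrites only the totals from its event time onward. Duplicate deliveries are deliberately not handled here; under at-least-once delivery a real pipeline would also need to make each insert idempotent.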
The architecture at a glance
For today's discussion, let's make some concrete assumptions about what we are doing and describe the architecture in those terms. We think of the architecture in two steps:
Step 1 Files from local computers are centrally enqueued; and<br>Step 2 Files are pulled from the queue and processed.
To put this in highly specific terms, assume:
The use case is a factory floor with twenty process tools, each of which makes a disposable surgical tool for cataract surgery. Each process tool is automated and PLC-driven, but has an online HMI for manual tasks such as entering work orders, pulling configuration from a central database, and pushing pass/fail criteria into several industrial vision cameras. (We wrote exactly such an HMI that has been in use for years!) Let's call the tools Tool01 ... Tool20;
The HMI for ToolXX produces a report on the work order that summarizes the run: parts made, parts failed, mispicks, that kind of thing. The report is a file that is continuously appended to, and it is submitted daily and again at the end of the work order run; and
The files are placed into a central location on the factory's internal network, but temporal order is not guaranteed! A data agent (a locally running Python script) watches for file changes and, when a file changes, pushes information about that file into an air-gapped RabbitMQ queue running locally on the network. Crucially, along with the file location (source_uri) there needs to be a mechanism for deducing which ToolXX generated the file. We have done this many ways: using the folder structure, adding the tool number to the file name, or parsing it out of the file are three such approaches. This is the subject_key we will see later in our database discussion. The data agent might add other information as well, perhaps its version, a timestamp, or similar. This is all wrapped up into a metadata object, which is likewise part of the message placed on the queue;
[Figure: Step 1: File Notifications Become Queue Messages. A data agent watches semi-ordered report files, resolves the subject, and places normalized work messages on RabbitMQ. Flow: Process Tools (Tool01, Tool02, ... Tool20; PLC-driven HMIs write run reports) → Factory Network Share (\\share\Tool07\WO123.csv; files arrive or change out of order) → Data Agent (normalizes the file notification into source_uri, metadata, and subject_key = Tool07; the folder, filename, or a file parser can resolve subject_key) → RabbitMQ Queue (source_uri, subject_key, metadata; the queue message is small and the file remains on storage). Important boundary: the data agent does not process files. It only identifies and enqueues them.]
Step 1: files land on the network share; the data agent detects each change and enqueues the file (with its subject_key and metadata) onto RabbitMQ.
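The normalization the data agent performs can be sketched as follows. This is only an illustration, under the assumption that the tool is identified by a `ToolXX` folder in the path (one of the three approaches mentioned above). The function names, `AGENT_VERSION`, and the metadata field names `agent_version` and `observed_at` are hypothetical; only `source_uri`, `subject_key`, and `metadata` come from the description above.

```python
import json
import re
from datetime import datetime, timezone
from typing import Optional

AGENT_VERSION = "0.1.0"  # hypothetical version string for this sketch
TOOL_FOLDER = re.compile(r"^Tool(\d{2})$", re.IGNORECASE)


def resolve_subject_key(source_uri: str) -> str:
    """Return the ToolXX folder name found in the path, e.g. 'Tool07'."""
    # Split on either slash style so UNC paths and POSIX paths both work.
    for part in re.split(r"[\\/]+", source_uri):
        match = TOOL_FOLDER.match(part)
        if match:
            return f"Tool{match.group(1)}"
    raise ValueError(f"cannot resolve subject_key from {source_uri!r}")


def build_message(source_uri: str, observed_at: Optional[datetime] = None) -> bytes:
    """Build the small queue message; the file itself stays on storage."""
    observed_at = observed_at or datetime.now(timezone.utc)
    message = {
        "source_uri": source_uri,
        "subject_key": resolve_subject_key(source_uri),
        "metadata": {
            "agent_version": AGENT_VERSION,
            "observed_at": observed_at.isoformat(),
        },
    }
    return json.dumps(message, sort_keys=True).encode("utf-8")


body = build_message(r"\\share\Tool07\WO123.csv")
```

The resulting `body` is what the agent would hand to its RabbitMQ client for publishing. Note that the sketch never opens the file: consistent with the boundary in the figure, the agent only identifies and enqueues.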
That's the first part. Files are placed in a semi-automated way into a filesystem that is monitored by a data agent. The data agent sees that a file has arrived or changed, then pushes a message with the relevant metadata to a queue. Now for the second part of our architecture:
Step 2: Queue Messages Become Deterministic Work. RabbitMQ wakes the system up; PostgreSQL owns correctness through queue rows, subject locks, and worker claims. RabbitMQ Notification...