Consuming a real-time feed reliably

Posted on 2026-06-30 :: architecture golang headcode kafka postgres

When you're ingesting a continuous external stream (live transit data, market ticks, IoT telemetry, whatever), the consumer's job and the job of interpreting what was consumed are tempting to bundle together. They shouldn't be. This is the pattern behind the Darwin Kafka ingestion in Headcode, my UK rail data API, though it generalises well beyond rail data: a dumb, fast consumer that lands raw messages in Postgres, and a separate process (not covered here) that interprets them later. This section covers just the consumer.

Wiring: one process, two cooperating components

A single process runs a Kafka consumer and a Postgres writer side by side, connected by a channel:

    msgs := make(chan RawMessage, 1000)
    saver := &Saver{Pool: pool}

    consumer, err := kafka.NewConsumer(&kafka.Config{
        Broker:        cfg.Broker,
        Topic:         cfg.Topic,
        Group:         cfg.ConsumerGroup,
        MessageChan:   msgs,
        ConsumeOffset: kgo.NewOffset().AtEnd(),
    })

    group := errgroup.WithCancelOnError(ctx)
    group.Go(consumer.Run)
    group.Go(func(ctx context.Context) error { return saver.Run(ctx, msgs) })

The consumer decodes Kafka records into a lightweight RawMessage struct and pushes them onto a buffered channel. The saver drains that channel, batches what it finds, and writes it to Postgres. Neither side knows anything about the other's internals: the channel is the entire contract.

(errgroup.WithCancelOnError here is a thin wrapper around the standard golang.org/x/sync/errgroup, not a function from the package itself. It's errgroup.WithContext(ctx) under the hood, which already cancels the group's context on the first non-nil error. kafka.NewConsumer/kafka.Config are likewise thin wrappers, this time from headcode, a Kafka setup package of mine. It's not stdlib or a third-party client, but it builds on real franz-go (kgo) types underneath, which is why ConsumeOffset takes a kgo.Offset directly.)

ConsumeOffset: AtEnd() is worth calling out: a fresh consumer group starts at the live end of the topic, not the beginning. A real-time feed is a stream, not a bounded dataset, so there's no value in replaying historical Kafka retention on first boot. Once the consumer group has committed an offset, restarts resume from there as normal.

Per-partition fan-out

Rather than a single poll-and-process loop, the consumer follows a rebalance-callback model. When the consumer group assigns partitions, a dedicated goroutine spins up per partition:

    func (c *Consumer) onPartitionsAssigned(assigned map[string][]int32) {
        for topic, partitions := range assigned {
            for _, partition := range partitions {
                worker := &partitionWorker{records: make(chan []Record, 10), out: c.out}
                if c.workers[topic] == nil {
                    c.workers[topic] = make(map[int32]*partitionWorker)
                }
                c.workers[topic][partition] = worker
                go worker.consume()
            }
        }
    }

The main poll loop just fetches and routes each batch to the right partition's channel:

    fetches.EachPartition(func(p FetchPartition) {
        worker := c.workers[p.Topic][p.Partition]
        worker.records <- p.Records
    })

This keeps partitions independent (a slow or stuck worker for one partition can't block the others), and it falls out naturally from how Kafka already parallelises a topic. When a rebalance takes a partition away, the corresponding goroutine is torn down via a quit channel.

Unwrapping the envelope

Many real-time feeds don't put a clean JSON object on the wire. Messages arrive wrapped in some transport envelope (SOAP, JMS-style headers, a custom framing format) with the useful fields nested a few levels down. Each partition worker unwraps this before anything else happens:

    func (w *partitionWorker) consume() {
        for recs := range w.records {
            for _, rec := range recs {
                decoded, err := envelope.Unwrap(rec.Value)
                if err != nil {
                    log.Warn("discarding undecodable record", "offset", rec.Offset, "err", err)
                    continue
                }

                w.out <- RawMessage{
                    ID:             uuid.New(),
                    KafkaOffset:    rec.Offset,
                    KafkaPartition: rec.Partition,
                    MessageKind:    decoded.Kind,
                    Payload:        decoded.Payload, // untouched
                    ReceivedAt:     time.Now().UTC(),
                }
            }
        }
    }

Three things to note: undecodable records are logged and dropped rather than killing the consumer (a malformed message shouldn't take down the feed); the payload itself is kept as raw bytes, not parsed into a domain struct here, since interpreting it is the next stage's problem; and the id is generated here, at construction time, rather than left to a Postgres default.

That last point matters once a batch gets retried: the same RawMessage carries the same id on every retry attempt, so the primary key acts as a second line of defense alongside the unique index below. If id were instead generated by the database (DEFAULT gen_random_uuid()), a retried insert would get a fresh id each time and the primary key would do nothing to stop duplicates. The consumer's only opinions are about envelope metadata: what kind of message is this, and where did it sit in the Kafka log.

Batched,...
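To make the retry argument about ids concrete, here is a minimal, self-contained sketch. It is not Headcode's code: the in-memory table type, the newID helper (standing in for uuid.New()), and the two insertBatch functions are hypothetical names invented for illustration. The table's insert assumes the real statement skips rows whose primary key already exists, as an INSERT ... ON CONFLICT DO NOTHING would in Postgres.

```go
package main

import (
	"crypto/rand"
	"fmt"
)

// RawMessage carries only the fields this illustration needs.
type RawMessage struct {
	ID      string
	Payload []byte
}

// newID returns a random 128-bit hex string; a stand-in for uuid.New().
func newID() string {
	b := make([]byte, 16)
	if _, err := rand.Read(b); err != nil {
		panic(err)
	}
	return fmt.Sprintf("%x", b)
}

// table is an in-memory stand-in for a Postgres table with a primary key.
type table struct {
	rows map[string][]byte
}

func newTable() *table { return &table{rows: make(map[string][]byte)} }

// insert skips rows whose primary key already exists (assumed
// ON CONFLICT DO NOTHING behaviour) and reports whether a row was written.
func (t *table) insert(id string, payload []byte) bool {
	if _, exists := t.rows[id]; exists {
		return false
	}
	t.rows[id] = payload
	return true
}

// insertBatchClientIDs writes each message under the id it already carries,
// so a retried batch collides with its earlier attempt.
func insertBatchClientIDs(t *table, batch []RawMessage) {
	for _, m := range batch {
		t.insert(m.ID, m.Payload)
	}
}

// insertBatchDBIDs mimics a database-side default: every inserted row
// gets a fresh id, so a retry never collides with anything.
func insertBatchDBIDs(t *table, batch []RawMessage) {
	for _, m := range batch {
		t.insert(newID(), m.Payload)
	}
}

func main() {
	batch := []RawMessage{
		{ID: newID(), Payload: []byte("a")},
		{ID: newID(), Payload: []byte("b")},
	}

	clientSide := newTable()
	insertBatchClientIDs(clientSide, batch)
	insertBatchClientIDs(clientSide, batch) // simulated retry of the same batch

	dbSide := newTable()
	insertBatchDBIDs(dbSide, batch)
	insertBatchDBIDs(dbSide, batch) // simulated retry of the same batch

	fmt.Println(len(clientSide.rows), len(dbSide.rows)) // prints "2 4"
}
```

With ids fixed at construction time the retry is a no-op, while database-generated ids let the same two messages land twice.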

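The fan-out section mentions that a revoked partition's goroutine is torn down via a quit channel, but the snippets shown don't include that part. The following is one possible shape for it, a sketch under assumptions rather than the actual Headcode implementation: Router, Assign, Revoke, Route, and the done channel are hypothetical names, Record is a minimal stand-in type, and a single topic is assumed for brevity.

```go
package main

import (
	"fmt"
	"sync"
)

// Record is a minimal stand-in for a Kafka record.
type Record struct {
	Partition int32
	Offset    int64
}

// partitionWorker owns the goroutine for one partition.
type partitionWorker struct {
	records chan []Record
	quit    chan struct{} // closed on revoke to stop the goroutine
	done    chan struct{} // closed by the goroutine when it has exited
	out     chan<- Record
}

func (w *partitionWorker) consume() {
	defer close(w.done)
	for {
		select {
		case <-w.quit:
			return
		case recs := <-w.records:
			for _, rec := range recs {
				select {
				case w.out <- rec:
				case <-w.quit: // don't stay blocked on a full output channel
					return
				}
			}
		}
	}
}

// Router maps partitions to workers (hypothetical; one topic only).
type Router struct {
	mu      sync.Mutex
	workers map[int32]*partitionWorker
	out     chan<- Record
}

func NewRouter(out chan<- Record) *Router {
	return &Router{workers: make(map[int32]*partitionWorker), out: out}
}

// Assign starts one goroutine per newly assigned partition.
func (r *Router) Assign(partitions ...int32) {
	r.mu.Lock()
	defer r.mu.Unlock()
	for _, p := range partitions {
		if _, ok := r.workers[p]; ok {
			continue
		}
		w := &partitionWorker{
			records: make(chan []Record, 10),
			quit:    make(chan struct{}),
			done:    make(chan struct{}),
			out:     r.out,
		}
		r.workers[p] = w
		go w.consume()
	}
}

// Revoke signals the workers for the given partitions and waits for them to exit.
func (r *Router) Revoke(partitions ...int32) {
	r.mu.Lock()
	var stopped []*partitionWorker
	for _, p := range partitions {
		if w, ok := r.workers[p]; ok {
			close(w.quit)
			delete(r.workers, p)
			stopped = append(stopped, w)
		}
	}
	r.mu.Unlock()
	for _, w := range stopped {
		<-w.done
	}
}

// Route hands a batch to the partition's worker; false means no live worker.
func (r *Router) Route(p int32, recs []Record) bool {
	r.mu.Lock()
	w, ok := r.workers[p]
	r.mu.Unlock()
	if !ok {
		return false
	}
	select {
	case w.records <- recs:
		return true
	case <-w.quit:
		return false
	}
}

func main() {
	out := make(chan Record, 16)
	r := NewRouter(out)
	r.Assign(0, 1)
	r.Route(0, []Record{{Partition: 0, Offset: 10}})
	r.Route(1, []Record{{Partition: 1, Offset: 20}})
	a, b := <-out, <-out             // arrival order across partitions is not guaranteed
	fmt.Println(a.Offset + b.Offset) // prints 30
	r.Revoke(1)
	fmt.Println(r.Route(1, nil)) // prints false
}
```

Selecting on quit in both the receive and the send means a worker can be stopped even while it is blocked on a full output channel. Batches still buffered when quit closes are simply dropped in this sketch.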