Hey you, right there on the other side of the display! I congratulate you for being chosen by the Algorithm to view this particular blog post, because today I am going to rumble like thunder about how you can make your data flow through your systems as smoothly as silk and as fast as lightning!

My previous blog posts discussed different database solutions for storing and accessing huge amounts of data without choking the databases. Let’s look at the other side of the coin now.

Who needs a lubricated engine?

It is not very helpful to be able to access your data easily if you can’t make it flow smoothly to every place where it is needed. Just as any combustion engine needs to be lubricated with the best synthetic oils to work properly, every business system needs to be lubricated as well in order to serve as a reliable cash cow.

When something changes in a database, we want that change to propagate promptly from the database to all systems that depend on that data. Tuning an organization’s systems to communicate with each other with minimal delays is good for almost any business, regardless of the industry - banking, finance, insurance, you name it!

The problem

In our imaginary use case, let me talk about a laundry service corporation that operates a cluster of laundry machines. To serve its customers well, the corporation needs to monitor its machines. Let’s say that each laundry machine generates a continuous flow of log data, reporting its current cloth load, laundry program, heat and power consumption. We want to store all that data in a huge database. Every time a log entry is written to the database, we want it to immediately propagate to the browser-based user interface that the staff monitors. We want the data flow to be event-driven. No client - or consumer - node of the data flow should be doing expensive continuous short-polling against its provider counterpart. Rather, the provider should be pushing the changes to the consumer.
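To make the use case a bit more concrete, here is a minimal sketch of what one log entry could look like in TypeScript. The field names, the units and the `parseLaundryLogEntry` helper are all my assumptions for illustration. The use case only says that a machine reports its cloth load, program, heat and power consumption.

```typescript
// Hypothetical shape of one laundry machine log entry (all field names are assumptions).
interface LaundryLogEntry {
  machineId: string;
  clothLoadKg: number;
  program: string;
  temperatureCelsius: number;
  powerWatts: number;
  timestamp: string; // ISO 8601 timestamp, e.g. "2024-01-01T12:00:00Z"
}

// Parses a raw JSON string into a log entry, or returns null if it does not fit the shape.
function parseLaundryLogEntry(raw: string): LaundryLogEntry | null {
  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch {
    return null; // not valid JSON at all
  }
  if (typeof parsed !== 'object' || parsed === null) {
    return null;
  }
  const candidate = parsed as Record<string, unknown>;
  const isValid =
    typeof candidate.machineId === 'string' &&
    typeof candidate.clothLoadKg === 'number' &&
    typeof candidate.program === 'string' &&
    typeof candidate.temperatureCelsius === 'number' &&
    typeof candidate.powerWatts === 'number' &&
    typeof candidate.timestamp === 'string';
  return isValid ? (candidate as unknown as LaundryLogEntry) : null;
}
```

Validating entries at the edge like this means that every later hop in the data flow can trust the shape of what it receives.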

So what to do?

The solution

Clearly, there are at least two interfaces where data needs to jump from one system to another. Closest to the human user is the browser-based UI, which receives data from the server endpoint. The server endpoint, in turn, needs to receive the data from the database whenever something changes. So there are two hops that need an event-driven data transfer solution. Let’s have a look at them!

Web sockets for user interfaces

To implement an event-driven architecture for the UI, we need to write a browser-based user interface application that listens for data changes pushed by its web server endpoint. Therefore the API that serves the UI cannot be a classic REST API, since a REST API supports only request-response scenarios. When you implement an API that serves only your UI, we are talking about a backend for frontend, or BFF.

This API can’t be a universal or generic API, and it must not be designed to potentially serve many different kinds of clients. It’s a dedicated single-purpose API that serves the one and only UI - its endpoints are optimized for that particular UI. When you want it to behave in an event-driven fashion and immediately push each laundry log entry to the UI once it arrives at the endpoint from the database, web sockets are the best way to implement it!
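On the UI side, listening for pushed data can be as small as the sketch below. The `handleSocketMessage` helper and the URL in the comment are my own assumptions, not part of any library. The only browser API involved is the standard `WebSocket` class with its `message` event.

```typescript
// Hypothetical helper: passes text frames on to the UI and ignores everything else.
function handleSocketMessage(data: unknown, render: (line: string) => void): boolean {
  if (typeof data !== 'string') {
    return false; // binary frames are not expected in this sketch
  }
  render(data);
  return true;
}

// Browser wiring (the URL is a placeholder, adjust host and port to your setup):
// const socket = new WebSocket('ws://localhost:8080');
// socket.addEventListener('message', (event) => {
//   handleSocketMessage(event.data, (line) => console.log('laundry log:', line));
// });
```

Notice that there is no timer and no polling loop anywhere: the browser does nothing until the server pushes something.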

Use Kafka and web sockets to write a two-way event-driven server endpoint

Now let’s write a backend endpoint that is event-driven in both directions - it receives each data event from the database side through Kafka and then pushes that data on to the UI as an event as well. So two-fold event-driven! Let’s write this magic endpoint with TypeScript and NodeJS right now. We make it subscribe to Kafka to listen for data events. When an event enters the endpoint, the endpoint just calmly pushes it on over a web socket to the UI:

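import express, { Express } from 'express';
import * as http from 'http';
import * as WebSocket from 'ws';
// The module path is an assumption; point it at the file that holds the consumer shown further down
import { LaundryEventConsumer } from './laundry-event-consumer';

const app: Express = express();

const socketServer = http.createServer(app);
const wss = new WebSocket.Server({ server: socketServer });

wss.on('connection', (ws: WebSocket) => {
  // connection is up, let's make the web socket listen for Kafka events
  LaundryEventConsumer.addEventCallback((data) => {
    console.log("got a log event from laundry service", data);
    // callbacks are never unregistered, so skip sockets that have already closed
    if (ws.readyState === ws.OPEN) {
      ws.send(data.value);
    }
  })
});

socketServer.listen(process.env.SOCKET_PORT, () => {
  console.log(`Server started on port ${process.env.SOCKET_PORT} :)`);
});

// Connect to Kafka and start consuming, otherwise no event ever reaches the callbacks
LaundryEventConsumer.run().catch((error) => {
  console.error("Kafka consumer failed to start", error);
});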

Quite mind-blowing how little code we needed for this, isn’t it? Alright folks, stay tuned. Next, let’s have a look at my secretive LaundryEventConsumer, which listens to the Kafka service, which in turn delivers laundry machine log entries from the deep databases of our mighty laundry service corporation:

import { Consumer, Kafka } from 'kafkajs'

export namespace LaundryEventConsumer {

  // Define Kafka client:
  const kafka: Kafka = new Kafka({
    clientId: 'laundry-app',
    brokers: ['localhost:9092'],
  })

  // Structure of event call back functions:
  type EventCallback = (event: any) => void

  // Kafka consumer to listen for Kafka events:
  const consumer: Consumer = kafka.consumer({ groupId: 'laundry-group' })

  // Array of callbacks to be called upon a Kafka event:
  const eventCallbacks: EventCallback[] = []

  // Providing a way to add a new call back to the consumer:
  export const addEventCallback = (callback: EventCallback) => {
    eventCallbacks.push(callback)
  }

  // Invoke this to connect to Kafka and start listening:
  export const run = async () => {

    // Connect Kafka consumer to Kafka
    await consumer.connect();

    // Subscribe to laundry machine log topic:
    await consumer.subscribe({
      topic: 'laundry-machine-logs',
      fromBeginning: true,
    })

    // Finally, start the consumer:
    await consumer.run({
      eachMessage: async ({ topic, partition, message }) => {
        const event = {
          partition,
          offset: message.offset,
          // message.value is null for tombstone messages, so fall back to an empty string
          value: message.value?.toString() ?? '',
        }
        // Broadcast the received Kafka event to all listeners:
        eventCallbacks.forEach((callback: EventCallback) => {
          callback(event)
        })
      },
    })
  }
}
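One thing this consumer does not offer is a way to remove a callback again, so the array grows with every browser connection. Here is a minimal sketch of how that could be handled, assuming a hypothetical `createCallbackRegistry` helper that is neither part of the code above nor of kafkajs:

```typescript
type EventCallback<T> = (event: T) => void;

// Hypothetical registry: add() hands back a function that unregisters the callback again.
function createCallbackRegistry<T>() {
  const callbacks = new Set<EventCallback<T>>();
  return {
    add(callback: EventCallback<T>): () => void {
      callbacks.add(callback);
      return () => {
        callbacks.delete(callback);
      };
    },
    // Calls every registered callback with the given event.
    broadcast(event: T): void {
      callbacks.forEach((callback) => callback(event));
    },
    size(): number {
      return callbacks.size;
    },
  };
}
```

With something like this in place, the web socket endpoint could keep the function returned by `add` and call it from the socket's `close` event, so that a disconnected browser no longer receives broadcasts.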

That’s it for this time, folks. Thanks for reading!