How to use FastAPI with Materialize for real-time data processing

How to use FastAPI with Materialize for real-time data processing

Written by Bobby Iliev on Feb 13th, 2022 Views Report Post

Introduction

[!WARNING] This demo includes examples for an unsupported version of Materialize (0.26.x).

This is a self-contained demo of FastAPI and Materialize.

This demo project contains the following components:

Diagram

FastAPI and Materialize Demo

Running the demo

Clone the repository:

git clone https://github.com/bobbyiliev/materialize-tutorials.git

Access the FastAPI demo project directory:

cd materialize-tutorials
git checkout lts
cd mz-fastapi-demo

Pull all Docker images:

docker-compose pull

Build the project:

docker-compose build

Finally, run all containers:

docker-compose up

Create the Materialize sources and views

Once the demo is running, you can create the Materialize sources and views.

Let's start by creating a Redpanda/Kafka source:

CREATE SOURCE sensors
FROM KAFKA BROKER 'redpanda:9092' TOPIC 'sensors'
FORMAT BYTES;

Then create a non-materialized view which you can think of essentially an alias that we will use to create our materialized views. The non-materialized views do not store the results of the query:

CREATE VIEW sensors_data AS
    SELECT
        *
    FROM (
        SELECT
            (data->>'id')::int AS id,
            (data->>'pm25')::double AS pm25,
            (data->>'pm10')::double AS pm10,
            (data->>'geo_lat')::double AS geo_lat,
            (data->>'geo_lon')::double AS geo_lon,
            (data->>'timestamp')::double AS timestamp
        FROM (
            SELECT CAST(data AS jsonb) AS data
            FROM (
                SELECT convert_from(data, 'utf8') AS data
                FROM sensors
            )
        )
    );

After that, create a materialized view that will hold all records in the last 10 minutes:

CREATE MATERIALIZED VIEW sensors_view AS
    SELECT
        *
    FROM sensors_data
    WHERE
        mz_logical_timestamp() < (timestamp*1000 + 100000)::numeric;

Note that we are using the mz_logical_timestamp() function rather than the now() function. This is because in Materialize now() doesn’t represent the system time, as it does in most systems; it represents the time with timezone when the query was executed. It cannot be used when creating views. For more information, see the documentation here.

Next, let's create materialized view that will only include data from the last second so we can see the dataflow and use it for our Server-Sent Events (SSE) demo later on:

CREATE MATERIALIZED VIEW sensors_view_1s AS
    SELECT
        *
    FROM sensors_data
    WHERE
        mz_logical_timestamp() < (timestamp*1000 + 6000)::numeric;

With that our materialized views are ready and we can visit the FastAPI demo project in the browser!

FastAPI Demo

Finally, visit the FastAPI demo app via your browser:

  • Endpoint for all records in the last 10 minutes:

http://localhost/sensors

  • SSE Endpoint streaming the latest records as they are generated using TAIL:

http://localhost/stream

Example response:

SSE FastAPI with Materialize

Materialize Cloud

If you want to run the demo on the cloud, you would need the following:

  • A publicly accessible Redpanda/Kafka instance so that you can connect to it.
  • A Materialize Cloud account. You can sign up for a free Materialize Cloud account to get started with Materialize Cloud.

If you already have that setup, you would need to make the following changes to the demo project:

  • When creating the source, change the redpanda:9092 to your Redpanda/Kafka instance:
CREATE SOURCE sensors
FROM KAFKA BROKER 'your_redpanda_instance:9092' TOPIC 'sensors'
FORMAT BYTES;
  • Change the DATABASE_URL environment variable to your Materialize Cloud database URL and uncomment the certificate-specific environment variables in the docker-compose.yml file. in the docker-compose.yml file.

  • Download the Materialize instance certificate files from your Materialize Cloud dashboard.

Stop the demo

To stop the demo, run the following command:

docker-compose down -v

You can also stop only the data generation container:

docker-compose stop datagen

Materialize Logo

Comments (0)