Nazli Ander

Realtime Analysis of Cryptocurrency Prices Using dbt, Materialize, Redpanda & Metabase

data-engineering, python, streaming-data, dbt · 4 min read

Materialize organized an Online Hack Day a while ago and provided a structured streaming setup using dbt (transformation framework), Redpanda (queue), Materialize (database) and Metabase (visualization).

The initial setup used flight data from the OpenSky API to aggregate flight information. I repurposed the structured streaming setup to use cryptocurrency data in a real-time dashboard.

Without changing much of the setup, it was easy to create a new producer for the CoinRanking API. Then I created financial queries in dbt to answer the following questions in real time:

  1. What is the difference in marketcap within the last 20 minutes per cryptocurrency?
  2. What are the most deviating cryptocurrencies within the last 20 minutes?
  3. What are the average prices per cryptocurrency within the last 20 minutes?

I connected the queries (materialized views in Materialize) to Metabase, to visualize my answers. Here is a screenshot from the resulting example dashboard in Metabase:

Dashboard Example / Metabase

In this short write-up, I will summarize the pipeline behind this analysis. I aim to show how easy it is to build a real-time financial analysis with it.

Pipeline Summary

The pipeline that was created by Materialize had the following chronological order, and I re-purposed the same structure by changing the producer:

  1. Ingest real-time data with a Python producer into RedPanda
  2. Create a source from RedPanda in Materialize using dbt (raw data ingestion)
  3. Create a staging view in Materialize to type-cast JSON fields (staging ingested data)
  4. Create materialized view(s) in Materialize to produce real-time windowed aggregations
  5. Use Metabase to visualize data

The technologies relate to each other as the following diagram suggests:

Pipeline Setup Summary

The setup is initialized with a very diligent docker-compose file, created by @morsapaes. After running docker-compose up, we have the following services running on our local machine:

IMAGE                                              COMMAND                 PORTS                                                               NAMES
sample_project_data-generator                      "python -u ./crypto.…"                                                                      data-generator
sample_project_dbt                                 "/bin/bash"             0.0.0.0:8002->8080/tcp                                              dbt
metabase/metabase                                  "/app/run_metabase.sh"  0.0.0.0:3030->3000/tcp                                              metabase
docker.vectorized.io/vectorized/redpanda:v21.11.3  "/entrypoint.sh 'red…"  0.0.0.0:8081-8082->8081-8082/tcp, 0.0.0.0:9092->9092/tcp, 9644/tcp  redpanda
materialize/materialized:v0.20.0                   "tini -- materialize…"  0.0.0.0:6875->6875/tcp                                              materialized

CoinRanking Producer in a Nutshell

To fetch real-time CoinRanking data into RedPanda, we need a small script. The script needs to read data from CoinRanking periodically and then flush the data it has read into the RedPanda instance. Since RedPanda is Kafka compatible, this is quite straightforward with a Python Kafka client.

The producer script reads data from the CoinRanking API, using a token. It then picks the required fields from the API response. Lastly, it ingests the list of coin information into RedPanda every 10 seconds. Here is the compact script:

import logging
import os
import time
import uuid
from json import dumps

import requests
import schedule
from kafka import KafkaProducer

COIN_PAGE = "https://api.coinranking.com/v2/coins"
CRYPTO_TOPIC = "crypto"
TOKEN = os.getenv("TOKEN", ".")
HEADERS = {"Authorization": f"access_token {TOKEN}"}
FREQUENCY_INGESTION = 10  # seconds between two API polls

def produce_list_of_coin_dict_into_queue(list_of_dict: list) -> None:
    producer = KafkaProducer(bootstrap_servers="redpanda:9092")
    for coin_with_model in list_of_dict:
        try:
            # Value: JSON-encoded coin; key: random UUID, both as UTF-8 bytes
            producer.send(
                topic=CRYPTO_TOPIC,
                value=dumps(coin_with_model).encode("utf-8"),
                key=uuid.uuid4().hex.encode("utf-8"),
            )
        except Exception as e:
            logging.error(f"The problem is: {e}!")
    # Block until all buffered messages are delivered
    producer.flush()

def get_json_api(page: str) -> tuple:
    # Request JSON coin data (body omitted in this compact version)
    ...

def get_coin_model(coin: dict) -> dict:
    # Define coin dict mapping (body omitted in this compact version)
    ...

def get_all_coins_with_model(all_coins: list) -> list:
    # Not shown in the compact version; presumably applies get_coin_model to each coin
    ...

def coins_producer() -> None:
    all_coins, _ = get_json_api(COIN_PAGE)
    coins_with_model = get_all_coins_with_model(all_coins)
    produce_list_of_coin_dict_into_queue(coins_with_model)

if __name__ == "__main__":
    coins_producer()  # run once immediately, then on a schedule
    schedule.every(FREQUENCY_INGESTION).seconds.do(coins_producer)
    while True:
        schedule.run_pending()
        time.sleep(1)

With this producer, we ingest crypto data into RedPanda every 10 seconds under a topic called crypto. We expect to consume the ingested data asynchronously from the crypto topic and use it in our analyses.
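To make the message format concrete, here is a minimal sketch of how one coin record is serialized on the producer side and how a consumer could decode it again. It uses only the standard library; encode_message and decode_value are hypothetical helper names that are not part of the project, and the sample coin values are invented for illustration.

```python
import json
import uuid
from typing import Tuple


def encode_message(coin: dict) -> Tuple[bytes, bytes]:
    """Build the (key, value) byte pair the same way the producer script does."""
    key = uuid.uuid4().hex.encode("utf-8")    # random key, one per message
    value = json.dumps(coin).encode("utf-8")  # JSON text as UTF-8 bytes
    return key, value


def decode_value(value: bytes) -> dict:
    """Reverse the value encoding: UTF-8 bytes -> JSON text -> dict."""
    return json.loads(value.decode("utf-8"))


# Illustrative record only; the field values are invented.
sample_coin = {"symbol": "BTC", "price": 42000.5}
key, value = encode_message(sample_coin)
round_trip = decode_value(value)
```

A Kafka consumer receives the value as the same raw bytes, so a helper like decode_value is all that is needed to get the coin dictionary back. Because every key is a fresh random UUID, no two messages share a key.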

RedPanda Source in Materialize

The project uses the dbt adapter for Materialize. The adapter (dbt-materialize) is a Python package available on PyPI. This package allows us to use SQL + Jinja statements to efficiently transform streaming data and continuously update our data.

In Materialize, connected data sources are called sources. A source is created by declaring a queue connection in a simple DDL statement. Since we make use of dbt, this is as easy as using the following lines:

{{
    config(
        materialized='source',
        tags=['crypto']
    )
}}
{% set source_name %}
    {{ mz_generate_name('rc_coins') }}
{% endset %}
CREATE SOURCE {{ source_name }}
FROM KAFKA BROKER 'redpanda:9092' TOPIC 'crypto'
    KEY FORMAT BYTES
    VALUE FORMAT BYTES
ENVELOPE UPSERT;

After the source is defined, Materialize recognizes the ingested data. However, each message value arrives as raw bytes, so we first need to decode it as UTF-8 text and cast it to JSON (jsonb). This is required for column mapping, because our records are stored as JSON objects. We can then map the columns by accessing the object fields and casting their values:

{{
    config(
        materialized='view',
        tags=['crypto']
    )
}}
WITH converted_casted AS (
    SELECT
        CAST(CONVERT_FROM(data, 'utf8') AS jsonb) AS data
    FROM {{ ref('rc_coins') }}
)
SELECT
    (data->>'uuid')::string as uuid,
    (data->>'name')::string as name,
    (data->>'symbol')::string as symbol,
    (data->>'btc_price')::numeric as btc_price,
    (data->>'last_24h_volume')::numeric as last_24h_volume,
    (data->>'marketcap')::numeric as marketcap,
    (data->>'price')::double as price,
    (data->>'timestamp')::timestamp as timestamp
FROM converted_casted
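For readers who think more easily in Python than in SQL, the staging view can be mirrored roughly as follows. This is only an analogue under assumptions, not code from the project: stage_record is a hypothetical name, the sample values are invented, and the timestamp is assumed to arrive as an ISO 8601 string.

```python
import json
from datetime import datetime
from decimal import Decimal


def stage_record(raw: bytes) -> dict:
    """Python analogue of the staging view: decode, parse, then cast each field."""
    # CONVERT_FROM(data, 'utf8') followed by the cast to jsonb
    data = json.loads(raw.decode("utf-8"))
    return {
        "uuid": str(data["uuid"]),
        "name": str(data["name"]),
        "symbol": str(data["symbol"]),
        "btc_price": Decimal(str(data["btc_price"])),              # ::numeric
        "last_24h_volume": Decimal(str(data["last_24h_volume"])),  # ::numeric
        "marketcap": Decimal(str(data["marketcap"])),              # ::numeric
        "price": float(data["price"]),                             # ::double
        # Assumption: the producer writes an ISO 8601 timestamp string
        "timestamp": datetime.fromisoformat(data["timestamp"]),
    }


# Invented sample message, encoded the same way the producer encodes values.
raw = json.dumps({
    "uuid": "abc123", "name": "Bitcoin", "symbol": "BTC",
    "btc_price": "1", "last_24h_volume": "25000000000",
    "marketcap": "800000000000", "price": "42000.5",
    "timestamp": "2022-01-15T10:30:00",
}).encode("utf-8")
staged = stage_record(raw)
```

One difference worth noting: the SQL ->> operator yields NULL for a missing key, whereas this sketch raises a KeyError.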

Materialized Views in Materialize

One of the great features of Materialize is its materialized views. A materialized view is the result of a SELECT query that Materialize computes over the streaming data and keeps incrementally updated as new data arrives.

As in every structured streaming framework, a valid analysis benefits from defining a set of assumptions about windowing and the lateness of data (grace periods). Currently, Materialize uses the mz_logical_timestamp() function to define windowing and grace periods. The mz_logical_timestamp() function represents the current timestamp in milliseconds at the time of the query execution.

The questions require us to keep track of the last 20 minutes of data, so in theory we need to analyze the data in sliding windows. Following the nice explanation in the Materialize documentation, we can use mz_logical_timestamp() to restrict the data to the past 20 minutes and build the queries for our crypto analysis on top of that.

{{
    config(
        materialized='materializedview',
        tags=['crypto']
    )
}}
{# 20 mins #}
{% set slide_threshold = '1200000' %}
WITH with_casted_insertion AS (
    SELECT *, extract(epoch from timestamp) * 1000 AS inserted_at
    FROM {{ ref('stg_crypto') }}
)
SELECT * FROM with_casted_insertion
WHERE TRUE
    AND mz_logical_timestamp() >= inserted_at
    AND mz_logical_timestamp() < inserted_at + {{ slide_threshold }}
In the producer we did not provide an insertion time in milliseconds. With SQL we can convert the timestamp field into milliseconds and call it inserted_at. This new field serves as our reference point for windowing.
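The window condition can be read as a simple predicate: a row is visible from the moment it is inserted until 20 minutes later. The sketch below restates that logic in plain Python to make the boundaries explicit. The names to_epoch_ms and in_window are hypothetical, and now_ms merely stands in for the value of mz_logical_timestamp().

```python
from datetime import datetime, timezone

SLIDE_THRESHOLD_MS = 1_200_000  # 20 minutes, same value as slide_threshold


def to_epoch_ms(ts: datetime) -> int:
    """Equivalent of extract(epoch from timestamp) * 1000."""
    return int(ts.timestamp() * 1000)


def in_window(inserted_at_ms: int, now_ms: int) -> bool:
    """True while the row is inside the 20-minute sliding window."""
    return inserted_at_ms <= now_ms < inserted_at_ms + SLIDE_THRESHOLD_MS


# Invented example: a row inserted at 10:00:00 UTC on 2022-01-15.
inserted_ms = to_epoch_ms(datetime(2022, 1, 15, 10, 0, 0, tzinfo=timezone.utc))
visible_at_start = in_window(inserted_ms, inserted_ms)
visible_after_20_min = in_window(inserted_ms, inserted_ms + SLIDE_THRESHOLD_MS)
```

The lower bound is inclusive and the upper bound is exclusive, so a row drops out of the view exactly 1,200,000 ms after its inserted_at value.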

The mz_logical_timestamp() function and its usage are quite complicated. I suggest reading the nice documentation page from Materialize to see other use cases and their explanations.

On top of the materialized view created above, we can write our analytics queries and expose them in a dashboarding tool such as Metabase. The analytics queries are dbt models as well, materialized as materializedview. You can find the resulting queries in the repository.
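To give a feeling for what such analytics queries compute, here is a rough Python sketch of per-coin aggregates over rows that are already inside the window. It is not the SQL from the repository. The function name summarize, the sample rows, and the exact definitions are all assumptions for illustration: marketcap difference is taken as latest minus earliest, and deviation as the population standard deviation of the price.

```python
from collections import defaultdict
from statistics import mean, pstdev


def summarize(rows: list) -> dict:
    """Per-symbol aggregates over rows that are already inside the window."""
    by_symbol = defaultdict(list)
    for row in rows:
        by_symbol[row["symbol"]].append(row)

    summary = {}
    for symbol, items in by_symbol.items():
        items.sort(key=lambda r: r["inserted_at"])  # oldest first
        prices = [r["price"] for r in items]
        summary[symbol] = {
            # Assumed definition: latest marketcap minus earliest marketcap
            "marketcap_diff": items[-1]["marketcap"] - items[0]["marketcap"],
            # Assumed definition: population standard deviation of the price
            "price_stddev": pstdev(prices),
            "avg_price": mean(prices),
        }
    return summary


# Invented rows, as if read from the sliding-window view.
rows = [
    {"symbol": "BTC", "inserted_at": 2, "price": 110.0, "marketcap": 1100},
    {"symbol": "BTC", "inserted_at": 1, "price": 100.0, "marketcap": 1000},
    {"symbol": "ETH", "inserted_at": 1, "price": 10.0, "marketcap": 50},
]
result = summarize(rows)
```

In Materialize the same aggregates would be maintained incrementally by the materialized views, rather than being recomputed on each call as they are here.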

Resulting Lineage and Last Words

After writing the analytics queries, all we need to do is run the crypto-tagged models from dbt once:

dbt run -m tag:crypto

The output from dbt looks as follows:

Found 10 models, 0 tests, 0 snapshots, 0 analyses, 180 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics
Concurrency: 1 threads (target='dev')
1 of 5 START source model public.rc_coins....................................... [RUN]
1 of 5 OK created source model public.rc_coins.................................. [CREATE SOURCE in 0.08s]
2 of 5 START view model public.stg_crypto....................................... [RUN]
2 of 5 OK created view model public.stg_crypto.................................. [CREATE VIEW in 0.05s]
3 of 5 START materializedview model public.fct_crypto_sliding_window............ [RUN]
3 of 5 OK created materializedview model public.fct_crypto_sliding_window....... [CREATE VIEW in 0.06s]
4 of 5 START materializedview model public.marketcap_changes.................... [RUN]
4 of 5 OK created materializedview model public.marketcap_changes............... [CREATE VIEW in 0.06s]
5 of 5 START materializedview model public.volatile_cryptos..................... [RUN]
5 of 5 OK created materializedview model public.volatile_cryptos................ [CREATE VIEW in 0.05s]
Finished running 1 source model, 1 view model, 3 materializedview models in 0.38s.
Completed successfully
Done. PASS=5 WARN=0 ERROR=0 SKIP=0 TOTAL=5

The resulting dbt lineage diagram would be as follows:

dbt Lineage

Real-time analytics is used to get immediate insights from financial data, news sources, social media reactions and healthcare. In most cases SQL is enough to obtain basic insights in such systems. Thus, SQL with dbt, in combination with Materialize, improves the efficiency of developing real-time analytics pipelines. The setup provided at the Hack Day was a great example of this.

The biggest challenge after this point would be the maintenance of these pipelines. Luckily, there is sufficient information on integrating both RedPanda and Materialize with monitoring tools. As a next reading tip, I would nerdly suggest their documentation pages:

  1. RedPanda to Prometheus metrics
  2. Materialize monitoring documentation page