Build a Real-Time E-Commerce Analytics API from Kafka in 15 Minutes
This guide details building a real-time e-commerce analytics API from Kafka streams using Tinybird. Learn data ingestion, enrichment with dimension tables and PostgreSQL, materialized views for fast aggregations, and exposing API endpoints for comprehensive, real-time insights.
What You'll Build
By the end of this tutorial, you will have developed:
- A serverless Kafka connector for real-time ingestion of order events.
- A basic API endpoint to query raw Kafka data.
- Enriched data using dimension tables for products and customers.
- PostgreSQL enrichment for product catalog data.
- Materialized Views for pre-aggregated metrics.
- Advanced API endpoints for real-time analytics.
Architecture Overview
The data pipeline is designed to be straightforward yet powerful. Events from Kafka are immediately queryable via API endpoints. The pipeline is then enhanced by enriching events with reference data from dimension tables and PostgreSQL. Subsequently, metrics are pre-aggregated in Materialized Views for rapid querying, and finally, served through low-latency API endpoints.

Prerequisites
Before you begin, ensure you have:
- A Tinybird account (sign up free).
- The Tinybird CLI installed (installation guide).
- Access to a Kafka cluster (Confluent Cloud, AWS MSK, or self-hosted).
- A PostgreSQL database (optional, for the enrichment example).
- Basic familiarity with SQL.
Part 1: Kafka to API - Get Started in 5 Minutes
This section focuses on connecting to Kafka and creating a basic API endpoint to query raw data efficiently.
Step 1: Connect Kafka to Your Analytics Pipeline
First, establish a Kafka connection. This can be done via the CLI or by manually creating a connection file.
- Option 1: Using the CLI (Recommended)
Run the interactive command:

```
tb connection create kafka
```
You will be prompted to provide:
* Connection name (e.g., kafka_ecommerce)
* Bootstrap server (e.g., pkc-xxxxx.us-east-1.aws.confluent.cloud:9092)
* Kafka key (API key for Confluent Cloud, or username)
* Kafka secret (API secret for Confluent Cloud, or password)
The CLI will generate a .connection file for you.
Throughout this guide, `tb --cloud` deploys to Tinybird Cloud, while plain `tb` targets your local Tinybird environment: use `tb deploy` for local testing and `tb --cloud deploy` for production. The same pattern applies to `secrets`, `append`, `connection`, and other commands.
- Option 2: Manual Connection File
Alternatively, create a connection file manually (e.g., kafka_ecommerce.connection, since the filename determines the connection name):

```
TYPE kafka
KAFKA_BOOTSTRAP_SERVERS pkc-xxxxx.us-east-1.aws.confluent.cloud:9092
KAFKA_SECURITY_PROTOCOL SASL_SSL
KAFKA_SASL_MECHANISM PLAIN
KAFKA_KEY {{ tb_secret("KAFKA_KEY") }}
KAFKA_SECRET {{ tb_secret("KAFKA_SECRET") }}
```

Set your secrets:

```
tb secret set KAFKA_KEY your_kafka_key
tb secret set KAFKA_SECRET your_kafka_secret
```
- Validate the Connection

Kafka connections involve many settings (bootstrap servers, security protocols, SASL mechanisms, SSL configurations), and client libraries can have opaque error handling. Preview your data and keep a troubleshooting guide handy before proceeding.
Test your connection:

```
tb connection data kafka_ecommerce
```

This command validates connectivity and authentication, and previews messages from your topics. You should see output similar to:

```
✓ Connection validated successfully
✓ Found 3 topics
  - orders (12 partitions)
  - payments (8 partitions)
  - shipping (6 partitions)
✓ Sample message from 'orders' topic:
  {"order_id": "ord_12345", "customer_id": "cust_67890", ...}
```

If errors occur, verify:
* The bootstrap server address is correct and reachable.
* Credentials are properly set using `tb secret list`.
* Network connectivity (firewall rules, security groups).
* The Kafka cluster is running and accessible.

For detailed troubleshooting, refer to the Kafka connector troubleshooting guide.
Step 2: Create Kafka Data Source for Real-Time Ingestion
Next, create a Data Source that consumes from your Kafka topic.
- Using the CLI
```
tb datasource create --kafka
```

This interactive command will guide you to:
* Select your Kafka connection.
* List available topics.
* Preview message structure.
* Automatically generate a schema.
- Manual Data Source Creation
Alternatively, create the Data Source file manually. Save the following as orders_kafka.datasource (the filename determines the Data Source name). Here's an example for order events:

```
SCHEMA >
    `order_id` String `json:$.order_id`,
    `customer_id` String `json:$.customer_id`,
    `product_id` Int32 `json:$.product_id`,
    `quantity` Int32 `json:$.quantity`,
    `price` Decimal(10, 2) `json:$.price`,
    `order_total` Decimal(10, 2) `json:$.order_total`,
    `timestamp` DateTime `json:$.timestamp`,
    `payment_method` LowCardinality(String) `json:$.payment_method`,
    `shipping_address` String `json:$.shipping_address`

ENGINE MergeTree
ENGINE_PARTITION_KEY toYYYYMM(timestamp)
ENGINE_SORTING_KEY timestamp, order_id

KAFKA_CONNECTION_NAME kafka_ecommerce
KAFKA_TOPIC orders
KAFKA_GROUP_ID {{ tb_secret("KAFKA_GROUP_ID", "ecommerce_consumer") }}
KAFKA_AUTO_OFFSET_RESET latest
```
The KAFKA_GROUP_ID uniquely identifies your consumer group. Each combination of KAFKA_TOPIC and KAFKA_GROUP_ID can only be used in one Data Source to ensure correct offset tracking and prevent conflicts.
**Best practices:**
* Use unique group IDs per Data Source when consuming from the same topic.
* Use environment-specific group IDs to isolate consumers and their committed offsets:
* Local: `"ecommerce_consumer_local"` (or default `"ecommerce_consumer"`)
* Staging: `"ecommerce_consumer_staging"`
* Production: `"ecommerce_consumer_prod"`
You can manage group IDs using secrets with defaults:
```
# Set different group IDs for each environment
tb secret set KAFKA_GROUP_ID ecommerce_consumer_local                                   # Local
tb --cloud --host <STAGING_HOST> secret set KAFKA_GROUP_ID ecommerce_consumer_staging   # Staging
tb --cloud --host <PROD_HOST> secret set KAFKA_GROUP_ID ecommerce_consumer_prod         # Production
```
The `{{ tb_secret("KAFKA_GROUP_ID", "ecommerce_consumer") }}` syntax uses the secret value if set, falling back to "ecommerce_consumer" otherwise. This lets you use different group IDs across environments without modifying Data Source files. For more details on managing consumer groups, refer to the CI/CD and version control guide.
**Schema optimization notes:**
* **Decimal types**: `Decimal(10, 2)` is used for `price` and `order_total` to ensure precise financial calculations, avoiding floating-point rounding errors.
* **LowCardinality**: `payment_method` uses `LowCardinality(String)` because it typically has a limited number of unique values (e.g., credit_card, debit_card, paypal), which optimizes storage and query performance.
* **Sorting key**: The sorting key `(timestamp, order_id)` is optimized for time series queries. Columns frequently used for filtering, such as `timestamp` for time range queries and `order_id` for specific order lookups, should always be included in the sorting key.
* **Partitioning**: Monthly partitioning by `toYYYYMM(timestamp)` keeps partitions manageable and enables efficient data management.
**Sample Kafka Message**
Your Kafka messages should conform to this structure:
```json
{
  "order_id": "ord_12345",
  "customer_id": "cust_67890",
  "product_id": 42,
  "quantity": 2,
  "price": 29.99,
  "order_total": 59.98,
  "timestamp": "2025-01-27T10:30:00Z",
  "payment_method": "credit_card",
  "shipping_address": "123 Main St, New York, NY 10001"
}
```
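If no producers are writing to the topic yet, you can hand-roll a test event that matches this structure and pipe it to a CLI producer. This is a minimal sketch; it assumes kcat (kafkacat) is available for the produce step, and the broker address and credential variables are placeholders:

```shell
# Build one order event matching the schema above.
ORDER_ID="ord_$(date +%s)"
TS="$(date -u +%Y-%m-%dT%H:%M:%SZ)"
EVENT="{\"order_id\": \"${ORDER_ID}\", \"customer_id\": \"cust_67890\", \"product_id\": 42, \"quantity\": 2, \"price\": 29.99, \"order_total\": 59.98, \"timestamp\": \"${TS}\", \"payment_method\": \"credit_card\", \"shipping_address\": \"123 Main St, New York, NY 10001\"}"
echo "$EVENT"

# Produce it to the orders topic with kcat (uncomment and fill in credentials):
# echo "$EVENT" | kcat -P -b pkc-xxxxx.us-east-1.aws.confluent.cloud:9092 -t orders \
#   -X security.protocol=SASL_SSL -X sasl.mechanism=PLAIN \
#   -X sasl.username="$KAFKA_KEY" -X sasl.password="$KAFKA_SECRET"
```

Generating the timestamp at send time keeps test events inside the one-hour window used by the verification queries below.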
- Verify Data Ingestion

Before deployment, test locally:

```
# Deploy to the local environment (Tinybird Local)
tb deploy

# Check if data is flowing
tb sql "SELECT count(*) FROM orders_kafka"
```

Once verified locally, deploy to the cloud:

```
# Deploy to Tinybird Cloud
tb --cloud deploy
```

Remember: tb deploy targets your local Tinybird environment, while tb --cloud deploy targets Tinybird Cloud. Use local deployment for testing and cloud deployment for production.
After deployment, confirm ingestion:
```sql
-- Check recent orders
SELECT
count(*) as total_orders,
min(timestamp) as first_order,
max(timestamp) as last_order
FROM orders_kafka
WHERE timestamp > now() - INTERVAL 1 hour
```
Data will flow into Tinybird as soon as messages arrive in your Kafka topic. Monitor ingestion in the Tinybird UI or query the data directly. For comprehensive monitoring of your Kafka connector (consumer lag, throughput, errors), consult the Kafka monitoring guide. You can also query the tinybird.kafka_ops_log Service Data Source for real-time diagnostics.
Step 3: Create Your First API Endpoint
Now, create a simple API endpoint to query your raw Kafka data. Save the following as recent_orders.pipe (the filename determines the endpoint name):

```
TOKEN "metrics_api_token" READ

DESCRIPTION >
    Simple endpoint to query recent orders from Kafka

NODE recent_orders
SQL >
    %
    SELECT
        order_id,
        customer_id,
        product_id,
        quantity,
        price,
        order_total,
        timestamp,
        payment_method
    FROM orders_kafka
    WHERE timestamp >= toDateTime({{ String(start_time, '2025-01-27 00:00:00', description='Start time (YYYY-MM-DD HH:MM:SS)') }})
      AND timestamp <= toDateTime({{ String(end_time, '2025-01-27 23:59:59', description='End time (YYYY-MM-DD HH:MM:SS)') }})
    ORDER BY timestamp DESC
    LIMIT {{ Int32(limit, 100, description='Maximum number of results') }}
```
Deploy and test your endpoint:

```
# Deploy to cloud
tb --cloud deploy

# Test the endpoint
curl "https://api.tinybird.co/v0/pipes/recent_orders.json?token=YOUR_TOKEN&start_time=2025-01-27%2000:00:00&end_time=2025-01-27%2023:59:59&limit=10"
```
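Hardcoded timestamps go stale quickly. As a sketch (assuming GNU date for the relative-time syntax), you can compute a rolling 24-hour window and URL-encode it before calling the endpoint:

```shell
# Rolling 24-hour window ending now (GNU date syntax).
START="$(date -u -d '24 hours ago' '+%Y-%m-%d %H:%M:%S')"
END="$(date -u '+%Y-%m-%d %H:%M:%S')"

# URL-encode the space between date and time.
START_ENC="$(printf '%s' "$START" | sed 's/ /%20/g')"
END_ENC="$(printf '%s' "$END" | sed 's/ /%20/g')"

QUERY="start_time=${START_ENC}&end_time=${END_ENC}&limit=10"
echo "$QUERY"

# Then call the endpoint (fill in your token):
# curl "https://api.tinybird.co/v0/pipes/recent_orders.json?token=YOUR_TOKEN&${QUERY}"
```

On macOS, replace `date -u -d '24 hours ago'` with `date -u -v-24H`.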
You now have a functional API endpoint querying Kafka data in real-time. Subsequent sections will cover enrichment, materialized views, and more advanced endpoints.
Part 2: Enrich Data with Dimension Tables
This section details how to enhance your data pipeline by incorporating dimension tables for data enrichment.
Step 1: Set Up Dimension Tables for Data Enrichment
Dimension tables store reference data used to enrich order events. Create tables for products and customers.
- Products Dimension Table
```
SCHEMA >
    `product_id` Int32,
    `product_name` String,
    `category` LowCardinality(String),
    `brand` LowCardinality(String),
    `base_price` Decimal(10, 2),
    `created_at` DateTime

ENGINE MergeTree
ENGINE_SORTING_KEY product_id
```
- Customers Dimension Table
```
SCHEMA >
    `customer_id` String,
    `customer_name` String,
    `email` String,
    `country` LowCardinality(String),
    `customer_segment` LowCardinality(String),
    `registration_date` DateTime

ENGINE MergeTree
ENGINE_SORTING_KEY customer_id
```
- Load Dimension Data

Load data into these tables using the Events API, S3 URLs, or Copy Pipes. For automated ingestion from S3, use the S3 connector to automatically sync dimension data when files are updated in your bucket.

For relatively small dimension tables (up to a few million rows), using `MergeTree` with `replace` is simpler and ensures data consistency. When updating dimension data, use `replace` instead of `append` to atomically update the entire table:

```
# Replace all products from an S3 URL (atomic operation)
tb datasource replace products s3://my-bucket/dimension-data/products.csv

# Replace all customers from an S3 URL (atomic operation)
tb datasource replace customers s3://my-bucket/dimension-data/customers.csv
```
For larger dimension tables or when incremental updates are required, ReplacingMergeTree with append operations can be used. This allows deduplication during merges but necessitates using FINAL in JOINs to retrieve the latest version of each record. For most use cases with smaller dimension tables, MergeTree + replace offers a simpler and more direct approach.
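To try the replace flow before wiring up S3, you can generate a small products file locally. Whether your CLI version accepts a local file path in `tb datasource replace` is an assumption to verify against the docs:

```shell
# Write a minimal products file matching the dimension schema above.
cat > products.csv <<'CSV'
product_id,product_name,category,brand,base_price,created_at
42,Wireless Mouse,Electronics,Acme,24.99,2024-06-01 00:00:00
43,USB-C Hub,Electronics,Acme,39.99,2024-06-01 00:00:00
CSV

head -n 1 products.csv

# Atomically replace the dimension table from the local file:
# tb datasource replace products products.csv
```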
Step 2: Enrich Kafka Events with Dimension Data
Create a Materialized View to enrich order events with product and customer data during ingestion.
First, define the Data Source file (e.g., enriched_orders.datasource). Note that `product_id` is Int32 to match the orders and products schemas:

```
SCHEMA >
    `order_id` String,
    `customer_id` String,
    `product_id` Int32,
    `quantity` Int32,
    `price` Decimal(10, 2),
    `order_total` Decimal(10, 2),
    `timestamp` DateTime,
    `payment_method` LowCardinality(String),
    `shipping_address` String,
    `product_name` String,
    `product_category` LowCardinality(String),
    `product_brand` String,
    `product_base_price` Decimal(10, 2),
    `customer_name` String,
    `customer_country` LowCardinality(String),
    `customer_segment` LowCardinality(String),
    `discount_amount` Decimal(10, 2)

ENGINE MergeTree
ENGINE_PARTITION_KEY toYYYYMM(timestamp)
ENGINE_SORTING_KEY timestamp, order_id
```
Then, create the Materialized View pipe:

```
NODE enriched_orders
SQL >
    SELECT
        o.order_id,
        o.customer_id,
        o.product_id,
        o.quantity,
        o.price,
        o.order_total,
        o.timestamp,
        o.payment_method,
        o.shipping_address,
        -- Enrich with product data using JOIN
        p.product_name,
        p.category as product_category,
        p.brand as product_brand,
        p.base_price as product_base_price,
        -- Enrich with customer data
        c.customer_name,
        c.country as customer_country,
        c.customer_segment,
        -- Calculate derived fields
        o.order_total - (o.quantity * p.base_price) as discount_amount
    FROM orders_kafka o
    LEFT JOIN products p ON o.product_id = p.product_id
    LEFT JOIN customers c ON o.customer_id = c.customer_id

TYPE materialized
DATASOURCE enriched_orders
```
Since MergeTree with replace is used for dimension tables, FINAL in the JOINs is not required. The replace operation ensures tables always contain the most current data, simplifying and enhancing the performance of JOINs for smaller dimension tables (up to a few million rows). This Materialized View automatically processes new orders from Kafka.
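As a quick sanity check of the derived field: `discount_amount` is `order_total - (quantity * base_price)`. For the sample Kafka message from Part 1 (quantity 2, price 29.99, order_total 59.98) joined to a product with a base_price of 29.99, the discount works out to zero. A throwaway awk calculation confirms the arithmetic:

```shell
# Verify the discount_amount formula from the enrichment pipe:
# discount_amount = order_total - (quantity * base_price)
OUT=$(awk 'BEGIN {
  quantity = 2; order_total = 59.98; base_price = 29.99
  printf "discount_amount=%.2f", order_total - (quantity * base_price)
}')
echo "$OUT"
```

A positive `discount_amount` would instead indicate the order was charged above list price; negative values indicate a discount was applied.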
Deploy it:

```
tb deploy
```
Part 3: Add PostgreSQL Enrichment
Step 1: Sync PostgreSQL Data for Real-Time Enrichment
For data managed in PostgreSQL (e.g., a frequently updated product catalog), the PostgreSQL table function can be used to sync it into Tinybird.
- Set Up PostgreSQL Secrets

```
tb secret set pg_database ecommerce
tb secret set pg_username postgres
tb secret set pg_password your_password
```
Note: The PostgreSQL host and port are specified directly in the Copy Pipe as 'your-postgres-host.com:5432'. Update this placeholder with your actual PostgreSQL connection string.
- Create a Copy Pipe to Sync Product Catalog

```
NODE sync_catalog
SQL >
    SELECT
        product_id,
        product_name,
        category,
        brand,
        base_price,
        description,
        updated_at
    FROM postgresql(
        'your-postgres-host.com:5432',
        {{ tb_secret('pg_database') }},
        'product_catalog',
        {{ tb_secret('pg_username') }},
        {{ tb_secret('pg_password') }}
    )

TYPE COPY
TARGET_DATASOURCE product_catalog_synced
COPY_MODE replace
COPY_SCHEDULE */15 * * * *
```
This Copy Pipe performs the following actions:
* Reads data from your PostgreSQL product_catalog table.
* Atomically replaces the entire product_catalog_synced Data Source with the new data on each run.
* Runs automatically every 15 minutes.
* Uses COPY_MODE replace to ensure the synced table consistently matches the PostgreSQL source.
For dimension tables that are relatively small (up to a few million rows), using `replace` is recommended for its simplicity and data consistency.
- Use PostgreSQL Data in Enrichment
Update your enrichment Materialized View to incorporate data synced from PostgreSQL. First, update the datasource schema to include the product description:
```
SCHEMA >
    `order_id` String,
    `customer_id` String,
    `product_id` Int32,
    `quantity` Int32,
    `price` Decimal(10, 2),
    `order_total` Decimal(10, 2),
    `timestamp` DateTime,
    `payment_method` LowCardinality(String),
    `shipping_address` String,
    `product_name` String,
    `product_category` LowCardinality(String),
    `product_brand` String,
    `product_base_price` Decimal(10, 2),
    `product_description` String,
    `customer_name` String,
    `customer_country` LowCardinality(String),
    `customer_segment` LowCardinality(String),
    `discount_amount` Decimal(10, 2)

ENGINE MergeTree
ENGINE_PARTITION_KEY toYYYYMM(timestamp)
ENGINE_SORTING_KEY timestamp, order_id
```
Then, update the Materialized View pipe:

```
NODE enriched_orders
SQL >
    SELECT
        o.order_id,
        o.customer_id,
        o.product_id,
        o.quantity,
        o.price,
        o.order_total,
        o.timestamp,
        o.payment_method,
        o.shipping_address,
        -- Dimension table enrichment (fast, always available)
        p.product_name,
        p.category as product_category,
        p.brand as product_brand,
        p.base_price as product_base_price,
        -- PostgreSQL-synced data (updated every 15 minutes)
        pg.description as product_description,
        -- Customer enrichment
        c.customer_name,
        c.country as customer_country,
        c.customer_segment,
        -- Calculate derived fields
        o.order_total - (o.quantity * p.base_price) as discount_amount
    FROM orders_kafka o
    LEFT JOIN products p ON o.product_id = p.product_id
    LEFT JOIN customers c ON o.customer_id = c.customer_id
    LEFT JOIN product_catalog_synced pg ON o.product_id = pg.product_id

TYPE materialized
DATASOURCE enriched_orders
```

Consider the following when choosing between dimension tables and PostgreSQL:
* Dimension tables: Ideal for data that changes infrequently and requires sub-millisecond lookups.
* PostgreSQL sync: Best for data that is frequently updated and managed within your application database.
Part 4: Create Materialized Views for Fast Analytics Queries
This section focuses on creating Materialized Views to pre-aggregate metrics, enabling fast API queries.
- Revenue Metrics Materialized View
First, create the Data Source file (e.g., revenue_metrics.datasource):

```
SCHEMA >
    `hour` DateTime,
    `product_category` LowCardinality(String),
    `customer_country` LowCardinality(String),
    `order_count` UInt64,
    `total_revenue` Decimal(10, 2),
    `avg_order_value` Decimal(10, 2),
    `total_units_sold` UInt32

ENGINE SummingMergeTree
ENGINE_PARTITION_KEY toYYYYMM(hour)
ENGINE_SORTING_KEY hour, product_category, customer_country
```
Then, create the Materialized View pipe:

```
NODE revenue_by_hour
SQL >
    SELECT
        toStartOfHour(timestamp) as hour,
        product_category,
        customer_country,
        count() as order_count,
        sum(order_total) as total_revenue,
        avg(order_total) as avg_order_value,
        sum(quantity) as total_units_sold
    FROM enriched_orders
    GROUP BY hour, product_category, customer_country

TYPE materialized
DATASOURCE revenue_metrics
```

Note that SummingMergeTree sums all numeric columns during background merges, so `avg_order_value` is only approximate; for exact averages, store `total_revenue` and `order_count` and compute `total_revenue / order_count` at query time.
- Top Products Materialized View
First, create the Data Source file (e.g., top_products.datasource):

```
SCHEMA >
    `hour` DateTime,
    `product_id` Int32,
    `product_name` String,
    `product_category` String,
    `order_count` UInt64,
    `units_sold` UInt32,
    `revenue` Decimal(10, 2)

ENGINE SummingMergeTree
ENGINE_PARTITION_KEY toYYYYMM(hour)
ENGINE_SORTING_KEY hour, product_id
```
Then, create the Materialized View pipe:

```
NODE top_products_hourly
SQL >
    SELECT
        toStartOfHour(timestamp) as hour,
        product_id,
        product_name,
        product_category,
        count() as order_count,
        sum(quantity) as units_sold,
        sum(order_total) as revenue
    FROM enriched_orders
    GROUP BY hour, product_id, product_name, product_category

TYPE materialized
DATASOURCE top_products
```
- Customer Analytics Materialized View
First, create the Data Source file (e.g., customer_analytics.datasource):

```
SCHEMA >
    `customer_id` String,
    `customer_name` String,
    `customer_country` LowCardinality(String),
    `customer_segment` LowCardinality(String),
    `date` Date,
    `order_count` UInt64,
    `lifetime_value` Decimal(10, 2),
    `avg_order_value` Decimal(10, 2),
    `first_order_date` DateTime,
    `last_order_date` DateTime

ENGINE ReplacingMergeTree
ENGINE_PARTITION_KEY toYYYYMM(date)
ENGINE_SORTING_KEY customer_id, date
```
Then, create the Materialized View pipe:

```
NODE customer_metrics
SQL >
    SELECT
        customer_id,
        customer_name,
        customer_country,
        customer_segment,
        toStartOfDay(timestamp) as date,
        count() as order_count,
        sum(order_total) as lifetime_value,
        avg(order_total) as avg_order_value,
        min(timestamp) as first_order_date,
        max(timestamp) as last_order_date
    FROM enriched_orders
    GROUP BY customer_id, customer_name, customer_country, customer_segment, date

TYPE materialized
DATASOURCE customer_analytics
```

These Materialized Views update automatically as new orders arrive, keeping your metrics consistently current.
Part 5: Build Advanced Real-Time Analytics API Endpoints
Now, create advanced API endpoints that leverage Materialized Views for fast, aggregated queries. This section details the creation of three API endpoints to serve real-time metrics.
- Endpoint 1: Real-Time Revenue Metrics

Save this as api_revenue_metrics.pipe:

```
TOKEN "metrics_api_token" READ

DESCRIPTION >
    Real-time revenue metrics by time window, category and country

NODE filtered_metrics
SQL >
    %
    SELECT *
    FROM revenue_metrics
    WHERE hour >= toDateTime({{ String(start_time, '2025-01-27 00:00:00', description='Start time (YYYY-MM-DD HH:MM:SS)') }})
      AND hour <= toDateTime({{ String(end_time, '2025-01-27 23:59:59', description='End time (YYYY-MM-DD HH:MM:SS)') }})
      {% if defined(product_category) %}
      AND product_category = {{ String(product_category, description='Filter by product category') }}
      {% endif %}
      {% if defined(customer_country) %}
      AND customer_country = {{ String(customer_country, description='Filter by country') }}
      {% endif %}

NODE aggregated
SQL >
    SELECT
        product_category,
        customer_country,
        sum(order_count) as total_orders,
        sum(total_revenue) as total_revenue,
        avg(avg_order_value) as avg_order_value,
        sum(total_units_sold) as total_units_sold,
        min(hour) as period_start,
        max(hour) as period_end
    FROM filtered_metrics
    GROUP BY product_category, customer_country
    ORDER BY total_revenue DESC
```

This endpoint provides aggregated revenue metrics, filtered by time range, product category, and customer country, including totals for orders, revenue, average order value, and units sold. Note that the grouping columns are included in the SELECT so they appear in the response.
- Endpoint 2: Top Products

```
TOKEN "metrics_api_token" READ

DESCRIPTION >
    Top products by sales, revenue, or units sold

NODE filtered_products
SQL >
    %
    SELECT *
    FROM top_products
    WHERE hour >= toDateTime({{ String(start_time, '2025-01-27 00:00:00', description='Start time') }})
      AND hour <= toDateTime({{ String(end_time, '2025-01-27 23:59:59', description='End time') }})
      {% if defined(category) %}
      AND product_category = {{ String(category, description='Filter by category') }}
      {% endif %}

NODE ranked_products
SQL >
    %
    SELECT
        product_id,
        product_name,
        product_category,
        sum(order_count) as total_orders,
        sum(units_sold) as total_units_sold,
        sum(revenue) as total_revenue
    FROM filtered_products
    GROUP BY product_id, product_name, product_category
    ORDER BY
        {% if defined(sort_by) and sort_by == 'revenue' %} total_revenue
        {% elif defined(sort_by) and sort_by == 'units' %} total_units_sold
        {% else %} total_orders
        {% endif %} DESC
    LIMIT {{ Int32(limit, 10, description='Number of products to return') }}
```

This endpoint returns the top-selling products, ranked by orders, revenue, or units sold, with optional filtering by time range and product category.
- Endpoint 3: Customer Analytics

```
TOKEN "metrics_api_token" READ

DESCRIPTION >
    Customer analytics including lifetime value and purchase patterns

NODE customer_stats
SQL >
    %
    SELECT
        customer_id,
        customer_name,
        customer_country,
        customer_segment,
        sum(order_count) as total_orders,
        sum(lifetime_value) as lifetime_value,
        avg(avg_order_value) as avg_order_value,
        min(first_order_date) as first_order_date,
        max(last_order_date) as last_order_date,
        dateDiff('day', min(first_order_date), max(last_order_date)) as customer_lifetime_days
    FROM customer_analytics
    WHERE date >= toDate({{ String(start_date, '2025-01-01', description='Start date (YYYY-MM-DD)') }})
      AND date <= toDate({{ String(end_date, '2025-01-27', description='End date (YYYY-MM-DD)') }})
      {% if defined(customer_id) %}
      AND customer_id = {{ String(customer_id, description='Filter by customer ID') }}
      {% endif %}
      {% if defined(country) %}
      AND customer_country = {{ String(country, description='Filter by country') }}
      {% endif %}
    GROUP BY customer_id, customer_name, customer_country, customer_segment
    ORDER BY lifetime_value DESC
    LIMIT {{ Int32(limit, 100, description='Number of customers to return') }}
```

This endpoint provides comprehensive customer analytics, including lifetime value, order counts, average order value, and customer lifetime metrics, with optional filtering by customer ID, country, and date range.
Part 6: Deploy and Test Your Analytics API
- Deploy to Cloud

Deploy all resources to Tinybird Cloud:

```
# Deploy everything to cloud
tb --cloud deploy
```

This command will:
* Create all Data Sources in your cloud workspace.
* Deploy all Pipes and Materialized Views.
* Set up the Kafka connector to begin consumption.
* Make your API endpoints accessible.

Verify deployment:

```
# Open the Tinybird dashboard to view your resources
tb open

# Or run SQL queries directly from the CLI
tb --cloud sql "SELECT count(*) FROM orders_kafka"
```
- Create API Token

The endpoints specify a token named `metrics_api_token` via `TOKEN "metrics_api_token" READ` in each pipe file. You can create JWT tokens via the CLI or generate them programmatically within your application. For detailed instructions on JWT token creation, including CLI commands and programmatic examples, refer to the JWT tokens documentation. The token name in the pipe files must exactly match the token name you create. JWT tokens have a TTL (time to live) and require a `--resource` parameter pointing to the specific pipe name.
- Test the Simple Endpoint

First, re-run the curl request against the recent_orders endpoint from Part 1 to confirm it still returns data after the full deployment.
- Test the Revenue Metrics Endpoint

Replace YOUR_TOKEN with your actual token:

```
curl "https://api.tinybird.co/v0/pipes/api_revenue_metrics.json?token=YOUR_TOKEN&start_time=2025-01-27%2000:00:00&end_time=2025-01-27%2023:59:59"
```
Expected response structure:

```json
{
  "meta": [
    {"name": "total_orders", "type": "UInt64"},
    {"name": "total_revenue", "type": "Decimal(10, 2)"},
    ...
  ],
  "data": [
    {
      "total_orders": 1523,
      "total_revenue": 45678.90,
      "avg_order_value": 29.98,
      "total_units_sold": 3046,
      "period_start": "2025-01-27 00:00:00",
      "period_end": "2025-01-27 23:00:00",
      "product_category": "Electronics",
      "customer_country": "US"
    }
  ],
  "rows": 1,
  "statistics": {
    "elapsed": 0.012,
    "rows_read": 1234,
    "bytes_read": 56789
  }
}
```
The `data` array is what your application will typically consume:

```json
{
  "data": [
    {
      "total_orders": 1523,
      "total_revenue": 45678.90,
      "avg_order_value": 29.98,
      "total_units_sold": 3046,
      "period_start": "2025-01-27 00:00:00",
      "period_end": "2025-01-27 23:00:00",
      "product_category": "Electronics",
      "customer_country": "US"
    }
  ]
}
```
Troubleshooting:
* If you receive 401 Unauthorized, verify your token is correct.
* If 404 Not Found occurs, ensure the pipe name is an exact match.
* If results are empty, check if data has been ingested and Materialized Views have processed it.
- Test the Top Products Endpoint
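A sketch of a request against the top products endpoint. The pipe name `api_top_products` is an assumption (it must match your .pipe filename), and `YOUR_TOKEN` is a placeholder; the script prints the full URL so you can inspect the parameters, then uncomment the curl call to run it:

```shell
# Assumed pipe name: api_top_products (must match your .pipe filename).
URL="https://api.tinybird.co/v0/pipes/api_top_products.json"
QUERY="token=YOUR_TOKEN&start_time=2025-01-27%2000:00:00&end_time=2025-01-27%2023:59:59&sort_by=revenue&limit=5"
echo "${URL}?${QUERY}"

# curl "${URL}?${QUERY}"
```

Try `sort_by=units` or omit `sort_by` to rank by order count instead.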
- Test the Customer Analytics Endpoint
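Likewise for customer analytics; the pipe name `api_customer_analytics` and the token are assumptions. This endpoint takes date parameters (`start_date`/`end_date`) rather than timestamps, plus optional `customer_id` and `country` filters:

```shell
# Assumed pipe name: api_customer_analytics (must match your .pipe filename).
URL="https://api.tinybird.co/v0/pipes/api_customer_analytics.json"
QUERY="token=YOUR_TOKEN&start_date=2025-01-01&end_date=2025-01-27&country=US&limit=20"
echo "${URL}?${QUERY}"

# curl "${URL}?${QUERY}"
```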
- Real-Time Updates

As new orders arrive in Kafka, they are automatically:
- Ingested into orders_kafka.
- Enriched with product and customer data.
- Aggregated in Materialized Views.
- Made available through your API endpoints within seconds.

You can verify real-time updates by:
- Sending a test order to your Kafka topic.
- Waiting a few seconds.
- Querying your endpoints again; the new order should be reflected in the metrics.
- Performance Characteristics

This architecture delivers:
- Sub-100ms API latency: Pre-aggregated Materialized Views provide query responses in milliseconds.
- Real-time freshness: Data appears in APIs within seconds of Kafka ingestion.
- High throughput: Capable of handling thousands of events per second.
- Scalability: Automatically scales with your data volume.
Next Steps
You have successfully built a complete real-time analytics API. Future considerations include:
- Exporting data back to Kafka using Sinks.
- Connecting to BI tools like Tableau, Power BI, and Grafana.
- Monitoring and optimization using `kafka_ops_log` and performance tuning.
- Common patterns and extensions for scaling your pipeline.
For immediate next steps:
- Add more endpoints: Create endpoints for specific business metrics.
- Set up monitoring: Use `tinybird.kafka_ops_log` to monitor consumer lag and errors.
- Add alerting: Create endpoints that trigger alerts when metrics exceed thresholds.
- Optimize further: Add more Materialized Views for common query patterns.
Related Documentation
- Kafka Connector Guide - Complete Kafka setup documentation.
- PostgreSQL Table Function - Sync data from PostgreSQL.
- Materialized Views - Learn more about Materialized Views.
- API Endpoints - Endpoint configuration and optimization.
Conclusion
In approximately 15 minutes, you've constructed a production-ready real-time analytics API that:
- Automatically ingests data from Kafka.
- Enriches events with dimension tables and PostgreSQL data.
- Pre-aggregates metrics for rapid querying.
- Serves data through multiple API endpoints.
- Updates in real time as new events arrive.
All this is achieved with no application code and no infrastructure management, relying solely on SQL and configuration. This demonstrates the power of Tinybird for real-time analytics.
Ready to build your own? Sign up for free and start building. Join the Slack community for support.