Build a Real-Time E-Commerce Analytics API from Kafka in 15 Minutes

data engineering

This guide details building a real-time e-commerce analytics API from Kafka streams using Tinybird. Learn data ingestion, enrichment with dimension tables and PostgreSQL, materialized views for fast aggregations, and exposing API endpoints for comprehensive, real-time insights.


What You'll Build

By the end of this tutorial, you will have developed:

  • A serverless Kafka connector for real-time ingestion of order events.
  • A basic API endpoint to query raw Kafka data.
  • Enriched data using dimension tables for products and customers.
  • PostgreSQL enrichment for product catalog data.
  • Materialized Views for pre-aggregated metrics.
  • Advanced API endpoints for real-time analytics.

Architecture Overview

The data pipeline is designed to be straightforward yet powerful. Events from Kafka are immediately queryable via API endpoints. The pipeline is then enhanced by enriching events with reference data from dimension tables and PostgreSQL. Subsequently, metrics are pre-aggregated in Materialized Views for rapid querying, and finally, served through low-latency API endpoints.

Prerequisites

Before you begin, ensure you have:

  • A Tinybird account (sign up free).
  • The Tinybird CLI installed (installation guide).
  • Access to a Kafka cluster (Confluent Cloud, AWS MSK, or self-hosted).
  • A PostgreSQL database (optional, for the enrichment example).
  • Basic familiarity with SQL.

Part 1: Kafka to API - Get Started in 5 Minutes

This section focuses on connecting to Kafka and creating a basic API endpoint to query raw data efficiently.

Step 1: Connect Kafka to Your Analytics Pipeline

First, establish a Kafka connection. This can be done via the CLI or by manually creating a connection file.

  • Option 1: Using the CLI (Recommended)

Run the interactive command:

```bash
tb connection create kafka
```

You will be prompted to provide:

    • Connection name (e.g., kafka_ecommerce)
    • Bootstrap server (e.g., pkc-xxxxx.us-east-1.aws.confluent.cloud:9092)
    • Kafka key (API key for Confluent Cloud, or username)
    • Kafka secret (API secret for Confluent Cloud, or password)

The CLI will generate a .connection file for you.

Throughout this guide, `tb --cloud` is used to target Tinybird Cloud, and `tb` (without the flag) to target your local Tinybird environment: `tb --cloud deploy` deploys to production, while `tb deploy` deploys locally for testing. The same pattern applies to other commands such as `secret`, `append`, and `connection`.
  • Option 2: Manual Connection File

Alternatively, create a connection file manually:

```
TYPE kafka
KAFKA_BOOTSTRAP_SERVERS pkc-xxxxx.us-east-1.aws.confluent.cloud:9092
KAFKA_SECURITY_PROTOCOL SASL_SSL
KAFKA_SASL_MECHANISM PLAIN
KAFKA_KEY {{ tb_secret("KAFKA_KEY") }}
KAFKA_SECRET {{ tb_secret("KAFKA_SECRET") }}
```

Set your secrets:

```bash
tb secret set KAFKA_KEY your_kafka_key
tb secret set KAFKA_SECRET your_kafka_secret
```

  • Validate the Connection

Kafka connections involve many settings (bootstrap servers, security protocols, SASL mechanisms, SSL configuration), and client libraries can have opaque error handling, so previewing your data before proceeding is crucial.

Test your connection:

```bash
tb connection data kafka_ecommerce
```

This command validates connectivity and authentication, and previews messages from your topics. You should see output similar to:

```
✓ Connection validated successfully
✓ Found 3 topics
  • orders (12 partitions)
  • payments (8 partitions)
  • shipping (6 partitions)
✓ Sample message from 'orders' topic:
  {"order_id": "ord_12345", "customer_id": "cust_67890", ...}
```

If errors occur, verify:

    • The bootstrap server address is correct and reachable.
    • Credentials are properly set (check with `tb secret list`).
    • Network connectivity (firewall rules, security groups).
    • The Kafka cluster is running and accessible.

For detailed troubleshooting, refer to the Kafka connector troubleshooting guide.
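Before digging into SASL or SSL settings, it often helps to rule out plain network reachability first. The sketch below (Python, standard library only; the hostname is the placeholder from this guide, not a real broker) checks whether a TCP connection to the bootstrap server can be opened at all:

```python
import socket

def can_reach(host: str, port: int, timeout: float = 5.0) -> bool:
    """Return True if a plain TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Example (placeholder broker from this guide):
# can_reach("pkc-xxxxx.us-east-1.aws.confluent.cloud", 9092)
```

If this returns False, the problem is firewall rules, security groups, or DNS, not your Kafka credentials.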

Step 2: Create Kafka Data Source for Real-Time Ingestion

Next, create a Data Source that consumes from your Kafka topic.

  • Using the CLI

```bash
tb datasource create --kafka
```

This interactive command will guide you to:

    • Select your Kafka connection.
    • List available topics.
    • Preview message structure.
    • Automatically generate a schema.

  • Manual Data Source Creation

Alternatively, create the Data Source file manually (this guide names the Data Source orders_kafka). Here's an example for order events:

```
SCHEMA >
    order_id String `json:$.order_id`,
    customer_id String `json:$.customer_id`,
    product_id Int32 `json:$.product_id`,
    quantity Int32 `json:$.quantity`,
    price Decimal(10, 2) `json:$.price`,
    order_total Decimal(10, 2) `json:$.order_total`,
    timestamp DateTime `json:$.timestamp`,
    payment_method LowCardinality(String) `json:$.payment_method`,
    shipping_address String `json:$.shipping_address`

ENGINE MergeTree
ENGINE_PARTITION_KEY toYYYYMM(timestamp)
ENGINE_SORTING_KEY timestamp, order_id

KAFKA_CONNECTION_NAME kafka_ecommerce
KAFKA_TOPIC orders
KAFKA_GROUP_ID {{ tb_secret("KAFKA_GROUP_ID", "ecommerce_consumer") }}
KAFKA_AUTO_OFFSET_RESET latest
```

The KAFKA_GROUP_ID uniquely identifies your consumer group. Each combination of KAFKA_TOPIC and KAFKA_GROUP_ID can only be used in one Data Source, which ensures correct offset tracking and prevents conflicts.

**Best practices:**

    • Use unique group IDs per Data Source when consuming from the same topic.
    • Use environment-specific group IDs to isolate consumers and their committed offsets:
        • Local: `"ecommerce_consumer_local"` (or the default `"ecommerce_consumer"`)
        • Staging: `"ecommerce_consumer_staging"`
        • Production: `"ecommerce_consumer_prod"`

You can manage group IDs using secrets with defaults:

```bash
# Set different group IDs for each environment
tb secret set KAFKA_GROUP_ID ecommerce_consumer_local                                      # Local
tb --cloud --host <STAGING_HOST> secret set KAFKA_GROUP_ID ecommerce_consumer_staging      # Staging
tb --cloud --host <PROD_HOST> secret set KAFKA_GROUP_ID ecommerce_consumer_prod            # Production
```

The {{ tb_secret("KAFKA_GROUP_ID", "ecommerce_consumer") }} syntax uses the secret value if set, or falls back to the default "ecommerce_consumer". This lets you use different group IDs across environments without modifying Data Source files. For more details on managing consumer groups, refer to the CI/CD and version control guide.

**Schema optimization notes:**

    • **Decimal types**: `Decimal(10, 2)` is used for `price` and `order_total` to ensure precise financial calculations, avoiding floating-point rounding errors.
    • **LowCardinality**: `payment_method` uses `LowCardinality(String)` because it typically has a limited number of unique values (e.g., credit_card, debit_card, paypal), which optimizes storage and query performance.
    • **Sorting key**: The sorting key `(timestamp, order_id)` is optimized for time series queries. Columns frequently used for filtering, such as `timestamp` for time range queries and `order_id` for specific order lookups, should always be included in the sorting key.
    • **Partitioning**: Monthly partitioning by `toYYYYMM(timestamp)` keeps partitions manageable and enables efficient data management.

**Sample Kafka Message**

Your Kafka messages should conform to this structure:

```json
{
  "order_id": "ord_12345",
  "customer_id": "cust_67890",
  "product_id": 42,
  "quantity": 2,
  "price": 29.99,
  "order_total": 59.98,
  "timestamp": "2025-01-27T10:30:00Z",
  "payment_method": "credit_card",
  "shipping_address": "123 Main St, New York, NY 10001"
}
```
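The Decimal choice above matters for money: binary floats cannot represent most decimal fractions exactly, so sums drift by tiny amounts that are unacceptable in financial totals. A quick Python illustration:

```python
from decimal import Decimal

# Binary floats cannot represent 0.1 or 0.2 exactly, so the sum is off:
print(0.1 + 0.2 == 0.3)                                   # False
print(Decimal("0.1") + Decimal("0.2") == Decimal("0.3"))  # True

# Money math with Decimal stays exact:
price = Decimal("29.99")
print(price * 2)  # 59.98
```

The same reasoning applies inside ClickHouse: `Decimal(10, 2)` columns keep order totals exact, while `Float64` would accumulate rounding error across aggregations.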

  • Verify Data Ingestion

Before deployment, test locally:

```bash
# Deploy to the local environment (Tinybird Local)
tb deploy

# Check if data is flowing
tb sql "SELECT count(*) FROM orders_kafka"
```

Once verified locally, deploy to the cloud:

```bash
# Deploy to Tinybird Cloud
tb --cloud deploy
```

Remember: tb deploy targets your local Tinybird environment, while tb --cloud deploy targets Tinybird Cloud. Use local deployment for testing and cloud deployment for production.

After deployment, confirm ingestion:

```sql
-- Check recent orders
SELECT
    count(*) as total_orders,
    min(timestamp) as first_order,
    max(timestamp) as last_order
FROM orders_kafka
WHERE timestamp > now() - INTERVAL 1 hour
```

Data will flow into Tinybird as soon as messages arrive in your Kafka topic. Monitor ingestion in the Tinybird UI or query the data directly. For comprehensive monitoring of your Kafka connector (consumer lag, throughput, errors), consult the Kafka monitoring guide. You can also query the tinybird.kafka_ops_log Service Data Source for real-time diagnostics.
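If you don't have a producer wired up yet, you can publish test events matching the schema above. The sketch below builds a conforming event with the standard library; the `send_test_order` helper is a hypothetical convenience wrapper around the third-party confluent-kafka client (authentication config for Confluent Cloud is omitted for brevity):

```python
import json
from datetime import datetime, timezone

def make_order_event(order_id: str, customer_id: str, product_id: int,
                     quantity: int, price: float) -> bytes:
    """Build a JSON-encoded order event matching the orders_kafka schema."""
    event = {
        "order_id": order_id,
        "customer_id": customer_id,
        "product_id": product_id,
        "quantity": quantity,
        "price": price,
        "order_total": round(quantity * price, 2),
        "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "payment_method": "credit_card",
        "shipping_address": "123 Main St, New York, NY 10001",
    }
    return json.dumps(event).encode()

def send_test_order(bootstrap_servers: str) -> None:
    """Publish one test event to the 'orders' topic (requires confluent-kafka;
    SASL credentials for Confluent Cloud would go in the config dict)."""
    from confluent_kafka import Producer  # pip install confluent-kafka
    producer = Producer({"bootstrap.servers": bootstrap_servers})
    producer.produce("orders",
                     value=make_order_event("ord_test_1", "cust_67890", 42, 2, 29.99))
    producer.flush()
```

After sending a few events, re-run the count query above to confirm they arrive.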

Step 3: Create Your First API Endpoint

Now, create a simple API endpoint to query your raw Kafka data, demonstrating the ease of exposing Kafka data as an API:

```
TOKEN "metrics_api_token" READ

DESCRIPTION >
    Simple endpoint to query recent orders from Kafka

NODE recent_orders
SQL >
    %
    SELECT
        order_id,
        customer_id,
        product_id,
        quantity,
        price,
        order_total,
        timestamp,
        payment_method
    FROM orders_kafka
    WHERE timestamp >= toDateTime({{ String(start_time, '2025-01-27 00:00:00', description='Start time (YYYY-MM-DD HH:MM:SS)') }})
      AND timestamp <= toDateTime({{ String(end_time, '2025-01-27 23:59:59', description='End time (YYYY-MM-DD HH:MM:SS)') }})
    ORDER BY timestamp DESC
    LIMIT {{ Int32(limit, 100, description='Maximum number of results') }}
```

Deploy and test your endpoint:

```bash
# Deploy to cloud
tb --cloud deploy

# Test the endpoint
curl "https://api.tinybird.co/v0/pipes/recent_orders.json?token=YOUR_TOKEN&start_time=2025-01-27%2000:00:00&end_time=2025-01-27%2023:59:59&limit=10"
```
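Note that the spaces in the timestamp parameters must be URL-encoded (`%20` in the curl call). If you build these URLs programmatically, a small stdlib helper avoids encoding mistakes (the token and parameter values here are the placeholders from this guide):

```python
from urllib.parse import urlencode, quote

def endpoint_url(pipe: str, token: str,
                 host: str = "https://api.tinybird.co", **params) -> str:
    """Build a Tinybird pipe endpoint URL, percent-encoding spaces as %20."""
    query = urlencode({"token": token, **params}, quote_via=quote)
    return f"{host}/v0/pipes/{pipe}.json?{query}"

url = endpoint_url(
    "recent_orders", "YOUR_TOKEN",
    start_time="2025-01-27 00:00:00",
    end_time="2025-01-27 23:59:59",
    limit=10,
)
print(url)
```

Using `quote_via=quote` encodes spaces as `%20` (and colons as `%3A`, which is equivalent for query strings) instead of the `+` form that `urlencode` produces by default.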

You now have a functional API endpoint querying Kafka data in real-time. Subsequent sections will cover enrichment, materialized views, and more advanced endpoints.


Part 2: Enrich Data with Dimension Tables

This section details how to enhance your data pipeline by incorporating dimension tables for data enrichment.

Step 1: Set Up Dimension Tables for Data Enrichment

Dimension tables store reference data used to enrich order events. Create tables for products and customers.

  • Products Dimension Table

```
SCHEMA >
    product_id Int32,
    product_name String,
    category LowCardinality(String),
    brand LowCardinality(String),
    base_price Decimal(10, 2),
    created_at DateTime

ENGINE MergeTree
ENGINE_SORTING_KEY product_id
```

  • Customers Dimension Table

```
SCHEMA >
    customer_id String,
    customer_name String,
    email String,
    country LowCardinality(String),
    customer_segment LowCardinality(String),
    registration_date DateTime

ENGINE MergeTree
ENGINE_SORTING_KEY customer_id
```

  • Load Dimension Data Load data into these tables using the Events API, S3 URLs, or Copy Pipes. For automated ingestion from S3, use the S3 connector to automatically sync dimension data when files are updated in your bucket.

    For relatively small dimension tables (up to a few million rows), using MergeTree with replace is simpler and ensures data consistency. When updating dimension data, use replace instead of append to atomically update the entire table:

```bash
# Replace all products from S3 URL (atomic operation)
tb datasource replace products s3://my-bucket/dimension-data/products.csv

# Replace all customers from S3 URL (atomic operation)
tb datasource replace customers s3://my-bucket/dimension-data/customers.csv
```

For larger dimension tables, or when incremental updates are required, ReplacingMergeTree with append operations can be used. This allows deduplication during merges but requires using FINAL in JOINs to retrieve the latest version of each record. For most use cases with smaller dimension tables, MergeTree + replace is the simpler, more direct approach.

Step 2: Enrich Kafka Events with Dimension Data

Create a Materialized View to enrich order events with product and customer data during ingestion.

First, define the datasource file:

```
SCHEMA >
    `order_id` String,
    `customer_id` String,
    `product_id` Int32,
    `quantity` Int32,
    `price` Decimal(10, 2),
    `order_total` Decimal(10, 2),
    `timestamp` DateTime,
    `payment_method` LowCardinality(String),
    `shipping_address` String,
    `product_name` String,
    `product_category` LowCardinality(String),
    `product_brand` String,
    `product_base_price` Decimal(10, 2),
    `customer_name` String,
    `customer_country` LowCardinality(String),
    `customer_segment` LowCardinality(String),
    `discount_amount` Decimal(10, 2)

ENGINE MergeTree
ENGINE_PARTITION_KEY toYYYYMM(timestamp)
ENGINE_SORTING_KEY timestamp, order_id
```

Then, create the materialized view pipe:

```
NODE enriched_orders
SQL >
    SELECT
        o.order_id,
        o.customer_id,
        o.product_id,
        o.quantity,
        o.price,
        o.order_total,
        o.timestamp,
        o.payment_method,
        o.shipping_address,
        -- Enrich with product data using JOIN
        p.product_name,
        p.category as product_category,
        p.brand as product_brand,
        p.base_price as product_base_price,
        -- Enrich with customer data
        c.customer_name,
        c.country as customer_country,
        c.customer_segment,
        -- Calculate derived fields
        o.order_total - (o.quantity * p.base_price) as discount_amount
    FROM orders_kafka o
    LEFT JOIN products p ON o.product_id = p.product_id
    LEFT JOIN customers c ON o.customer_id = c.customer_id

TYPE materialized
DATASOURCE enriched_orders
```

Since MergeTree with replace is used for dimension tables, FINAL in the JOINs is not required. The replace operation ensures tables always contain the most current data, simplifying and enhancing the performance of JOINs for smaller dimension tables (up to a few million rows). This Materialized View automatically processes new orders from Kafka.
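Conceptually, each LEFT JOIN above is a keyed lookup plus a derived field. The toy Python equivalent below (the product, customer, and price values are invented for illustration) makes the `discount_amount` formula concrete:

```python
from decimal import Decimal

# Hypothetical dimension lookups, keyed the same way as the JOINs above.
products = {42: {"product_name": "Wireless Mouse", "base_price": Decimal("34.99")}}
customers = {"cust_67890": {"customer_name": "Jane Doe", "country": "US"}}

def enrich(order: dict) -> dict:
    # LEFT JOIN semantics: a missing key yields empty defaults, not a dropped row.
    p = products.get(order["product_id"], {})
    c = customers.get(order["customer_id"], {})
    return {
        **order,
        "product_name": p.get("product_name", ""),
        "customer_country": c.get("country", ""),
        # discount = amount actually charged minus quantity * list price
        "discount_amount": order["order_total"]
                           - order["quantity"] * p.get("base_price", Decimal("0")),
    }

order = {"order_id": "ord_12345", "customer_id": "cust_67890", "product_id": 42,
         "quantity": 2, "order_total": Decimal("59.98")}
print(enrich(order)["discount_amount"])  # -10.00 (charged 10.00 below list price)
```

A negative `discount_amount` under this formula means the order was charged below list price; zero means full price.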

Deploy it:

```bash
tb deploy
```

Part 3: Add PostgreSQL Enrichment

Step 1: Sync PostgreSQL Data for Real-Time Enrichment

For data managed in PostgreSQL (e.g., a frequently updated product catalog), the PostgreSQL table function can be used to sync it into Tinybird.

  • Set Up PostgreSQL Secrets

```bash
tb secret set pg_database ecommerce
tb secret set pg_username postgres
tb secret set pg_password your_password
```

Note: The PostgreSQL host and port are specified directly in the Copy Pipe as 'your-postgres-host.com:5432'. Replace this placeholder with your actual PostgreSQL host and port.

  • Create a Copy Pipe to Sync Product Catalog

```
NODE sync_catalog
SQL >
    SELECT
        product_id,
        product_name,
        category,
        brand,
        base_price,
        description,
        updated_at
    FROM postgresql(
        'your-postgres-host.com:5432',
        {{ tb_secret('pg_database') }},
        'product_catalog',
        {{ tb_secret('pg_username') }},
        {{ tb_secret('pg_password') }}
    )

TYPE COPY
TARGET_DATASOURCE product_catalog_synced
COPY_MODE replace
COPY_SCHEDULE "*/15 * * * *"
```

This Copy Pipe performs the following actions:

    • Reads data from your PostgreSQL product_catalog table.
    • Atomically replaces the entire product_catalog_synced Data Source with the new data on each run.
    • Runs automatically every 15 minutes (the "*/15 * * * *" cron schedule).
    • Uses COPY_MODE replace to ensure the synced table consistently matches the PostgreSQL source.

For dimension tables that are relatively small (up to a few million rows), using `replace` is recommended for its simplicity and data consistency.
  • Use PostgreSQL Data in Enrichment Update your enrichment Materialized View to incorporate data synced from PostgreSQL. First, update the datasource schema to include the product description:

```
SCHEMA >
    order_id String,
    customer_id String,
    product_id Int32,
    quantity Int32,
    price Decimal(10, 2),
    order_total Decimal(10, 2),
    timestamp DateTime,
    payment_method LowCardinality(String),
    shipping_address String,
    product_name String,
    product_category LowCardinality(String),
    product_brand String,
    product_base_price Decimal(10, 2),
    product_description String,
    customer_name String,
    customer_country LowCardinality(String),
    customer_segment LowCardinality(String),
    discount_amount Decimal(10, 2)

ENGINE MergeTree
ENGINE_PARTITION_KEY toYYYYMM(timestamp)
ENGINE_SORTING_KEY timestamp, order_id
```

Then, update the materialized view pipe:

```
NODE enriched_orders
SQL >
    SELECT
        o.order_id,
        o.customer_id,
        o.product_id,
        o.quantity,
        o.price,
        o.order_total,
        o.timestamp,
        o.payment_method,
        o.shipping_address,
        -- Dimension table enrichment (fast, always available)
        p.product_name,
        p.category as product_category,
        p.brand as product_brand,
        p.base_price as product_base_price,
        -- PostgreSQL-synced data (updated every 15 minutes)
        pg.description as product_description,
        -- Customer enrichment
        c.customer_name,
        c.country as customer_country,
        c.customer_segment,
        -- Calculate derived fields
        o.order_total - (o.quantity * p.base_price) as discount_amount
    FROM orders_kafka o
    LEFT JOIN products p ON o.product_id = p.product_id
    LEFT JOIN customers c ON o.customer_id = c.customer_id
    LEFT JOIN product_catalog_synced pg ON o.product_id = pg.product_id

TYPE materialized
DATASOURCE enriched_orders
```

When choosing between dimension tables and PostgreSQL sync:

    • Dimension tables: ideal for data that changes infrequently and requires sub-millisecond lookups.
    • PostgreSQL sync: best for data that is frequently updated and managed within your application database.


Part 4: Create Materialized Views for Fast Analytics Queries

This section focuses on creating Materialized Views to pre-aggregate metrics, enabling fast API queries.

  • Revenue Metrics Materialized View First, create the datasource file:

```
SCHEMA >
    hour DateTime,
    product_category LowCardinality(String),
    customer_country LowCardinality(String),
    order_count UInt64,
    total_revenue Decimal(10, 2),
    avg_order_value Decimal(10, 2),
    total_units_sold UInt32

ENGINE SummingMergeTree
ENGINE_PARTITION_KEY toYYYYMM(hour)
ENGINE_SORTING_KEY hour, product_category, customer_country
```

Then, create the materialized view pipe:

```
NODE revenue_by_hour
SQL >
    SELECT
        toStartOfHour(timestamp) as hour,
        product_category,
        customer_country,
        count() as order_count,
        sum(order_total) as total_revenue,
        avg(order_total) as avg_order_value,
        sum(quantity) as total_units_sold
    FROM enriched_orders
    GROUP BY hour, product_category, customer_country

TYPE materialized
DATASOURCE revenue_metrics
```

  • Top Products Materialized View First, create the datasource file:

```
SCHEMA >
    hour DateTime,
    product_id String,
    product_name String,
    product_category String,
    order_count UInt64,
    units_sold UInt32,
    revenue Decimal(10, 2)

ENGINE SummingMergeTree
ENGINE_PARTITION_KEY toYYYYMM(hour)
ENGINE_SORTING_KEY hour, product_id
```

Then, create the materialized view pipe:

```
NODE top_products_hourly
SQL >
    SELECT
        toStartOfHour(timestamp) as hour,
        product_id,
        product_name,
        product_category,
        count() as order_count,
        sum(quantity) as units_sold,
        sum(order_total) as revenue
    FROM enriched_orders
    GROUP BY hour, product_id, product_name, product_category

TYPE materialized
DATASOURCE top_products
```

  • Customer Analytics Materialized View First, create the datasource file:

```
SCHEMA >
    customer_id String,
    customer_name String,
    customer_country LowCardinality(String),
    customer_segment LowCardinality(String),
    date Date,
    order_count UInt64,
    lifetime_value Decimal(10, 2),
    avg_order_value Decimal(10, 2),
    first_order_date DateTime,
    last_order_date DateTime

ENGINE ReplacingMergeTree
ENGINE_PARTITION_KEY toYYYYMM(date)
ENGINE_SORTING_KEY customer_id, date
```

Then, create the materialized view pipe:

```
NODE customer_metrics
SQL >
    SELECT
        customer_id,
        customer_name,
        customer_country,
        customer_segment,
        toStartOfDay(timestamp) as date,
        count() as order_count,
        sum(order_total) as lifetime_value,
        avg(order_total) as avg_order_value,
        min(timestamp) as first_order_date,
        max(timestamp) as last_order_date
    FROM enriched_orders
    GROUP BY customer_id, customer_name, customer_country, customer_segment, date

TYPE materialized
DATASOURCE customer_analytics
```

These Materialized Views update automatically as new orders arrive, keeping your metrics consistently current.


Part 5: Build Advanced Real-Time Analytics API Endpoints

Now, create advanced API endpoints that leverage Materialized Views for fast, aggregated queries. This section details the creation of three API endpoints to serve real-time metrics.

  • Endpoint 1: Real-Time Revenue Metrics

```
TOKEN "metrics_api_token" READ

DESCRIPTION >
    Real-time revenue metrics by time window, category and country

NODE filtered_metrics
SQL >
    %
    SELECT *
    FROM revenue_metrics
    WHERE hour >= toDateTime({{ String(start_time, '2025-01-27 00:00:00', description='Start time (YYYY-MM-DD HH:MM:SS)') }})
      AND hour <= toDateTime({{ String(end_time, '2025-01-27 23:59:59', description='End time (YYYY-MM-DD HH:MM:SS)') }})
    {% if defined(product_category) %}
      AND product_category = {{ String(product_category, description='Filter by product category') }}
    {% endif %}
    {% if defined(customer_country) %}
      AND customer_country = {{ String(customer_country, description='Filter by country') }}
    {% endif %}

NODE aggregated
SQL >
    SELECT
        product_category,
        customer_country,
        sum(order_count) as total_orders,
        sum(total_revenue) as total_revenue,
        avg(avg_order_value) as avg_order_value,
        sum(total_units_sold) as total_units_sold,
        min(hour) as period_start,
        max(hour) as period_end
    FROM filtered_metrics
    GROUP BY product_category, customer_country
    ORDER BY total_revenue DESC
```

This endpoint provides aggregated revenue metrics, filtered by time range, product category, and customer country, including totals for orders, revenue, average order value, and units sold.

  • Endpoint 2: Top Products

```
TOKEN "metrics_api_token" READ

DESCRIPTION >
    Top products by sales, revenue, or units sold

NODE filtered_products
SQL >
    %
    SELECT *
    FROM top_products
    WHERE hour >= toDateTime({{ String(start_time, '2025-01-27 00:00:00', description='Start time') }})
      AND hour <= toDateTime({{ String(end_time, '2025-01-27 23:59:59', description='End time') }})
    {% if defined(category) %}
      AND product_category = {{ String(category, description='Filter by category') }}
    {% endif %}

NODE ranked_products
SQL >
    %
    SELECT
        product_id,
        product_name,
        product_category,
        sum(order_count) as total_orders,
        sum(units_sold) as total_units_sold,
        sum(revenue) as total_revenue
    FROM filtered_products
    GROUP BY product_id, product_name, product_category
    ORDER BY
    {% if defined(sort_by) and sort_by == 'revenue' %}
        total_revenue
    {% elif defined(sort_by) and sort_by == 'units' %}
        total_units_sold
    {% else %}
        total_orders
    {% endif %}
    DESC
    LIMIT {{ Int32(limit, 10, description='Number of products to return') }}
```

This endpoint returns the top-selling products, ranked by orders, revenue, or units sold, with optional filtering by time range and product category.

  • Endpoint 3: Customer Analytics

```
TOKEN "metrics_api_token" READ

DESCRIPTION >
    Customer analytics including lifetime value and purchase patterns

NODE customer_stats
SQL >
    %
    SELECT
        customer_id,
        customer_name,
        customer_country,
        customer_segment,
        sum(order_count) as total_orders,
        sum(lifetime_value) as lifetime_value,
        avg(avg_order_value) as avg_order_value,
        min(first_order_date) as first_order_date,
        max(last_order_date) as last_order_date,
        dateDiff('day', min(first_order_date), max(last_order_date)) as customer_lifetime_days
    FROM customer_analytics
    WHERE date >= toDate({{ String(start_date, '2025-01-01', description='Start date (YYYY-MM-DD)') }})
      AND date <= toDate({{ String(end_date, '2025-01-27', description='End date (YYYY-MM-DD)') }})
    {% if defined(customer_id) %}
      AND customer_id = {{ String(customer_id, description='Filter by customer ID') }}
    {% endif %}
    {% if defined(country) %}
      AND customer_country = {{ String(country, description='Filter by country') }}
    {% endif %}
    GROUP BY customer_id, customer_name, customer_country, customer_segment
    ORDER BY lifetime_value DESC
    LIMIT {{ Int32(limit, 100, description='Number of customers to return') }}
```

This endpoint provides comprehensive customer analytics, including lifetime value, order counts, average order value, and customer lifetime metrics, with optional filtering by customer ID, country, and date range.

Part 6: Deploy and Test Your Analytics API

  • Deploy to Cloud

Deploy all resources to Tinybird Cloud:

```bash
# Deploy everything to cloud
tb --cloud deploy
```

This command will:

    • Create all Data Sources in your cloud workspace.
    • Deploy all Pipes and Materialized Views.
    • Set up the Kafka connector to begin consumption.
    • Make your API endpoints accessible.

Verify deployment:

```bash
# Open the Tinybird dashboard to view your resources
tb open

# Or run SQL queries directly from the CLI
tb --cloud sql "SELECT count(*) FROM orders_kafka"
```

  • Create API Token The endpoints specify a token named metrics_api_token using TOKEN "metrics_api_token" READ in each pipe file. You can create JWT tokens via the CLI or generate them programmatically within your application. For detailed instructions on JWT token creation, including CLI commands and programmatic examples, refer to the JWT tokens documentation. The token name in the pipe files must precisely match the token name you create. JWT tokens have a TTL (time to live) and require a --resource parameter pointing to the specific pipe name.
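JWTs are just signed JSON, so they can also be minted with nothing but the standard library. The sketch below is an illustration, not Tinybird's official client: the HS256 mechanics are standard, but the payload field names (`workspace_id`, `exp`, the `PIPES:READ` scope with a `resource`) and the use of your admin token as the signing key are assumptions you should verify against the JWT tokens documentation:

```python
import base64
import hashlib
import hmac
import json
import time

def _b64url(data: bytes) -> str:
    """Base64url-encode without padding, per the JWT spec."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()

def mint_jwt(signing_key: str, workspace_id: str, pipe_name: str, ttl: int = 3600) -> str:
    """Mint an HS256 JWT scoped to read one pipe.
    Payload field names are assumptions; confirm them in the JWT docs."""
    header = {"alg": "HS256", "typ": "JWT"}
    payload = {
        "workspace_id": workspace_id,
        "name": "metrics_api_token",
        "exp": int(time.time()) + ttl,  # TTL enforced via the exp claim
        "scopes": [{"type": "PIPES:READ", "resource": pipe_name}],
    }
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode())
        for part in (header, payload)
    )
    signature = hmac.new(signing_key.encode(), signing_input.encode(),
                         hashlib.sha256).digest()
    return signing_input + "." + _b64url(signature)

# Placeholder arguments; replace with your real admin token and workspace ID.
token = mint_jwt("your_admin_token", "your_workspace_id", "api_revenue_metrics")
```

In practice a maintained JWT library (e.g., PyJWT) is preferable; the point here is that the token's TTL and pipe scope live in the payload, which is why each JWT needs a resource pointing at a specific pipe.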

  • Test the Simple Endpoint First, test the basic endpoint created in Part 1:

```bash
curl "https://api.tinybird.co/v0/pipes/recent_orders.json?token=YOUR_TOKEN&start_time=2025-01-27%2000:00:00&end_time=2025-01-27%2023:59:59&limit=10"
```

  • Test the Revenue Metrics Endpoint Replace YOUR_TOKEN with your actual token:

```bash
curl "https://api.tinybird.co/v0/pipes/api_revenue_metrics.json?token=YOUR_TOKEN&start_time=2025-01-27%2000:00:00&end_time=2025-01-27%2023:59:59"
```

Expected response structure:

```json
{
  "meta": [
    {"name": "total_orders", "type": "UInt64"},
    {"name": "total_revenue", "type": "Decimal(10, 2)"},
    ...
  ],
  "data": [
    {
      "total_orders": 1523,
      "total_revenue": 45678.90,
      "avg_order_value": 29.98,
      "total_units_sold": 3046,
      "period_start": "2025-01-27 00:00:00",
      "period_end": "2025-01-27 23:00:00",
      "product_category": "Electronics",
      "customer_country": "US"
    }
  ],
  "rows": 1,
  "statistics": {
    "elapsed": 0.012,
    "rows_read": 1234,
    "bytes_read": 56789
  }
}
```

Troubleshooting:

    • 401 Unauthorized: verify your token is correct.
    • 404 Not Found: ensure the pipe name is an exact match.
    • Empty results: check that data has been ingested and the Materialized Views have processed it.
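On the consuming side, the response's `data` array is usually all a client needs. A small hypothetical helper that totals revenue across the returned rows, exercised against a trimmed version of the sample payload above:

```python
import json

def total_revenue(response_body: str) -> float:
    """Sum total_revenue across the rows of a Tinybird pipe response body."""
    rows = json.loads(response_body).get("data", [])
    return round(sum(row.get("total_revenue", 0) for row in rows), 2)

sample = '{"data": [{"total_revenue": 45678.90, "total_orders": 1523}]}'
print(total_revenue(sample))  # 45678.9
```

Note that the `.json` format returns Decimal columns as JSON numbers, so a float sum like this is fine for display; for exact accounting, parse them into Decimal instead.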

  • Test the Top Products Endpoint

```bash
curl "https://api.tinybird.co/v0/pipes/api_top_products.json?token=YOUR_TOKEN&start_time=2025-01-27%2000:00:00&end_time=2025-01-27%2023:59:59&sort_by=revenue&limit=5"
```

  • Test the Customer Analytics Endpoint

```bash
curl "https://api.tinybird.co/v0/pipes/api_customer_analytics.json?token=YOUR_TOKEN&start_date=2025-01-01&end_date=2025-01-27&limit=10"
```

  • Real-Time Updates As new orders arrive in Kafka, they are automatically:

    • Ingested into orders_kafka.
    • Enriched with product and customer data.
    • Aggregated in Materialized Views.
    • Made available through your API endpoints within seconds.

    You can verify real-time updates by:

    • Sending a test order to your Kafka topic.
    • Waiting a few seconds.
    • Querying your endpoints again; the new order should be reflected in the metrics.
  • Performance Characteristics This architecture delivers:

    • Sub-100ms API latency: Pre-aggregated Materialized Views provide query responses in milliseconds.
    • Real-time freshness: Data appears in APIs within seconds of Kafka ingestion.
    • High throughput: Capable of handling thousands of events per second.
    • Scalability: Automatically scales with your data volume.

Next Steps

You have successfully built a complete real-time analytics API. Future considerations include:

  • Exporting data back to Kafka using Sinks.
  • Connecting to BI tools like Tableau, Power BI, and Grafana.
  • Monitoring and optimization using kafka_ops_log and performance tuning.
  • Common patterns and extensions for scaling your pipeline.

For immediate next steps:

  • Add more endpoints: Create endpoints for specific business metrics.
  • Set up monitoring: Use tinybird.kafka_ops_log to monitor consumer lag and errors.
  • Add alerting: Create endpoints that trigger alerts when metrics exceed thresholds.
  • Optimize further: Add more Materialized Views for common query patterns.

Conclusion

In approximately 15 minutes, you've constructed a production-ready real-time analytics API that:

  • Automatically ingests data from Kafka.
  • Enriches events with dimension tables and PostgreSQL data.
  • Pre-aggregates metrics for rapid querying.
  • Serves data through multiple API endpoints.
  • Updates in real time as new events arrive.

All this is achieved with no application code and no infrastructure management, relying solely on SQL and configuration. This demonstrates the power of Tinybird for real-time analytics.

Ready to build your own? Sign up for free and start building. Join the Slack community for support.