# Query Kafka with SQL using marimo Python Notebook

This tutorial demonstrates how to query Apache Kafka with SQL using Timeplus Proton and marimo notebooks. You'll learn how to set up a lightweight environment to process and visualize streaming data from Kafka without complex infrastructure requirements.

## Key Highlights of This Approach

- **Lightweight Setup**: No Docker, JVM, or complex Python dependencies
- **Real Data**: Process live GitHub events streamed through Kafka
- **Efficient Processing**: Query millions of Kafka messages without local storage
- **Interactive Visualization**: Charts update automatically and support interactive filtering

## Quick Start

Run the following commands to set up the environment:

```bash
curl -LsSf https://astral.sh/uv/install.sh | sh
curl https://install.timeplus.com/oss | sh
./proton server &
uvx marimo run --sandbox https://raw.githubusercontent.com/timeplus-io/proton/refs/heads/develop/examples/marimo/github.py
```

These commands will:
1. Download uv (a Rust-based Python package and project manager)
2. Download Timeplus Proton (an open-source streaming database written in C++)
3. Run a marimo notebook with all dependencies installed automatically

## Detailed Setup and Tutorial Steps

### Step 1: Introduction to marimo

[marimo](https://marimo.io) is a modern Python notebook that offers several advantages over traditional Jupyter notebooks:

- Pure Python code (no .ipynb JSON)
- Git-friendly format
- Integrated dependency management with uv

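For illustration, here is the skeleton of a marimo notebook file; it is plain Python, so it diffs cleanly in Git (a minimal sketch of the format, not the actual contents of github.py):

```python
import marimo

app = marimo.App()


@app.cell
def _():
    # Each cell is a plain Python function; the names it returns
    # become available to other cells
    import marimo as mo
    return (mo,)


if __name__ == "__main__":
    app.run()
```
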
The first part of the [github.py](https://github.com/timeplus-io/proton/blob/develop/examples/marimo/github.py) notebook declares its dependencies:

```python
# /// script
# requires-python = ">=3.13"
# dependencies = [
#     "altair==5.5.0",
#     "marimo",
#     "polars[pyarrow]==1.26.0",
#     "sqlalchemy==2.0.40",
#     "sqlglot==26.12.1",
#     "timeplus-connect==0.8.16",
# ]
# ///
```

When you run `uvx marimo run --sandbox github.py`, uv downloads Python 3.13 if needed, creates a fresh virtual environment, and installs all of the required dependencies before launching the notebook.

### Step 2: Start the Timeplus Proton Server

To connect to Kafka, you'll first need to run Timeplus Proton:

```bash
curl https://install.timeplus.com/oss | sh
./proton server
```

This downloads the single binary of Timeplus Proton to your current folder and starts the server.

To connect to Timeplus Proton in the marimo notebook, you can use the web UI or write Python code:

```python
engine = sqlalchemy.create_engine("timeplus://default:@localhost:8123")
```

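To sanity-check the connection, you can run a one-line query through the engine (a minimal sketch; `version()` is a function Proton supports):

```python
import sqlalchemy

# Reuses the engine created above; prints the Proton server version
# if the connection works
with engine.connect() as conn:
    print(conn.execute(sqlalchemy.text("select version()")).scalar())
```
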
### Step 3: Connect to Kafka

To query a Kafka topic using SQL, create an external stream in Timeplus Proton:

```sql
CREATE EXTERNAL STREAM IF NOT EXISTS github_events(
    actor string,
    created_at string,
    id string,
    payload string,
    repo string,
    type string
)
SETTINGS type='kafka',
    brokers='{kafka_broker}',
    topic='github_events',
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-256',
    username='readonly',
    password='{kafka_pwd}',
    skip_ssl_cert_check=true,
    data_format='JSONEachRow',
    one_message_per_row=true
```

Notes:
- Replace `{kafka_broker}` and `{kafka_pwd}` with your actual Kafka broker address and password
- The Kafka topic contains live GitHub events in JSON format
- If you have too many columns or a variable schema, you can create a stream with a single string column and parse the JSON at query time, as sketched below

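A minimal sketch of that last option: query-time JSON parsing against a hypothetical single-column stream named `github_events_raw` (created like the stream above, but with one `raw string` column and `data_format='RawBLOB'`; `json_extract_string` is one of Proton's JSON functions):

```python
# Illustrative only: github_events_raw is an assumed single-column stream.
# Fields are extracted from the raw JSON at query time; LIMIT bounds the
# otherwise unbounded streaming query.
df_raw = mo.sql(
    """
    SELECT json_extract_string(raw, 'repo') AS repo,
           json_extract_string(raw, 'type') AS type
    FROM github_events_raw
    LIMIT 10
    """,
    engine=engine,
)
```
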
### Step 4: Run Your First SQL Query Against Kafka

Let's count all messages in the Kafka topic:

```sql
SELECT count() FROM github_events
```

This query is optimized by Timeplus to check the offset difference between the first and last Kafka message, providing a fast count without scanning all the data.

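In a marimo SQL cell this corresponds to the following Python (a sketch; `engine` is the SQLAlchemy engine from Step 2, and the output name `cntdf` matches the next step):

```python
# The SQL cell's output is a dataframe named cntdf
cntdf = mo.sql(
    """
    SELECT count() FROM github_events
    """,
    engine=engine,
)
```
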
### Step 5: Visualize Data with marimo

To visualize the count result in marimo, update the output variable of the SQL cell to `cntdf` and create a stat widget:

```python
mo.stat(cntdf["count()"][0])
```

### Step 6: Implement Auto-Refresh

To automatically refresh the count, add a refresh widget:

```python
cnt_refresh = mo.ui.refresh(options=["1s", "2s"], default_interval="1s")
cnt_refresh
```

Then modify your SQL to reference this refresh widget:

```sql
-- {cnt_refresh.value}
SELECT count() FROM github_events
```

By adding the comment that references `cnt_refresh.value`, the SQL query will re-run whenever the refresh state changes.

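Under the hood, the SQL cell is an f-string, so interpolating `cnt_refresh.value` into a comment is enough to make the cell depend on the widget (a sketch of the equivalent Python):

```python
# The f-string interpolation creates the dependency on cnt_refresh;
# the comment changes on each tick, so marimo re-runs the cell
cntdf = mo.sql(
    f"""
    -- {cnt_refresh.value}
    SELECT count() FROM github_events
    """,
    engine=engine,
)
```
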
To show the delta between counts:

```python
# Track the previous count; mo.state returns a getter/setter pair
get_last_count, set_last_count = mo.state(0)

# Calculate the change since the previous refresh
def update_count(current):
    delta = current - get_last_count()
    set_last_count(current)
    return delta

# Compute the delta once, then display it as the caption
delta = update_count(cntdf["count()"][0])
mo.stat(
    cntdf["count()"][0],
    caption=f"Δ {'+' if delta >= 0 else ''}{delta}",
)
```

### Step 7: Create Interactive Charts

Let's create interactive charts to visualize the GitHub event data:

1. Query for top event types (the `range` and `refresh` widgets used here are sketched after this list):

```sql
-- {refresh.value}
WITH cte AS (SELECT top_k(type, 10, true) AS a FROM github_events LIMIT 1 SETTINGS seek_to='-{range.value}m')
SELECT a.1 AS type, a.2 AS cnt FROM cte ARRAY JOIN a
```

2. Query for top repositories by event type:

```sql
-- {refresh.value}
WITH cte AS (SELECT top_k(repo, 10, true) AS a FROM github_events {typeWhere} LIMIT 1 SETTINGS seek_to='-{range.value}m')
SELECT a.1 AS repo, a.2 AS cnt FROM cte ARRAY JOIN a
```

3. Handle selections to filter the bar chart based on pie chart clicks:

```python
# Default to no filter; switch to a WHERE clause when a pie slice is selected
_type = ' '
if chart_types.selections.get("select_point"):
    _array = chart_types.selections["select_point"].get("type", None)
    if _array:
        _type = f"WHERE type='{_array[0]}'"
typeWhere = _type
```

4. Create the charts using Altair:

```python
chart_types = mo.ui.altair_chart(
    alt.Chart(df_type, height=150, width=150)
    .mark_arc()
    .encode(theta="cnt", color="type"),
    legend_selection=False,
)

chart_repos = mo.ui.altair_chart(
    alt.Chart(df_hotrepo, height=200)
    .mark_bar()
    .encode(
        x="cnt",
        y=alt.Y("repo", sort=alt.EncodingSortField(field="cnt", order="descending")),
    )
)
```

5. Arrange the elements in a layout:

```python
mo.vstack([
    mo.hstack([range, refresh]),
    mo.hstack([chart_types, chart_repos], widths=[0, 1]),
])
```
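
The `range` and `refresh` widgets referenced throughout these queries and the layout are defined elsewhere in the notebook. A plausible sketch of their definitions (the widget options here are assumptions, not copied from github.py):

```python
# Assumed definitions for the widgets used above; `range` deliberately
# shadows the built-in name to match the layout code in the notebook
range = mo.ui.slider(1, 60, value=10, label="minutes to look back")
refresh = mo.ui.refresh(options=["5s", "10s", "30s"], default_interval="10s")
```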

## Advanced Features Applied

- **Time-Based Filtering**: Use `seek_to='-{range.value}m'` to analyze data from a specific time window
- **Optimized Aggregations**: Use [top_k](/functions_for_agg#top_k) for efficient ranking operations
- **Interactive Filtering**: Link charts for dynamic data exploration

## Conclusion

This tutorial demonstrates how to query and visualize Kafka data using SQL with Timeplus Proton and marimo notebooks. The approach provides a lightweight, SQL-native way to work with streaming data without the overhead of traditional big data systems.

For more information and examples, visit the [Timeplus GitHub repository](https://github.com/timeplus-io/proton/tree/develop/examples/marimo).