This repository was archived by the owner on Feb 16, 2024. It is now read-only.

Commit 824627c

Add demo kafka-druid-water-level-data (#76)
## Description

The demo is basically finished; the dashboard could be further improved. Data comes from https://www.pegelonline.wsv.de/gast/start. The demo bulk-loads the past 31 days from the API and then starts scraping the stations approximately every 2 minutes. It is built on top of the existing `kafka-druid-superset-s3` stack. Stacks ftw 🚀

Docs will follow; I currently have lots of other docs to write.

Run from the feature branch with `stackablectl --additional-demos-file demos/demos-v1.yaml --additional-stacks-file stacks/stacks-v1.yaml demo install kafka-druid-water-level-data`.

The dashboard looks something like this:

![Screenshot_20220818_120529](https://user-images.githubusercontent.com/29303194/185370281-cee5ab49-e1a5-431e-8be1-450d3984e7a0.png)
1 parent 0690d66 commit 824627c
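
The data source is the public PEGELONLINE REST API. A minimal sketch of what the demo scrapes (not part of this commit; the URLs are taken from the ingestion script added below, and pandas is assumed to be available):

# Peek at the PEGELONLINE REST API that the demo ingests into Kafka.
import pandas as pd

# All gauging stations (one row per station, each with a nested "water" object)
stations = pd.read_json("https://www.pegelonline.wsv.de/webservices/rest-api/v2/stations.json")
print(f"{len(stations)} stations")

# Current water level ("W") measurement for the first station
uuid = stations.loc[0, "uuid"]
current = pd.read_json(
    f"https://www.pegelonline.wsv.de/webservices/rest-api/v2/stations/{uuid}/W/currentmeasurement.json",
    typ="series",
)
print(f"{stations.loc[0, 'longname']}: {current['value']}")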


44 files changed (+832 / -35 lines)

demos/demos-v1.yaml

Lines changed: 15 additions & 0 deletions
@@ -29,3 +29,18 @@ demos:
      - plainYaml: https://raw.githubusercontent.com/stackabletech/stackablectl/main/demos/kafka-druid-earthquake-data/ingest-test-data.yaml
      - plainYaml: https://raw.githubusercontent.com/stackabletech/stackablectl/main/demos/kafka-druid-earthquake-data/create-druid-ingestion-job.yaml
      - plainYaml: https://raw.githubusercontent.com/stackabletech/stackablectl/main/demos/kafka-druid-earthquake-data/setup-superset.yaml
+  kafka-druid-water-level-data:
+    description: Demo ingesting water level data into Kafka, streaming it into Druid and creating a Superset dashboard
+    documentation: https://docs.stackable.tech/stackablectl/stable/demos/kafka-druid-water-level-data.html
+    stackableStack: kafka-druid-superset-s3
+    labels:
+      - kafka
+      - druid
+      - superset
+      - minio
+      - s3
+      - water-levels
+    manifests:
+      - plainYaml: https://raw.githubusercontent.com/stackabletech/stackablectl/main/demos/kafka-druid-water-level-data/ingest-test-data.yaml
+      - plainYaml: https://raw.githubusercontent.com/stackabletech/stackablectl/main/demos/kafka-druid-water-level-data/create-druid-ingestion-job.yaml
+      - plainYaml: https://raw.githubusercontent.com/stackabletech/stackablectl/main/demos/kafka-druid-water-level-data/setup-superset.yaml
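
For context, a rough sketch (not from the commit) of how such an entry can be read back from the demos file; it assumes only the layout visible in the diff above and that PyYAML is installed:

import urllib.request
import yaml  # PyYAML, assumed to be available

URL = "https://raw.githubusercontent.com/stackabletech/stackablectl/main/demos/demos-v1.yaml"
with urllib.request.urlopen(URL) as resp:
    demos = yaml.safe_load(resp)["demos"]

demo = demos["kafka-druid-water-level-data"]
print(demo["stackableStack"])        # kafka-druid-superset-s3
for manifest in demo["manifests"]:
    print(manifest["plainYaml"])     # the three plainYaml URLs shown above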

demos/kafka-druid-earthquake-data/create-druid-ingestion-job.yaml

Lines changed: 1 addition & 1 deletion
@@ -8,7 +8,7 @@ spec:
    spec:
      containers:
        - name: create-druid-ingestion-job
-          image: python:3.10 # Using same image so caching can happen
+          image: docker.stackable.tech/stackable/testing-tools:0.1.0-stackable0.1.0
           command: ["bash", "-c", "curl -X POST -H 'Content-Type: application/json' -d @/tmp/ingestion-job-spec/ingestion-job-spec.json http://druid-coordinator:8081/druid/indexer/v1/supervisor"]
           volumeMounts:
             - name: ingestion-job-spec
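
The Job above registers the ingestion spec by POSTing it to Druid's supervisor endpoint. A hedged way to check the result (not part of the commit) is to list the registered supervisors on the same endpoint:

import requests

# Same host/port the Job uses; returns the IDs of all registered supervisors.
resp = requests.get("http://druid-coordinator:8081/druid/indexer/v1/supervisor")
resp.raise_for_status()
print(resp.json())  # expected to contain the earthquake datasource once the supervisor exists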

demos/kafka-druid-earthquake-data/ingest-test-data.yaml

Lines changed: 7 additions & 6 deletions
@@ -8,8 +8,8 @@ spec:
    spec:
      containers:
        - name: ingest-test-data
-          image: python:3.10
-          command: ["bash", "-c", "curl -o earthquake.csv https://repo.stackable.tech/repository/misc/earthquake-data/earthquakes_1950_to_2022.csv && pip install pandas==1.4.2 kafka-python3==3.0.0 && python /tmp/script/script.py"]
+          image: docker.stackable.tech/stackable/testing-tools:0.1.0-stackable0.1.0
+          command: ["bash", "-c", "python -u /tmp/script/script.py"]
           volumeMounts:
             - name: script
               mountPath: /tmp/script
@@ -31,17 +31,18 @@ data:
    from kafka3 import KafkaProducer
    import time

-    BOOTSTRAP_SERVERS = "kafka:9092"
+    BOOTSTRAP_SERVERS = "kafka:9092" # For local testing / developing replace it, afterwards change back to kafka:9092
     TOPIC = "earthquakes"
-    CSV_FILE = "earthquake.csv"
+    CSV_FILE = "https://repo.stackable.tech/repository/misc/earthquake-data/earthquakes_1950_to_2022.csv"
     TARGET_RECORDS_PER_SECOND = 1000

     print(f"Producing {TARGET_RECORDS_PER_SECOND} records/s from {CSV_FILE} to topic {TOPIC} with bootstrap servers {BOOTSTRAP_SERVERS}\n")

-    csv_file = pd.DataFrame(pd.read_csv(CSV_FILE, sep=","))
-
+    # Create producer first to early error out if Kafka is not ready yet to reduce unnecessary network usage
     producer = KafkaProducer(bootstrap_servers=BOOTSTRAP_SERVERS)

+    csv_file = pd.DataFrame(pd.read_csv(CSV_FILE, sep=","))
+
     for row in csv_file.index:
         starttime = time.time()
         row_json = csv_file.loc[row].to_json()
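
The script targets a fixed throughput (TARGET_RECORDS_PER_SECOND) by taking a timestamp per record; a plausible, simplified sketch of that pacing pattern follows (not the committed code, since the remainder of the loop is hidden above):

import time

TARGET_RECORDS_PER_SECOND = 1000

def paced(send_one, records):
    # Send each record, then sleep away whatever is left of its 1/N-second slot.
    for record in records:
        starttime = time.time()
        send_one(record)
        time.sleep(max(0, 1 / TARGET_RECORDS_PER_SECOND - (time.time() - starttime)))

paced(print, range(5))  # stand-in for producer.send(TOPIC, ...)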

demos/kafka-druid-earthquake-data/setup-superset.yaml

Lines changed: 3 additions & 4 deletions
@@ -8,8 +8,8 @@ spec:
    spec:
      containers:
        - name: setup-superset
-          image: python:3.10
-          command: ["bash", "-c", "curl -o superset-assets.zip https://raw.githubusercontent.com/stackabletech/stackablectl/main/demos/kafka-druid-earthquake-data/superset-assets.zip && pip install requests==2.22.0 && python /tmp/script/script.py"]
+          image: docker.stackable.tech/stackable/testing-tools:0.1.0-stackable0.1.0
+          command: ["bash", "-c", "curl -o superset-assets.zip https://raw.githubusercontent.com/stackabletech/stackablectl/main/demos/kafka-druid-earthquake-data/superset-assets.zip && python -u /tmp/script/script.py"]
           volumeMounts:
             - name: script
               mountPath: /tmp/script
@@ -30,8 +30,7 @@ data:
    import logging
    import requests

-    base_url = "http://superset-external:8088"
-    # base_url = "http://172.18.0.4:31024"
+    base_url = "http://superset-external:8088" # For local testing / developing replace it, afterwards change back to http://superset-external:8088
     username = "admin"
     password = "admin"

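
The setup script drives Superset through its REST API at base_url with the admin/admin credentials shown above. A hedged sketch of that interaction (not from the commit; the endpoint paths and payloads are assumptions based on Superset's /api/v1 interface):

import requests

base_url = "http://superset-external:8088"

# Log in and obtain a bearer token
token = requests.post(
    f"{base_url}/api/v1/security/login",
    json={"username": "admin", "password": "admin", "provider": "db", "refresh": True},
).json()["access_token"]

# List the dashboards created by importing superset-assets.zip
dashboards = requests.get(
    f"{base_url}/api/v1/dashboard/",
    headers={"Authorization": f"Bearer {token}"},
).json()
print([d["dashboard_title"] for d in dashboards["result"]])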
Lines changed: 141 additions & 0 deletions
@@ -0,0 +1,141 @@
---
apiVersion: batch/v1
kind: Job
metadata:
  name: create-druid-ingestion-job
spec:
  template:
    spec:
      containers:
        - name: create-druid-ingestion-job
          image: docker.stackable.tech/stackable/testing-tools:0.1.0-stackable0.1.0
          command: ["bash", "-c", "curl -X POST -H 'Content-Type: application/json' -d @/tmp/ingestion-job-spec/stations-ingestion-job-spec.json http://druid-coordinator:8081/druid/indexer/v1/supervisor && curl -X POST -H 'Content-Type: application/json' -d @/tmp/ingestion-job-spec/measurements-ingestion-job-spec.json http://druid-coordinator:8081/druid/indexer/v1/supervisor"]
          volumeMounts:
            - name: ingestion-job-spec
              mountPath: /tmp/ingestion-job-spec
      restartPolicy: OnFailure
      volumes:
        - name: ingestion-job-spec
          configMap:
            name: create-druid-ingestion-job-spec
  restartPolicy: Never
  backoffLimit: 50 # It can take some time until Druid is ready
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: create-druid-ingestion-job-spec
data:
  stations-ingestion-job-spec.json: |
    {
      "type": "kafka",
      "spec": {
        "ioConfig": {
          "type": "kafka",
          "consumerProperties": {
            "bootstrap.servers": "kafka:9092"
          },
          "topic": "stations",
          "inputFormat": {
            "type": "json",
            "flattenSpec": {
              "fields": [
                {
                  "name": "water_longname",
                  "type": "path",
                  "expr": "$.water.longname"
                },
                {
                  "name": "water_shortname",
                  "type": "path",
                  "expr": "$.water.shortname"
                }
              ]
            }
          },
          "useEarliestOffset": true
        },
        "tuningConfig": {
          "type": "kafka"
        },
        "dataSchema": {
          "dataSource": "stations",
          "timestampSpec": {
            "column": "!!!_no_such_column_!!!",
            "missingValue": "2000-01-01T00:00:00Z"
          },
          "dimensionsSpec": {
            "dimensions": [
              "water_longname",
              "water_shortname",
              "uuid",
              {
                "type": "long",
                "name": "number"
              },
              "shortname",
              "longname",
              {
                "type": "double",
                "name": "km"
              },
              "agency",
              {
                "type": "double",
                "name": "longitude"
              },
              {
                "type": "double",
                "name": "latitude"
              }
            ]
          },
          "granularitySpec": {
            "queryGranularity": "none",
            "rollup": false,
            "segmentGranularity": "all"
          }
        }
      }
    }
  measurements-ingestion-job-spec.json: |
    {
      "type": "kafka",
      "spec": {
        "ioConfig": {
          "type": "kafka",
          "consumerProperties": {
            "bootstrap.servers": "kafka:9092"
          },
          "topic": "measurements",
          "inputFormat": {
            "type": "json"
          },
          "useEarliestOffset": true
        },
        "tuningConfig": {
          "type": "kafka"
        },
        "dataSchema": {
          "dataSource": "measurements",
          "timestampSpec": {
            "column": "timestamp",
            "format": "millis"
          },
          "dimensionsSpec": {
            "dimensions": [
              {
                "type": "long",
                "name": "value"
              },
              "station_uuid"
            ]
          },
          "granularitySpec": {
            "queryGranularity": "none",
            "rollup": false,
            "segmentGranularity": "day"
          }
        }
      }
    }
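
Two details of the stations spec are worth spelling out: the flattenSpec lifts the nested water object onto top-level columns, and the timestampSpec points at a column that never exists, so every station row gets the fixed missingValue timestamp (station metadata has no natural time axis). A small illustration with made-up placeholder values (not from the commit):

# A station document roughly as it arrives on the "stations" topic (placeholder values).
station = {
    "uuid": "00000000-0000-0000-0000-000000000000",
    "number": 1234567,
    "shortname": "EXAMPLE",
    "longname": "EXAMPLE STATION",
    "km": 1.0,
    "agency": "EXAMPLE AGENCY",
    "longitude": 9.0,
    "latitude": 52.0,
    "water": {"shortname": "RIVER", "longname": "RIVER LONGNAME"},
}

# What the flattenSpec produces as ingested columns:
row = {k: v for k, v in station.items() if k != "water"}
row["water_shortname"] = station["water"]["shortname"]  # $.water.shortname
row["water_longname"] = station["water"]["longname"]    # $.water.longname
row["__time"] = "2000-01-01T00:00:00Z"                  # missingValue, since the timestamp column never exists
print(row)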
Lines changed: 96 additions & 0 deletions
@@ -0,0 +1,96 @@
---
apiVersion: batch/v1
kind: Job
metadata:
  name: ingest-test-data
spec:
  template:
    spec:
      containers:
        - name: ingest-test-data
          image: docker.stackable.tech/stackable/testing-tools:0.1.0-stackable0.1.0
          command: ["bash", "-c", "python -u /tmp/script/script.py"]
          volumeMounts:
            - name: script
              mountPath: /tmp/script
      restartPolicy: OnFailure
      volumes:
        - name: script
          configMap:
            name: ingest-test-data-script
  restartPolicy: Never
  backoffLimit: 50 # It can take some time until Kafka is ready
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: ingest-test-data-script
data:
  script.py: |
    import json
    from kafka3 import KafkaProducer
    import pandas as pd
    import time

    HISTORY_DAYS = 31
    # At a maximum scrape once every minute.
    # In practice it will take longer than a minute to scrape, so the loop will be busy.
    # This will result in a continuous stream of data.
    LIVE_UPDATE_INTERVAL_S = 60

    BOOTSTRAP_SERVERS = "kafka:9092" # For local testing / developing replace it, afterwards change back to kafka:9092
    STATIONS_TOPIC = "stations"
    MEASUREMENTS_TOPIC = "measurements"

    print(f"Producing station records to topic {STATIONS_TOPIC} with bootstrap servers {BOOTSTRAP_SERVERS}\n")
    producer = KafkaProducer(bootstrap_servers=BOOTSTRAP_SERVERS)

    stations = pd.read_json("https://www.pegelonline.wsv.de/webservices/rest-api/v2/stations.json")
    print(f"Stations:\n{stations}")
    for station in stations.index:
        station_json = stations.loc[station].to_json()
        producer.send(STATIONS_TOPIC, str.encode(station_json))

    print(f"Producing measurement records of the last {HISTORY_DAYS} days to topic {MEASUREMENTS_TOPIC} with bootstrap servers {BOOTSTRAP_SERVERS}\n")

    # Using a separate loop to first send all stations and then the measurements
    for station in stations.index:
        station = stations.loc[station]
        station_uuid = station["uuid"]
        url = f"https://www.pegelonline.wsv.de/webservices/rest-api/v2/stations/{station_uuid}/W/measurements.json?start=P{HISTORY_DAYS}D"
        try:
            measurements = pd.read_json(url)
        except Exception as err:
            print(f"[WARN] Could not read measurements for station {station['longname']} ({station_uuid}): {err}")
            continue
        measurements['station_uuid'] = station_uuid
        for measurement in measurements.index:
            measurement_json = measurements.loc[measurement].to_json()
            producer.send(MEASUREMENTS_TOPIC, str.encode(measurement_json))

        print(f"Sent {len(measurements)} measurements for station {station['longname']}")


    print(f"Finished loading {HISTORY_DAYS} days of historic data, now starting live streaming")
    while True:
        starttime = time.time()
        measurement_counter = 0
        measurement_failed_counter = 0
        for station_uuid in stations["uuid"]:
            url = f"https://www.pegelonline.wsv.de/webservices/rest-api/v2/stations/{station_uuid}/W/currentmeasurement.json"
            try:
                measurement = pd.read_json(url, typ='series')
            except Exception as err:
                measurement_failed_counter += 1
                continue

            measurement = {
                "timestamp": int(time.time() * 1000),
                "value": measurement["value"],
                "station_uuid": station_uuid
            }
            measurement_json = json.dumps(measurement, separators=(',', ':'))
            producer.send(MEASUREMENTS_TOPIC, str.encode(measurement_json))
            measurement_counter += 1
        print(f"Sent {measurement_counter} measurements in {int(time.time() - starttime)}s ({measurement_failed_counter} failed)")
        time.sleep(max(0, LIVE_UPDATE_INTERVAL_S - ((time.time() - starttime))))
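
A quick, hedged way to verify the ingestion (not part of the commit) is to tail the measurements topic with the same kafka3 client the script uses:

import json
from kafka3 import KafkaConsumer  # kafka-python3, same client as in the script above

consumer = KafkaConsumer(
    "measurements",
    bootstrap_servers="kafka:9092",
    auto_offset_reset="earliest",
)
for message in consumer:
    print(json.loads(message.value))  # e.g. {"timestamp": ..., "value": ..., "station_uuid": "..."}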
Lines changed: 30 additions & 0 deletions
@@ -0,0 +1,30 @@
-- For Superset datasets
select
  measurements.__time as __time,
  measurements."value" as measurement,
  stations.agency as agency,
  stations.km as km,
  stations.latitude as latitude,
  stations.longitude as longitude,
  stations.longname as longname,
  stations.number as number,
  stations.shortname as shortname,
  stations.water_longname as water_longname,
  stations.water_shortname as water_shortname,
  measurements.station_uuid as station_uuid
from measurements inner join stations on stations.uuid = measurements.station_uuid

select * from
(
  select
    station_uuid,
    avg("value") as avg_measurement,
    latest("value") as current_measurement,
    (latest("value") - avg("value")) / avg("value") * 100 as deviation_percent,
    round((latest("value") - avg("value")) / avg("value") * 100 / 5) * 5 as deviation_percent_bucketed_5,
    round((latest("value") - avg("value")) / avg("value") * 100 / 10) * 10 as deviation_percent_bucketed_10,
    round((latest("value") - avg("value")) / avg("value") * 100 / 25) * 25 as deviation_percent_bucketed_25
  from measurements
  group by 1
) as measurements
inner join stations on stations.uuid = measurements.station_uuid
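
The second query feeds the deviation widgets: per station it compares the latest gauge reading with its average over the ingested measurements and buckets the deviation to the nearest 5, 10 and 25 percent. As a worked example with made-up numbers, a station averaging 200 cm whose latest reading is 224 cm has a deviation_percent of (224 - 200) / 200 * 100 = 12, which buckets to 10, 10 and 0 respectively.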
