async-kinesis provides optional metrics collection for monitoring producer and consumer performance. Metrics are disabled by default and have zero overhead when not enabled.
- Quick Start
- Available Metrics
- Metrics Collectors
- Prometheus Integration
- Custom Collectors
- Best Practices
## Quick Start

```python
from kinesis import Producer, InMemoryMetricsCollector

# Create a metrics collector
metrics = InMemoryMetricsCollector()

# Use it with a producer
async with Producer(
    stream_name="my-stream",
    metrics_collector=metrics
) as producer:
    await producer.put({"message": "hello"})

# Check collected metrics
print(metrics.get_metrics())
# {'producer_records_sent_total{stream_name=my-stream}': 1.0, ...}
```

The same collector works on the consumer side:

```python
from kinesis import Consumer, InMemoryMetricsCollector

metrics = InMemoryMetricsCollector()

async with Consumer(
    stream_name="my-stream",
    metrics_collector=metrics
) as consumer:
    async for record in consumer:
        handle(record)

# Per-shard counters of records and bytes successfully enqueued.
print(metrics.counters)
# {'consumer_records_received_total{shard_id=shardId-000000000000,stream_name=my-stream}': 42, ...}
```

Running multiple consumers in the same process? Pass an explicit collector to each. The global collector set via `set_metrics_collector()` is shared across every consumer that doesn't supply its own, which is usually not what you want for multi-stream observability.
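To make the trade-off concrete, here is a toy stand-in for a collector (a plain class of our own, not the library's API) showing how a shared collector merges every stream into one key namespace, while per-consumer collectors stay isolated:

```python
# Toy collector: a flat dict keyed by metric name plus sorted labels.
# This mirrors the key format InMemoryMetricsCollector prints above,
# but the class itself is illustrative, not part of async-kinesis.
class ToyCollector:
    def __init__(self):
        self.counters = {}

    def increment(self, metric, labels, value=1):
        key = metric + "{" + ",".join(f"{k}={v}" for k, v in sorted(labels.items())) + "}"
        self.counters[key] = self.counters.get(key, 0) + value

# One shared collector: both streams land in the same dict...
shared = ToyCollector()
shared.increment("consumer_records_received_total", {"stream_name": "orders"})
shared.increment("consumer_records_received_total", {"stream_name": "events"})
print(sorted(shared.counters))  # two keys, two streams mixed in one namespace

# ...whereas a dedicated collector keeps each stream's metrics separate.
orders_only = ToyCollector()
orders_only.increment("consumer_records_received_total", {"stream_name": "orders"})
print(sorted(orders_only.counters))  # one key, one stream
```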
## Prometheus Integration

Install with Prometheus support:

```shell
pip install async-kinesis[prometheus]
```

```python
from kinesis import Producer, PrometheusMetricsCollector
from prometheus_client import start_http_server

# Start the Prometheus metrics server
start_http_server(8000)  # Metrics available at http://localhost:8000

# Create a Prometheus collector
metrics = PrometheusMetricsCollector(namespace="my_app")

# Use it with a producer or consumer
async with Producer(
    stream_name="my-stream",
    metrics_collector=metrics
) as producer:
    await producer.put({"message": "hello"})
```

To avoid passing a collector to every instance, set one globally:

```python
from kinesis import set_metrics_collector, PrometheusMetricsCollector

# Set globally for all producers/consumers
set_metrics_collector(PrometheusMetricsCollector())

# All instances now use this collector by default
async with Producer(stream_name="my-stream") as producer:
    # Automatically uses the global collector
    await producer.put({"message": "hello"})
```

## Available Metrics

### Producer Metrics

| Metric | Type | Description | Labels |
|---|---|---|---|
| `producer_records_sent_total` | Counter | Total records successfully sent | `stream_name` |
| `producer_bytes_sent_total` | Counter | Total bytes successfully sent | `stream_name` |
| `producer_errors_total` | Counter | Total producer errors | `stream_name`, `error_type` |
| `producer_throttles_total` | Counter | Total throttling errors | `stream_name` |
| `producer_batch_size` | Histogram | Size of batches sent | `stream_name` |
| `producer_queue_size` | Gauge | Current producer queue size | `stream_name` |
| `producer_flush_duration_seconds` | Histogram | Time to flush records | `stream_name` |
### Consumer Metrics

| Metric | Type | Description | Labels |
|---|---|---|---|
| `consumer_records_received_total` | Counter | Records successfully enqueued to the consumer queue (raw Kinesis rows, pre-deaggregation). Rows dropped mid-batch by a queue timeout are not counted. | `stream_name`, `shard_id` |
| `consumer_bytes_received_total` | Counter | Bytes of `Data` for rows counted by `consumer_records_received_total`. | `stream_name`, `shard_id` |
| `consumer_errors_total` | Counter | Errors in `get_records`. `error_type` is `connection`, `timeout`, `unknown`, or the raw AWS error code (e.g. `ProvisionedThroughputExceededException`, `ExpiredIteratorException`, `InternalFailure`). | `stream_name`, `shard_id`, `error_type` |
| `consumer_checkpoint_success_total` | Counter | Durable backend-persist operations that returned successfully. Under `auto_checkpoint=False`, only increments when `checkpointer.manual_checkpoint()` flushes, not on every per-shard checkpoint call (those buffer without writing). | `stream_name`, `shard_id` |
| `consumer_checkpoint_failure_total` | Counter | Durable backend-persist operations that raised. Same manual-mode caveat as success. | `stream_name`, `shard_id` |
| `consumer_iterator_age_milliseconds` | Gauge | `MillisBehindLatest` from the last `GetRecords` response. Only emitted when the backend populates the field. | `stream_name`, `shard_id` |
| `consumer_queue_size` | Gauge | Consumer internal queue depth after the enqueue pass. | `stream_name` |
| `consumer_lag_records` | Gauge | Planned; not currently emitted. | `stream_name`, `shard_id` |
| `consumer_processing_time_seconds` | Histogram | Planned; not currently emitted. | `stream_name`, `shard_id` |
`consumer_errors_total` uses raw AWS error codes rather than semantic labels. `ProvisionedThroughputExceededException` and `ExpiredIteratorException` are recovery events, not failures; alert on `connection`, `unknown`, and `InternalFailure` instead.
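That triage could be encoded like this (the two sets below are our own grouping of the label values listed above, not anything the library exports):

```python
# error_type values the library emits, split into "recovery events"
# vs. "alertable failures" per the guidance above. These constants are
# our own; async-kinesis only emits the raw label strings.
RECOVERY_CODES = {"ProvisionedThroughputExceededException", "ExpiredIteratorException"}
ALERT_CODES = {"connection", "unknown", "InternalFailure"}

def should_alert(error_type: str) -> bool:
    """Return True for error_type labels worth paging on."""
    if error_type in RECOVERY_CODES:
        return False  # the consumer backs off or re-acquires the iterator
    return error_type in ALERT_CODES

print(should_alert("ProvisionedThroughputExceededException"))  # False
print(should_alert("connection"))                              # True
```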
**Checkpoint counter semantics:** the `consumer_checkpoint_*` counters track the underlying backend write (Redis `SET`, DynamoDB `UpdateItem`, memory dict write), not the Consumer's per-iteration checkpoint call. Under `auto_checkpoint=False` (Redis/DynamoDB manual mode), these counters stay flat between `manual_checkpoint()` flushes and then increment in bursts when the user flushes. Dashboards using `rate(consumer_checkpoint_success_total[5m])` will show quiet periods followed by spikes, which is the correct signal: durable writes actually happen in bursts.
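A toy model of the buffering makes the "flat, then burst" shape obvious (illustrative only; this is not the library's checkpointer implementation):

```python
class ToyManualCheckpointer:
    """Buffers per-shard sequence numbers; only a flush counts as a durable write."""
    def __init__(self):
        self.pending = {}       # shard_id -> latest buffered sequence number
        self.success_total = 0  # stands in for consumer_checkpoint_success_total

    def checkpoint(self, shard_id, seq):
        # Per-shard checkpoint call: buffers only, the counter stays flat.
        self.pending[shard_id] = seq

    def manual_checkpoint(self):
        # Flush: one durable write per buffered shard, counted in a burst.
        self.success_total += len(self.pending)
        self.pending.clear()

cp = ToyManualCheckpointer()
for seq in range(100):
    cp.checkpoint("shardId-000000000000", str(seq))
print(cp.success_total)  # 0 -- one hundred checkpoint calls, nothing durable yet

cp.manual_checkpoint()
print(cp.success_total)  # 1 -- a single burst for the one buffered shard
```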
**Standalone usage:** a checkpointer used without a Consumer (e.g. a worker orchestrating its own `manual_checkpoint()`) defaults to `stream_name="<standalone>"` until `checkpointer.bind_metrics(collector, {"stream_name": "..."})` is called. The sentinel is visible in dashboards; use it to distinguish consumer-wired emissions from direct-use ones.
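The sentinel-then-rebind flow can be sketched with a toy class (only the label argument mirrors the real `bind_metrics` call quoted above; everything else is illustrative):

```python
class ToyCheckpointCounter:
    """Sketch of how the stream_name label resolves for checkpoint metrics."""
    def __init__(self):
        self.labels = {"stream_name": "<standalone>"}  # default sentinel
        self.counters = {}

    def bind_metrics(self, labels):
        # Overrides the sentinel once the owning stream is known.
        self.labels.update(labels)

    def record_success(self):
        key = "consumer_checkpoint_success_total{stream_name=%s}" % self.labels["stream_name"]
        self.counters[key] = self.counters.get(key, 0) + 1

cp = ToyCheckpointCounter()
cp.record_success()                        # emitted under the sentinel
cp.bind_metrics({"stream_name": "orders"})
cp.record_success()                        # now attributed to the real stream
print(sorted(cp.counters))
```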
### Stream Metrics

| Metric | Type | Description | Labels |
|---|---|---|---|
| `stream_shards_active` | Gauge | Number of active shards | `stream_name` |
| `stream_shards_closed` | Gauge | Number of closed shards | `stream_name` |
| `stream_resharding_events_total` | Counter | Resharding events detected | `stream_name` |
## Metrics Collectors

### NoOpMetricsCollector

The default collector, which does nothing. Zero overhead when metrics are not needed.

```python
from kinesis import NoOpMetricsCollector

# Explicitly use the no-op collector (this is the default)
async with Producer(
    stream_name="my-stream",
    metrics_collector=NoOpMetricsCollector()
) as producer:
    # No metrics collected
    pass
```

### InMemoryMetricsCollector

A simple collector for testing and debugging. Stores metrics in memory.

```python
from kinesis import InMemoryMetricsCollector

metrics = InMemoryMetricsCollector()

# Use with a producer
async with Producer(
    stream_name="my-stream",
    metrics_collector=metrics
) as producer:
    await producer.put({"test": "data"})

# Access metrics
print(f"Records sent: {metrics.counters}")
print(f"Queue sizes: {metrics.gauges}")
print(f"Flush times: {metrics.histograms}")
```

### PrometheusMetricsCollector

Production-ready Prometheus integration.

```python
from kinesis import PrometheusMetricsCollector
from prometheus_client import CollectorRegistry, start_http_server

# Custom registry (optional)
registry = CollectorRegistry()
metrics = PrometheusMetricsCollector(
    namespace="kinesis_app",
    registry=registry
)

# Start the metrics server
start_http_server(8000, registry=registry)
```

## Custom Collectors

Create your own metrics collector by implementing the `MetricsCollector` interface:
```python
from typing import Dict, Optional

from kinesis import MetricsCollector, MetricType


class DatadogMetricsCollector(MetricsCollector):
    def __init__(self, datadog_client):
        self.client = datadog_client

    def increment(
        self,
        metric: MetricType,
        value: float = 1,
        labels: Optional[Dict[str, str]] = None,
    ) -> None:
        tags = [f"{k}:{v}" for k, v in (labels or {}).items()]
        self.client.increment(metric.value, value, tags=tags)

    def gauge(
        self,
        metric: MetricType,
        value: float,
        labels: Optional[Dict[str, str]] = None,
    ) -> None:
        tags = [f"{k}:{v}" for k, v in (labels or {}).items()]
        self.client.gauge(metric.value, value, tags=tags)

    def histogram(
        self,
        metric: MetricType,
        value: float,
        labels: Optional[Dict[str, str]] = None,
    ) -> None:
        tags = [f"{k}:{v}" for k, v in (labels or {}).items()]
        self.client.histogram(metric.value, value, tags=tags)

    def timer(self, metric: MetricType, labels: Optional[Dict[str, str]] = None):
        # Return a context manager that times the operation
        from kinesis.metrics import Timer
        return Timer(self.histogram, metric, labels)
```

## Best Practices

A typical production setup configures logging, starts the Prometheus endpoint, and sets a global collector once at startup:

```python
import logging

from kinesis import PrometheusMetricsCollector, set_metrics_collector
from prometheus_client import start_http_server

# Configure logging
logging.basicConfig(level=logging.INFO)

# Start the Prometheus endpoint
start_http_server(8000)

# Set the global metrics collector
set_metrics_collector(PrometheusMetricsCollector(
    namespace="my_service"
))

# All producers/consumers will now emit metrics
```

Example Prometheus queries for Grafana:
```promql
# Producer throughput
rate(my_service_producer_records_sent_total[5m])

# Producer error rate
rate(my_service_producer_errors_total[5m])

# Consumer iterator age (lag indicator)
my_service_consumer_iterator_age_milliseconds

# Resharding events
increase(my_service_stream_resharding_events_total[1h])
```
Example Prometheus alert rules:

```yaml
groups:
  - name: kinesis
    rules:
      - alert: HighProducerErrorRate
        expr: rate(my_service_producer_errors_total[5m]) > 0.1
        for: 5m
        annotations:
          summary: "High error rate in Kinesis producer"
      - alert: HighConsumerIteratorAge
        expr: my_service_consumer_iterator_age_milliseconds > 60000
        for: 10m
        annotations:
          summary: "Consumer is more than 60s behind the shard tip"
      - alert: ReshardingDetected
        expr: increase(my_service_stream_resharding_events_total[5m]) > 0
        annotations:
          summary: "Kinesis stream resharding detected"
```

Use `InMemoryMetricsCollector` to verify metrics in tests:

```python
import pytest

from kinesis import Producer, InMemoryMetricsCollector


@pytest.mark.asyncio
async def test_producer_metrics():
    metrics = InMemoryMetricsCollector()

    async with Producer(
        stream_name="test-stream",
        metrics_collector=metrics
    ) as producer:
        await producer.put({"test": "data"})
        await producer.flush()

    # Verify metrics (default of 0 avoids a TypeError if the key is absent)
    assert metrics.counters.get(
        "producer_records_sent_total{stream_name=test-stream}", 0
    ) > 0
```

- Metrics collection adds minimal overhead (~1-2%)
- NoOpMetricsCollector has zero overhead
- Prometheus collectors are thread-safe
- Histograms use configurable buckets for efficiency
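The last point can be illustrated with a cumulative-bucket sketch (the bucket edges below are arbitrary examples, not the library's defaults):

```python
import bisect

# Bucketed histogram: store one count per upper bound, not every raw sample.
BUCKET_BOUNDS = [0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0]  # example edges, in seconds

counts = [0] * (len(BUCKET_BOUNDS) + 1)  # extra slot is the +Inf bucket
for observed in [0.004, 0.03, 0.03, 0.2, 7.5]:
    # bisect_left matches Prometheus's `le` (<=) bucket semantics.
    counts[bisect.bisect_left(BUCKET_BOUNDS, observed)] += 1

print(counts)  # memory stays O(buckets) no matter how many samples arrive
```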
Use a separate collector per stream when you need isolated metrics:

```python
# Different collectors per stream
metrics_orders = PrometheusMetricsCollector(namespace="orders")
metrics_events = PrometheusMetricsCollector(namespace="events")

async with Producer(
    stream_name="orders-stream",
    metrics_collector=metrics_orders
) as orders_producer:
    # Orders metrics
    pass

async with Producer(
    stream_name="events-stream",
    metrics_collector=metrics_events
) as events_producer:
    # Events metrics
    pass
```

If metrics are not appearing:

- Check the metrics endpoint is accessible:

  ```shell
  curl http://localhost:8000/metrics
  ```

- Verify a collector is set:

  ```python
  from kinesis import get_metrics_collector

  print(type(get_metrics_collector()))  # Should not be NoOpMetricsCollector
  ```

- Ensure records are being processed:

  ```python
  # Force a flush to trigger metrics
  await producer.flush()
  ```

The `InMemoryMetricsCollector` stores every histogram value in memory. For production, use `PrometheusMetricsCollector`, which aggregates into a fixed set of buckets.

Ensure your labels are strings and don't contain invalid characters:

```python
# Good
labels = {"stream_name": "my-stream", "env": "prod"}

# Bad
labels = {"stream.name": "my-stream"}  # Dots may cause issues
```
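A small sanitizer (a hypothetical helper of our own, not part of async-kinesis) can normalize keys before they reach the collector:

```python
import re

def sanitize_label_key(key: str) -> str:
    """Rewrite a label key to Prometheus's allowed charset: [a-zA-Z_][a-zA-Z0-9_]*."""
    key = re.sub(r"[^a-zA-Z0-9_]", "_", key)  # dots, dashes, spaces -> underscores
    if not re.match(r"[a-zA-Z_]", key):
        key = "_" + key                        # keys must not start with a digit
    return key

labels = {sanitize_label_key(k): str(v)
          for k, v in {"stream.name": "my-stream", "env": "prod"}.items()}
print(labels)  # {'stream_name': 'my-stream', 'env': 'prod'}
```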