-
-
Notifications
You must be signed in to change notification settings - Fork 296
export RQ statistics as prometheus metrics #666
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
032fe34
c5c67a9
44ca5b2
af477bd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,59 @@ | ||
| from rq.job import JobStatus | ||
|
|
||
| from .queues import filter_connection_params, get_connection, get_queue, get_unique_connection_configs | ||
| from .workers import get_worker_class | ||
|
|
||
try:
    from prometheus_client import Summary
    from prometheus_client.core import GaugeMetricFamily, CounterMetricFamily

    class RQCollector:
        """Collector that reports RQ worker and queue statistics as metric families."""

        # Times every pass through collect().
        summary = Summary('rq_request_processing_seconds_total', 'Time spent collecting RQ data')

        def collect(self):
            """Yield the worker and job metric families built from the configured queues.

            Worker samples are added only the first time a given connection
            config is encountered; job counts are added for every queue in
            ``QUEUES``.
            """
            from .settings import QUEUES

            # Job registries reported after the plain queued count, paired
            # with the status label each one is published under.
            registry_attrs = (
                (JobStatus.STARTED, 'started_job_registry'),
                (JobStatus.FINISHED, 'finished_job_registry'),
                (JobStatus.FAILED, 'failed_job_registry'),
                (JobStatus.DEFERRED, 'deferred_job_registry'),
                (JobStatus.SCHEDULED, 'scheduled_job_registry'),
            )

            with self.summary.time():
                rq_workers = GaugeMetricFamily('rq_workers', 'RQ workers', labels=['name', 'state', 'queues'])
                rq_job_successful_total = CounterMetricFamily(
                    'rq_job_successful_total', 'RQ successful job count', labels=['name', 'queues']
                )
                rq_job_failed_total = CounterMetricFamily(
                    'rq_job_failed_total', 'RQ failed job count', labels=['name', 'queues']
                )
                rq_working_seconds_total = CounterMetricFamily(
                    'rq_working_seconds_total', 'RQ total working time', labels=['name', 'queues']
                )

                rq_jobs = GaugeMetricFamily('rq_jobs', 'RQ jobs by status', labels=['queue', 'status'])

                worker_class = get_worker_class()
                unique_configs = get_unique_connection_configs()
                connection_by_config = {}
                for queue_name, config in QUEUES.items():
                    config_index = unique_configs.index(filter_connection_params(config))
                    if config_index in connection_by_config:
                        connection = connection_by_config[config_index]
                    else:
                        connection = get_connection(queue_name)
                        connection_by_config[config_index] = connection

                        # First queue seen for this connection config: add
                        # the samples for every worker listed on it.
                        for worker in worker_class.all(connection):
                            worker_name = worker.name
                            queue_label = ','.join(worker.queue_names())
                            rq_workers.add_metric([worker_name, worker.get_state(), queue_label], 1)
                            rq_job_successful_total.add_metric(
                                [worker_name, queue_label], worker.successful_job_count
                            )
                            rq_job_failed_total.add_metric([worker_name, queue_label], worker.failed_job_count)
                            rq_working_seconds_total.add_metric(
                                [worker_name, queue_label], worker.total_working_time
                            )

                    queue = get_queue(queue_name, connection=connection)
                    rq_jobs.add_metric([queue_name, JobStatus.QUEUED], queue.count)
                    for status, attr_name in registry_attrs:
                        rq_jobs.add_metric([queue_name, status], getattr(queue, attr_name).count)

                yield from (
                    rq_workers,
                    rq_job_successful_total,
                    rq_job_failed_total,
                    rq_working_seconds_total,
                    rq_jobs,
                )

except ImportError:
    RQCollector = None  # type: ignore[assignment, misc]
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,56 @@ | ||
| from django.contrib import admin | ||
| from django.contrib.admin.views.decorators import staff_member_required | ||
| from django.http import Http404, HttpResponse, JsonResponse | ||
| from django.shortcuts import render | ||
| from django.views.decorators.cache import never_cache | ||
|
|
||
| from .settings import API_TOKEN | ||
| from .utils import get_scheduler_statistics, get_statistics | ||
|
|
||
| try: | ||
| import prometheus_client | ||
|
|
||
| from .metrics_collector import RQCollector | ||
| except ImportError: | ||
| prometheus_client = RQCollector = None # type: ignore[assignment, misc] | ||
|
|
||
| registry = None | ||
|
|
||
|
|
||
@never_cache
@staff_member_required
def prometheus_metrics(request):
    """Serve the RQ collector's metrics in a Prometheus exposition format.

    The module-level ``registry`` is created on first use and reused by
    later requests. When the query string carries one or more ``name[]``
    values, only those metric names are exported for that request.

    Raises:
        Http404: if ``RQCollector`` is unavailable (its import failed).
    """
    global registry

    if not RQCollector:  # type: ignore[truthy-function]
        raise Http404

    if registry is None:
        registry = prometheus_client.CollectorRegistry(auto_describe=True)
        registry.register(RQCollector())

    encoder, content_type = prometheus_client.exposition.choose_encoder(request.META.get('HTTP_ACCEPT', ''))

    # Keep the filtered view request-local. Rebinding the global here would
    # replace the shared registry with the restricted one, so every later
    # request would be served from the narrowed view instead of the full set.
    active_registry = registry
    if 'name[]' in request.GET:
        active_registry = registry.restricted_registry(request.GET.getlist('name[]'))

    return HttpResponse(encoder(active_registry), headers={'Content-Type': content_type})
|
|
||
|
|
||
@never_cache
@staff_member_required
def stats(request):
    """Render the ``django_rq/stats.html`` page.

    The template context is the admin site context, overlaid with the queue
    statistics (maintenance tasks enabled) and then the scheduler statistics.
    """
    context_data = dict(admin.site.each_context(request))
    context_data.update(get_statistics(run_maintenance_tasks=True))
    context_data.update(get_scheduler_statistics())
    return render(request, 'django_rq/stats.html', context_data)
|
|
||
|
|
||
def stats_json(request, token=None):
    """Return the queue statistics as JSON.

    Access is granted to staff users, or to any caller whose ``token`` is
    non-empty and equals ``API_TOKEN``. Everyone else receives a JSON error
    payload instead of the statistics.
    """
    authorized = request.user.is_staff or (token and token == API_TOKEN)
    if not authorized:
        error_payload = {
            "error": True,
            "description": "Please configure API_TOKEN in settings.py before accessing this view.",
        }
        return JsonResponse(error_payload)

    return JsonResponse(get_statistics())
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,135 @@ | ||
| import os | ||
| from unittest import skipIf | ||
| from unittest.mock import patch | ||
|
|
||
| from django.contrib.auth.models import User | ||
| from django.test import TestCase, override_settings | ||
| from django.test.client import Client | ||
| from django.urls import NoReverseMatch, reverse | ||
|
|
||
| from django_rq import get_queue | ||
| from django_rq.workers import get_worker | ||
|
|
||
| from .fixtures import access_self, failing_job | ||
|
|
||
| try: | ||
| import prometheus_client | ||
| except ImportError: | ||
| prometheus_client = None | ||
|
|
||
# Queue configuration patched over django_rq.settings.QUEUES in the tests:
# a single 'default' queue whose host can be overridden through REDIS_HOST.
RQ_QUEUES = {
    'default': dict(
        HOST=os.environ.get('REDIS_HOST', 'localhost'),
        PORT=6379,
        DB=0,
    ),
}
|
|
||
|
|
||
@skipIf(prometheus_client is None, 'prometheus_client is required')
@override_settings(RQ={'AUTOCOMMIT': True})
class PrometheusTest(TestCase):
    """Exercises the ``rq_metrics`` endpoint through the Django test client."""

    def setUp(self):
        """Log a staff user in and flush the default queue's connection."""
        staff = User.objects.create_user('foo', password='pass')
        staff.is_staff = True
        staff.is_active = True
        staff.save()
        self.user = staff

        self.client = Client()
        self.client.force_login(self.user)
        get_queue('default').connection.flushall()

    def assertMetricsContain(self, lines):
        """Assert that every entry of the set ``lines`` is a line of the metrics response."""
        response = self.client.get(reverse('rq_metrics'))
        self.assertEqual(response.status_code, 200)

        body = response.content.decode('utf-8')
        emitted_lines = set(body.splitlines())
        # For sets, ``<=`` is the subset relation.
        self.assertLessEqual(lines, emitted_lines)

    @patch('django_rq.settings.QUEUES', RQ_QUEUES)
    def test_metrics_default(self):
        """With nothing enqueued, every job status gauge reads zero."""
        expected_lines = {
            '# HELP rq_jobs RQ jobs by status',
            'rq_jobs{queue="default",status="queued"} 0.0',
            'rq_jobs{queue="default",status="started"} 0.0',
            'rq_jobs{queue="default",status="finished"} 0.0',
            'rq_jobs{queue="default",status="failed"} 0.0',
            'rq_jobs{queue="default",status="deferred"} 0.0',
            'rq_jobs{queue="default",status="scheduled"} 0.0',
        }
        self.assertMetricsContain(expected_lines)

    @patch('django_rq.settings.QUEUES', RQ_QUEUES)
    def test_metrics_with_jobs(self):
        """Metrics follow one failing and ten succeeding jobs as a worker drains them."""
        queue = get_queue('default')
        queue.enqueue(failing_job)

        for _ in range(10):
            queue.enqueue(access_self)

        worker = get_worker('default', name='test_worker')
        worker.register_birth()

        def _skip_registration():
            return None

        # override worker registration to effectively simulate non burst mode
        original_register_death = worker.register_death
        worker.register_birth = _skip_registration  # type: ignore[method-assign]
        worker.register_death = _skip_registration  # type: ignore[method-assign]

        try:
            before_any_work = {
                # job information
                '# HELP rq_jobs RQ jobs by status',
                'rq_jobs{queue="default",status="queued"} 11.0',
                'rq_jobs{queue="default",status="started"} 0.0',
                'rq_jobs{queue="default",status="finished"} 0.0',
                'rq_jobs{queue="default",status="failed"} 0.0',
                'rq_jobs{queue="default",status="deferred"} 0.0',
                'rq_jobs{queue="default",status="scheduled"} 0.0',
                # worker information
                '# HELP rq_workers RQ workers',
                'rq_workers{name="test_worker",queues="default",state="?"} 1.0',
                '# HELP rq_job_successful_total RQ successful job count',
                'rq_job_successful_total{name="test_worker",queues="default"} 0.0',
                '# HELP rq_job_failed_total RQ failed job count',
                'rq_job_failed_total{name="test_worker",queues="default"} 0.0',
                '# HELP rq_working_seconds_total RQ total working time',
                'rq_working_seconds_total{name="test_worker",queues="default"} 0.0',
            }
            self.assertMetricsContain(before_any_work)

            worker.work(burst=True, max_jobs=4)
            after_four_jobs = {
                # job information
                'rq_jobs{queue="default",status="queued"} 7.0',
                'rq_jobs{queue="default",status="finished"} 3.0',
                'rq_jobs{queue="default",status="failed"} 1.0',
                # worker information
                'rq_workers{name="test_worker",queues="default",state="idle"} 1.0',
                'rq_job_successful_total{name="test_worker",queues="default"} 3.0',
                'rq_job_failed_total{name="test_worker",queues="default"} 1.0',
            }
            self.assertMetricsContain(after_four_jobs)

            worker.work(burst=True)
            after_all_jobs = {
                # job information
                'rq_jobs{queue="default",status="queued"} 0.0',
                'rq_jobs{queue="default",status="finished"} 10.0',
                'rq_jobs{queue="default",status="failed"} 1.0',
                # worker information
                'rq_workers{name="test_worker",queues="default",state="idle"} 1.0',
                'rq_job_successful_total{name="test_worker",queues="default"} 10.0',
                'rq_job_failed_total{name="test_worker",queues="default"} 1.0',
            }
            self.assertMetricsContain(after_all_jobs)
        finally:
            # Run the real deregistration that was stubbed out above.
            original_register_death()
|
|
||
|
|
||
@skipIf(prometheus_client is not None, 'prometheus_client is installed')
class NoPrometheusTest(TestCase):
    """Runs only when ``prometheus_client`` could not be imported."""

    def test_no_metrics_without_prometheus_client(self):
        """Reversing ``rq_metrics`` must raise ``NoReverseMatch``."""
        self.assertRaises(NoReverseMatch, reverse, 'rq_metrics')
Uh oh!
There was an error while loading. Please reload this page.