Skip to content

Commit 062f0a8

Browse files
authored
Merge pull request #606 from php-enqueue/metrics
Queue monitoring.
2 parents dbeef43 + bbd54e7 commit 062f0a8

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+2346
-94
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ Features:
9898
* [Yii2. Amqp driver](docs/yii/amqp_driver.md)
9999
* [Message bus](docs/quick_tour.md#client) support.
100100
* [RPC over MQ](docs/quick_tour.md#remote-procedure-call-rpc) support.
101+
* [Monitoring](monitoring.md)
101102
* Temporary queues support.
102103
* Well designed, decoupled and reusable components.
103104
* Carefully tested (unit & functional).

composer.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@
3232
"richardfullmer/rabbitmq-management-api": "^2.0",
3333
"predis/predis": "^1.1",
3434
"thruway/pawl-transport": "^0.5.0",
35-
"voryx/thruway": "^0.5.3"
35+
"voryx/thruway": "^0.5.3",
36+
"influxdb/influxdb-php": "^1.14"
3637
},
3738
"require-dev": {
3839
"phpunit/phpunit": "^5.5",
@@ -78,6 +79,7 @@
7879
"Enqueue\\Test\\": "pkg/test/",
7980
"Enqueue\\Dsn\\": "pkg/dsn/",
8081
"Enqueue\\Wamp\\": "pkg/wamp/",
82+
"Enqueue\\Monitoring\\": "pkg/monitoring/",
8183
"Enqueue\\": "pkg/enqueue/"
8284
},
8385
"exclude-from-classmap": [

docker-compose.yml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,3 +125,16 @@ services:
125125
HOSTNAME_EXTERNAL: 'localstack'
126126
SERVICES: 'sqs'
127127

128+
influxdb:
129+
image: 'influxdb:latest'
130+
131+
chronograf:
132+
image: 'chronograf:latest'
133+
entrypoint: 'chronograf --influxdb-url=http://influxdb:8086'
134+
ports:
135+
- '8888:8888'
136+
137+
grafana:
138+
image: 'grafana/grafana:latest'
139+
ports:
140+
- '3000:3000'

docs/images/grafana_monitoring.jpg

161 KB
Loading

docs/index.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ Enqueue is an MIT-licensed open source project with its ongoing development made
6262
- [AMQP Interop driver](yii/amqp_driver.md)
6363
* [EnqueueElasticaBundle. Overview](elastica-bundle/overview.md)
6464
* [DSN Parser](dsn.md)
65+
* [Monitoring](monitoring.md)
6566
* [Use cases](#use-cases)
6667
- [Symfony. Async event dispatcher](async_event_dispatcher/quick_tour.md)
6768
- [Monolog. Send messages to message queue](monolog/send-messages-to-mq.md)

docs/monitoring.md

Lines changed: 296 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,296 @@
1+
<h2 align="center">Supporting Enqueue</h2>
2+
3+
Enqueue is an MIT-licensed open source project with its ongoing development made possible entirely by the support of community and our customers. If you'd like to join them, please consider:
4+
5+
- [Become a sponsor](https://www.patreon.com/makasim)
6+
- [Become our client](http://forma-pro.com/)
7+
8+
---
9+
10+
# Monitoring.
11+
12+
Enqueue provides a tool for monitoring your queues.
13+
With it, you can control how many messages were sent, how many processed successful or failed.
14+
How many consumers are working, their up time, processed messages stats, memory usage and system load.
15+
The tool could be integrated with virtually any analytics and monitoring platform.
16+
There are several integration:
17+
* [InfluxDB](https://www.influxdata.com/) and [Grafana](https://grafana.com/)
18+
* [WAMP (Web Application Messaging Protocol)](https://wamp-proto.org/)
19+
20+
We are working on a JS\WAMP based real-time UI tool, for more information please [contact us]([email protected]).
21+
22+
![Grafana Monitoring](images/grafana_monitoring.jpg)
23+
24+
* [Installation](#installation)
25+
* [Track sent messages](#track-sent-messages)
26+
* [Track consumed message](#track-consumed-message)
27+
* [Track consumer metrics](#track-consumer-metrics)
28+
* [Consumption extension](#consumption-extension)
29+
* [Enqueue Client Extension](#enqueue-client-extension)
30+
* [InfluxDB Storage](#influxdb-storage)
31+
* [WAMP (Web Socket Messaging Protocol) Storage](#wamp-(web-socket-messaging-protocol)-storage)
32+
* [Symfony App](#symfony-app)
33+
34+
## Installation
35+
36+
```bash
37+
composer req enqueue/monitoring:0.9.x-dev
38+
```
39+
40+
## Track sent messages
41+
42+
```php
43+
<?php
44+
use Enqueue\Monitoring\SentMessageStats;
45+
use Enqueue\Monitoring\GenericStatsStorageFactory;
46+
47+
$statsStorage = (new GenericStatsStorageFactory())->create('influxdb://127.0.0.1:8086?db=foo');
48+
$statsStorage->pushSentMessageStats(new SentMessageStats(
49+
(int) (microtime(true) * 1000), // timestamp
50+
'queue_name', // queue
51+
'aMessageId',
52+
'aCorrelationId',
53+
[], // headers
54+
[] // properties
55+
));
56+
```
57+
58+
or, if you work with [Queue Interop](https://github.com/queue-interop/queue-interop) transport here's how you can track a message sent
59+
60+
```php
61+
<?php
62+
use Interop\Queue\Context;
63+
use Enqueue\Monitoring\SentMessageStats;
64+
use Enqueue\Monitoring\GenericStatsStorageFactory;
65+
66+
/** @var Context $context */
67+
68+
$queue = $context->createQueue('foo');
69+
$message = $context->createMessage('body');
70+
71+
$context->createProducer()->send($queue, $message);
72+
73+
$statsStorage = (new GenericStatsStorageFactory())->create('influxdb://127.0.0.1:8086?db=foo');
74+
$statsStorage->pushSentMessageStats(new SentMessageStats(
75+
(int) (microtime(true) * 1000),
76+
$queue->getQueueName(),
77+
$message->getMessageId(),
78+
$message->getCorrelationId(),
79+
$message->getHeaders()[],
80+
$message->getProperties()
81+
));
82+
```
83+
84+
## Track consumed message
85+
86+
```php
87+
<?php
88+
use Enqueue\Monitoring\ConsumedMessageStats;
89+
use Enqueue\Monitoring\GenericStatsStorageFactory;
90+
91+
$receivedAt = (int) (microtime(true) * 1000);
92+
93+
// heavy processing here.
94+
95+
$statsStorage = (new GenericStatsStorageFactory())->create('influxdb://127.0.0.1:8086?db=foo');
96+
$statsStorage->pushConsumedMessageStats(new ConsumedMessageStats(
97+
'consumerId',
98+
(int) (microtime(true) * 1000), // now
99+
$receivedAt,
100+
'aQueue',
101+
'aMessageId',
102+
'aCorrelationId',
103+
[], // headers
104+
[], // properties
105+
false, // redelivered or not
106+
ConsumedMessageStats::STATUS_ACK
107+
));
108+
```
109+
110+
or, if you work with [Queue Interop](https://github.com/queue-interop/queue-interop) transport here's how you can track a message sent
111+
112+
```php
113+
<?php
114+
use Interop\Queue\Context;
115+
use Enqueue\Monitoring\ConsumedMessageStats;
116+
use Enqueue\Monitoring\GenericStatsStorageFactory;
117+
118+
/** @var Context $context */
119+
120+
$queue = $context->createQueue('foo');
121+
122+
$consumer = $context->createConsumer($queue);
123+
124+
$consumerId = uniqid('consumer-id', true); // we suggest using UUID here
125+
if ($message = $consumer->receiveNoWait()) {
126+
$receivedAt = (int) (microtime(true) * 1000);
127+
128+
// heavy processing here.
129+
130+
$consumer->acknowledge($message);
131+
132+
$statsStorage = (new GenericStatsStorageFactory())->create('influxdb://127.0.0.1:8086?db=foo');
133+
$statsStorage->pushConsumedMessageStats(new ConsumedMessageStats(
134+
$consumerId,
135+
(int) (microtime(true) * 1000), // now
136+
$receivedAt,
137+
$queue->getQueueName(),
138+
$message->getMessageId(),
139+
$message->getCorrelationId(),
140+
$message->getHeaders(),
141+
$message->getProperties(),
142+
$message->isRedelivered(),
143+
ConsumedMessageStats::STATUS_ACK
144+
));
145+
}
146+
```
147+
148+
## Track consumer metrics
149+
150+
Consumers are long running processes. It vital to know how many of them are running right now, how they perform, how much memory do they use and so.
151+
This example shows how you can send such metrics.
152+
Call this code from time to time between processing messages.
153+
154+
```php
155+
<?php
156+
use Enqueue\Monitoring\ConsumerStats;
157+
use Enqueue\Monitoring\GenericStatsStorageFactory;
158+
159+
$startedAt = (int) (microtime(true) * 1000);
160+
161+
$statsStorage = (new GenericStatsStorageFactory())->create('influxdb://127.0.0.1:8086?db=foo');
162+
$statsStorage->pushConsumerStats(new ConsumerStats(
163+
'consumerId',
164+
(int) (microtime(true) * 1000), // now
165+
$startedAt,
166+
null, // finished at
167+
true, // is started?
168+
false, // is finished?
169+
false, // is failed
170+
['foo'], // consume from queues
171+
123, // received messages
172+
120, // acknowledged messages
173+
1, // rejected messages
174+
1, // requeued messages
175+
memory_get_usage(true),
176+
sys_getloadavg()[0]
177+
));
178+
```
179+
180+
## Consumption extension
181+
182+
There is an extension `ConsumerMonitoringExtension` for Enqueue [QueueConsumer](quick_tour.md#consumption).
183+
It could collect consumed messages and consumer stats for you.
184+
185+
```php
186+
<?php
187+
use Enqueue\Consumption\QueueConsumer;
188+
use Enqueue\Consumption\ChainExtension;
189+
use Enqueue\Monitoring\ConsumerMonitoringExtension;
190+
use Enqueue\Monitoring\GenericStatsStorageFactory;
191+
use Interop\Queue\Context;
192+
193+
/** @var Context $context */
194+
195+
$statsStorage = (new GenericStatsStorageFactory())->create('influxdb://127.0.0.1:8086?db=foo');
196+
197+
$queueConsumer = new QueueConsumer($context, new ChainExtension([
198+
new ConsumerMonitoringExtension($statsStorage)
199+
]));
200+
201+
// bind
202+
203+
// consume
204+
```
205+
206+
## Enqueue Client Extension
207+
208+
There is an extension ClientMonitoringExtension for Enqueue [Client](quick_tour.md#client) too. It could collect sent messages stats for you.
209+
210+
## InfluxDB Storage
211+
212+
Install additional packages:
213+
214+
```
215+
composer req influxdb/influxdb-php:^1.14
216+
```
217+
218+
```php
219+
<?php
220+
use Enqueue\Monitoring\GenericStatsStorageFactory;
221+
222+
$statsStorage = (new GenericStatsStorageFactory())->create('influxdb://127.0.0.1:8086?db=foo');
223+
```
224+
225+
There are available options:
226+
227+
```
228+
* 'host' => '127.0.0.1',
229+
* 'port' => '8086',
230+
* 'user' => '',
231+
* 'password' => '',
232+
* 'db' => 'enqueue',
233+
* 'measurementSentMessages' => 'sent-messages',
234+
* 'measurementConsumedMessages' => 'consumed-messages',
235+
* 'measurementConsumers' => 'consumers',
236+
```
237+
238+
## WAMP (Web Socket Messaging Protocol) Storage
239+
240+
Install additional packages:
241+
242+
```
243+
composer req thruway/pawl-transport:^0.5.0 voryx/thruway:^0.5.3
244+
```
245+
246+
```php
247+
<?php
248+
use Enqueue\Monitoring\GenericStatsStorageFactory;
249+
250+
$statsStorage = (new GenericStatsStorageFactory())->create('wamp://127.0.0.1:9090?topic=stats');
251+
```
252+
253+
There are available options:
254+
255+
```
256+
* 'host' => '127.0.0.1',
257+
* 'port' => '9090',
258+
* 'topic' => 'stats',
259+
* 'max_retries' => 15,
260+
* 'initial_retry_delay' => 1.5,
261+
* 'max_retry_delay' => 300,
262+
* 'retry_delay_growth' => 1.5,
263+
```
264+
265+
## Symfony App
266+
267+
You have to register some services in order to incorporate monitoring facilities into your Symfony application.
268+
269+
```yaml
270+
services:
271+
Enqueue\Monitoring\GenericStatsStorageFactory: ~
272+
273+
Enqueue\Monitoring\StatsStorage:
274+
factory: ['@Enqueue\Monitoring\GenericStatsStorageFactory', 'create']
275+
arguments: ['influxdb://127.0.0.1:8086?db=foo']
276+
277+
Enqueue\Monitoring\ConsumerMonitoringExtension:
278+
arguments:
279+
- '@Enqueue\Monitoring\StatsStorage'
280+
tags:
281+
# if you want to monitor transport consumer
282+
- { name: 'enqueue.transport.consumption_extension', transport: 'default' }
283+
284+
# if you want to monitor client consumer
285+
- { name: 'enqueue.consumption_extension', client: 'default' }
286+
287+
# if you want to monitor sent messages
288+
Enqueue\Monitoring\ClientMonitoringExtension:
289+
arguments:
290+
- '@Enqueue\Monitoring\StatsStorage'
291+
- '@logger'
292+
tags:
293+
- { name: 'enqueue.client_extension', client: 'default' }
294+
```
295+
296+
[back to index](index.md)

docs/quick_tour.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ Enqueue is an MIT-licensed open source project with its ongoing development made
1414
* [Remote Procedure Call (RPC)](#remote-procedure-call-rpc)
1515
* [Client](#client)
1616
* [Cli commands](#cli-commands)
17+
* [Monitoring](#monitoring)
1718

1819
## Transport
1920

@@ -281,4 +282,8 @@ and starts the consumption from the console:
281282
$ app.php consume
282283
```
283284

285+
## Monitoring
286+
287+
There is a tool that can track sent\consumed messages as well as consumer performance. Read more [here](monitoring.md)
288+
284289
[back to index](index.md)

0 commit comments

Comments
 (0)