Skip to content

Commit ed1f671

Browse files
authored
Merge pull request #716 from uro/feature/174-monitoring-datadog-support
[monitoring] Add support of Datadog
2 parents 290f289 + 961117c commit ed1f671

File tree

7 files changed

+204
-14
lines changed

7 files changed

+204
-14
lines changed

DatadogStorage.php

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Enqueue\Monitoring;
6+
7+
use DataDog\BatchedDogStatsd;
8+
use DataDog\DogStatsd;
9+
use Enqueue\Client\Config;
10+
use Enqueue\Dsn\Dsn;
11+
12+
class DatadogStorage implements StatsStorage
13+
{
14+
/**
15+
* @var array
16+
*/
17+
private $config;
18+
19+
/**
20+
* @var BatchedDogStatsd
21+
*/
22+
private $datadog;
23+
24+
public function __construct($config = 'datadog:')
25+
{
26+
if (false === class_exists(DogStatsd::class)) {
27+
throw new \LogicException('Seems client library is not installed. Please install "datadog/php-datadogstatsd"');
28+
}
29+
30+
$this->config = $this->prepareConfig($config);
31+
32+
if (null === $this->datadog) {
33+
if (true === filter_var($this->config['batched'], FILTER_VALIDATE_BOOLEAN)) {
34+
$this->datadog = new BatchedDogStatsd($this->config);
35+
} else {
36+
$this->datadog = new DogStatsd($this->config);
37+
}
38+
}
39+
}
40+
41+
public function pushConsumerStats(ConsumerStats $stats): void
42+
{
43+
$queues = $stats->getQueues();
44+
array_walk($queues, function (string $queue) use ($stats) {
45+
$tags = [
46+
'queue' => $queue,
47+
'consumerId' => $stats->getConsumerId(),
48+
];
49+
50+
if ($stats->getFinishedAtMs()) {
51+
$values['finishedAtMs'] = $stats->getFinishedAtMs();
52+
}
53+
54+
$this->datadog->gauge($this->config['metric.consumers.started'], (int) $stats->isStarted(), 1, $tags);
55+
$this->datadog->gauge($this->config['metric.consumers.finished'], (int) $stats->isFinished(), 1, $tags);
56+
$this->datadog->gauge($this->config['metric.consumers.failed'], (int) $stats->isFailed(), 1, $tags);
57+
$this->datadog->gauge($this->config['metric.consumers.received'], $stats->getReceived(), 1, $tags);
58+
$this->datadog->gauge($this->config['metric.consumers.acknowledged'], $stats->getAcknowledged(), 1, $tags);
59+
$this->datadog->gauge($this->config['metric.consumers.rejected'], $stats->getRejected(), 1, $tags);
60+
$this->datadog->gauge($this->config['metric.consumers.requeued'], $stats->getRejected(), 1, $tags);
61+
$this->datadog->gauge($this->config['metric.consumers.memoryUsage'], $stats->getMemoryUsage(), 1, $tags);
62+
});
63+
}
64+
65+
public function pushSentMessageStats(SentMessageStats $stats): void
66+
{
67+
$tags = [
68+
'destination' => $stats->getDestination(),
69+
];
70+
71+
$properties = $stats->getProperties();
72+
if (false === empty($properties[Config::TOPIC])) {
73+
$tags['topic'] = $properties[Config::TOPIC];
74+
}
75+
76+
if (false === empty($properties[Config::COMMAND])) {
77+
$tags['command'] = $properties[Config::COMMAND];
78+
}
79+
80+
$this->datadog->increment($this->config['metric.messages.sent'], 1, $tags);
81+
}
82+
83+
public function pushConsumedMessageStats(ConsumedMessageStats $stats): void
84+
{
85+
$tags = [
86+
'queue' => $stats->getQueue(),
87+
'status' => $stats->getStatus(),
88+
];
89+
90+
if (ConsumedMessageStats::STATUS_FAILED === $stats->getStatus()) {
91+
$this->datadog->increment($this->config['metric.messages.failed'], 1, $tags);
92+
}
93+
94+
if ($stats->isRedelivered()) {
95+
$this->datadog->increment($this->config['metric.messages.redelivered'], 1, $tags);
96+
}
97+
98+
$runtime = $stats->getTimestampMs() - $stats->getReceivedAtMs();
99+
$this->datadog->histogram($this->config['metric.messages.consumed'], $runtime, 1, $tags);
100+
}
101+
102+
private function parseDsn(string $dsn): array
103+
{
104+
$dsn = Dsn::parseFirst($dsn);
105+
106+
if ('datadog' !== $dsn->getSchemeProtocol()) {
107+
throw new \LogicException(sprintf(
108+
'The given scheme protocol "%s" is not supported. It must be "datadog"',
109+
$dsn->getSchemeProtocol()
110+
));
111+
}
112+
113+
return array_filter(array_replace($dsn->getQuery(), [
114+
'host' => $dsn->getHost(),
115+
'port' => $dsn->getPort(),
116+
'global_tags' => $dsn->getString('global_tags'),
117+
'batched' => $dsn->getString('batched'),
118+
'metric.messages.sent' => $dsn->getString('metric.messages.sent'),
119+
'metric.messages.consumed' => $dsn->getString('metric.messages.consumed'),
120+
'metric.messages.redelivered' => $dsn->getString('metric.messages.redelivered'),
121+
'metric.messages.failed' => $dsn->getString('metric.messages.failed'),
122+
'metric.consumers.started' => $dsn->getString('metric.consumers.started'),
123+
'metric.consumers.finished' => $dsn->getString('metric.consumers.finished'),
124+
'metric.consumers.failed' => $dsn->getString('metric.consumers.failed'),
125+
'metric.consumers.received' => $dsn->getString('metric.consumers.received'),
126+
'metric.consumers.acknowledged' => $dsn->getString('metric.consumers.acknowledged'),
127+
'metric.consumers.rejected' => $dsn->getString('metric.consumers.rejected'),
128+
'metric.consumers.requeued' => $dsn->getString('metric.consumers.requeued'),
129+
'metric.consumers.memoryUsage' => $dsn->getString('metric.consumers.memoryUsage'),
130+
]), function ($value) {
131+
return null !== $value;
132+
});
133+
}
134+
135+
/**
136+
* @param $config
137+
*
138+
* @return array
139+
*/
140+
private function prepareConfig($config): array
141+
{
142+
if (empty($config)) {
143+
$config = $this->parseDsn('datadog:');
144+
} elseif (\is_string($config)) {
145+
$config = $this->parseDsn($config);
146+
} elseif (\is_array($config)) {
147+
$config = empty($config['dsn']) ? $config : $this->parseDsn($config['dsn']);
148+
} elseif ($config instanceof DogStatsd) {
149+
$this->datadog = $config;
150+
$config = [];
151+
} else {
152+
throw new \LogicException('The config must be either an array of options, a DSN string or null');
153+
}
154+
155+
return array_replace([
156+
'host' => 'localhost',
157+
'port' => 8125,
158+
'batched' => true,
159+
'metric.messages.sent' => 'enqueue.messages.sent',
160+
'metric.messages.consumed' => 'enqueue.messages.consumed',
161+
'metric.messages.redelivered' => 'enqueue.messages.redelivered',
162+
'metric.messages.failed' => 'enqueue.messages.failed',
163+
'metric.consumers.started' => 'enqueue.consumers.started',
164+
'metric.consumers.finished' => 'enqueue.consumers.finished',
165+
'metric.consumers.failed' => 'enqueue.consumers.failed',
166+
'metric.consumers.received' => 'enqueue.consumers.received',
167+
'metric.consumers.acknowledged' => 'enqueue.consumers.acknowledged',
168+
'metric.consumers.rejected' => 'enqueue.consumers.rejected',
169+
'metric.consumers.requeued' => 'enqueue.consumers.requeued',
170+
'metric.consumers.memoryUsage' => 'enqueue.consumers.memoryUsage',
171+
], $config);
172+
}
173+
}

GenericStatsStorageFactory.php

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,22 +10,22 @@ class GenericStatsStorageFactory implements StatsStorageFactory
1010
{
1111
public function create($config): StatsStorage
1212
{
13-
if (is_string($config)) {
13+
if (\is_string($config)) {
1414
$config = ['dsn' => $config];
1515
}
1616

17-
if (false == is_array($config)) {
17+
if (false === \is_array($config)) {
1818
throw new \InvalidArgumentException('The config must be either array or DSN string.');
1919
}
2020

21-
if (false == array_key_exists('dsn', $config)) {
21+
if (false === array_key_exists('dsn', $config)) {
2222
throw new \InvalidArgumentException('The config must have dsn key set.');
2323
}
2424

2525
$dsn = Dsn::parseFirst($config['dsn']);
2626

2727
if ($storageClass = $this->findStorageClass($dsn, Resources::getKnownStorages())) {
28-
return new $storageClass(1 === count($config) ? $config['dsn'] : $config);
28+
return new $storageClass(1 === \count($config) ? $config['dsn'] : $config);
2929
}
3030

3131
throw new \LogicException(sprintf('A given scheme "%s" is not supported.', $dsn->getScheme()));
@@ -41,7 +41,7 @@ private function findStorageClass(Dsn $dsn, array $factories): ?string
4141
continue;
4242
}
4343

44-
if (false == in_array($protocol, $info['schemes'], true)) {
44+
if (false === \in_array($protocol, $info['schemes'], true)) {
4545
continue;
4646
}
4747

@@ -53,7 +53,7 @@ private function findStorageClass(Dsn $dsn, array $factories): ?string
5353
}
5454

5555
foreach ($factories as $storageClass => $info) {
56-
if (false == in_array($protocol, $info['schemes'], true)) {
56+
if (false === \in_array($protocol, $info['schemes'], true)) {
5757
continue;
5858
}
5959

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ Queue Monitoring tool. Track sent, consumed messages. Consumers performances.
1414
* Could be used with any message queue library.
1515
* Could be intergrated to any PHP framework
1616
* Could send stats to any analytical platform
17-
* Supports Grafana and WAMP out of the box.
17+
* Supports Datadog, InfluxDb, Grafana and WAMP out of the box.
1818
* Provides integration for Enqueue
1919

2020
[![Gitter](https://badges.gitter.im/php-enqueue/Lobby.svg)](https://gitter.im/php-enqueue/Lobby)

Resources.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,11 @@ public static function getKnownStorages(): array
4242
'supportedSchemeExtensions' => [],
4343
];
4444

45+
$map[DatadogStorage::class] = [
46+
'schemes' => ['datadog'],
47+
'supportedSchemeExtensions' => [],
48+
];
49+
4550
self::$knownStorages = $map;
4651
}
4752

Symfony/DependencyInjection/MonitoringFactory.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ public static function getConfiguration(string $name = 'monitoring'): ArrayNodeD
4242
->info(sprintf('The "%s" option could accept a string DSN, an array with DSN key, or null. It accept extra options. To find out what option you can set, look at stats storage constructor doc block.', $name))
4343
->beforeNormalization()
4444
->always(function ($v) {
45-
if (is_array($v)) {
46-
if (isset($v['storage_factory_class']) && isset($v['storage_factory_service'])) {
45+
if (\is_array($v)) {
46+
if (isset($v['storage_factory_class'], $v['storage_factory_service'])) {
4747
throw new \LogicException('Both options storage_factory_class and storage_factory_service are set. Please choose one.');
4848
}
4949

Tests/GenericStatsStorageFactoryTest.php

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
<?php
22

3+
declare(strict_types=1);
4+
35
namespace Enqueue\Monitoring\Tests;
46

7+
use Enqueue\Monitoring\DatadogStorage;
58
use Enqueue\Monitoring\GenericStatsStorageFactory;
69
use Enqueue\Monitoring\InfluxDbStorage;
710
use Enqueue\Monitoring\StatsStorageFactory;
@@ -13,26 +16,33 @@ class GenericStatsStorageFactoryTest extends TestCase
1316
{
1417
use ClassExtensionTrait;
1518

16-
public function testShouldImplementStatsStorageFactoryInterface()
19+
public function testShouldImplementStatsStorageFactoryInterface(): void
1720
{
1821
$this->assertClassImplements(StatsStorageFactory::class, GenericStatsStorageFactory::class);
1922
}
2023

21-
public function testShouldCreateInfluxDbStorage()
24+
public function testShouldCreateInfluxDbStorage(): void
2225
{
2326
$storage = (new GenericStatsStorageFactory())->create('influxdb:');
2427

2528
$this->assertInstanceOf(InfluxDbStorage::class, $storage);
2629
}
2730

28-
public function testShouldCreateWampStorage()
31+
public function testShouldCreateWampStorage(): void
2932
{
3033
$storage = (new GenericStatsStorageFactory())->create('wamp:');
3134

3235
$this->assertInstanceOf(WampStorage::class, $storage);
3336
}
3437

35-
public function testShouldThrowIfStorageIsNotSupported()
38+
public function testShouldCreateDatadogStorage(): void
39+
{
40+
$storage = (new GenericStatsStorageFactory())->create('datadog:');
41+
42+
$this->assertInstanceOf(DatadogStorage::class, $storage);
43+
}
44+
45+
public function testShouldThrowIfStorageIsNotSupported(): void
3646
{
3747
$this->expectException(\LogicException::class);
3848
$this->expectExceptionMessage('A given scheme "unsupported" is not supported.');

composer.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,15 @@
1515
"phpunit/phpunit": "~5.4.0",
1616
"enqueue/test": "0.9.x-dev",
1717
"influxdb/influxdb-php": "^1.14",
18+
"datadog/php-datadogstatsd": "^1.3",
1819
"thruway/client": "^0.5",
1920
"thruway/pawl-transport": "^0.5"
2021
},
2122
"suggest": {
2223
"thruway/client": "Client for Thruway and the WAMP (Web Application Messaging Protocol).",
2324
"thruway/pawl-transport": "Pawl WebSocket Transport for Thruway Client",
24-
"influxdb/influxdb-php": "A PHP Client for InfluxDB, a time series database"
25+
"influxdb/influxdb-php": "A PHP Client for InfluxDB, a time series database",
26+
"datadog/php-datadogstatsd": "Datadog monitoring tool PHP integration"
2527
},
2628
"support": {
2729
"email": "opensource@forma-pro.com",

0 commit comments

Comments
 (0)