66use Enqueue \Dsn \Dsn ;
77use InfluxDB \Client ;
88use InfluxDB \Database ;
9+ use InfluxDB \Driver \QueryDriverInterface ;
10+ use InfluxDB \Exception as InfluxDBException ;
911use InfluxDB \Point ;
1012
1113class InfluxDbStorage implements StatsStorage
@@ -38,6 +40,8 @@ class InfluxDbStorage implements StatsStorage
3840 * 'measurementSentMessages' => 'sent-messages',
3941 * 'measurementConsumedMessages' => 'consumed-messages',
4042 * 'measurementConsumers' => 'consumers',
43+ * 'client' => null, # Client instance. Null by default.
44+ * 'retentionPolicy' => null,
4145 * ]
4246 *
4347 * or
@@ -59,6 +63,13 @@ public function __construct($config = 'influxdb:')
5963 } elseif (is_array ($ config )) {
6064 $ config = empty ($ config ['dsn ' ]) ? $ config : $ this ->parseDsn ($ config ['dsn ' ]);
6165 } elseif ($ config instanceof Client) {
66+ // Passing Client instead of array config is deprecated because it prevents setting any configuration values
67+ // and causes library to use defaults.
68+ @trigger_error (
69+ sprintf ('Passing %s as %s argument is deprecated. Pass it as "client" array property instead ' ,
70+ Client::class,
71+ __METHOD__
72+ ), E_USER_DEPRECATED );
6273 $ this ->client = $ config ;
6374 $ config = [];
6475 } else {
@@ -74,8 +85,22 @@ public function __construct($config = 'influxdb:')
7485 'measurementSentMessages ' => 'sent-messages ' ,
7586 'measurementConsumedMessages ' => 'consumed-messages ' ,
7687 'measurementConsumers ' => 'consumers ' ,
88+ 'client ' => null ,
89+ 'retentionPolicy ' => null ,
7790 ], $ config );
7891
92+ if (null !== $ config ['client ' ]) {
93+ if (!$ config ['client ' ] instanceof Client) {
94+ throw new \InvalidArgumentException (sprintf (
95+ '%s configuration property is expected to be an instance of %s class. %s was passed instead. ' ,
96+ 'client ' ,
97+ Client::class,
98+ gettype ($ config ['client ' ])
99+ ));
100+ }
101+ $ this ->client = $ config ['client ' ];
102+ }
103+
79104 $ this ->config = $ config ;
80105 }
81106
@@ -109,7 +134,7 @@ public function pushConsumerStats(ConsumerStats $stats): void
109134 $ points [] = new Point ($ this ->config ['measurementConsumers ' ], null , $ tags , $ values , $ stats ->getTimestampMs ());
110135 }
111136
112- $ this ->getDb ()-> writePoints ( $ points, Database:: PRECISION_MILLISECONDS );
137+ $ this ->doWrite ( $ points );
113138 }
114139
115140 public function pushConsumedMessageStats (ConsumedMessageStats $ stats ): void
@@ -135,7 +160,7 @@ public function pushConsumedMessageStats(ConsumedMessageStats $stats): void
135160 new Point ($ this ->config ['measurementConsumedMessages ' ], $ runtime , $ tags , $ values , $ stats ->getTimestampMs ()),
136161 ];
137162
138- $ this ->getDb ()-> writePoints ( $ points, Database:: PRECISION_MILLISECONDS );
163+ $ this ->doWrite ( $ points );
139164 }
140165
141166 public function pushSentMessageStats (SentMessageStats $ stats ): void
@@ -158,21 +183,44 @@ public function pushSentMessageStats(SentMessageStats $stats): void
158183 new Point ($ this ->config ['measurementSentMessages ' ], 1 , $ tags , [], $ stats ->getTimestampMs ()),
159184 ];
160185
161- $ this ->getDb ()-> writePoints ( $ points, Database:: PRECISION_MILLISECONDS );
186+ $ this ->doWrite ( $ points );
162187 }
163188
164- private function getDb ( ): Database
189+ private function doWrite ( array $ points ): void
165190 {
166- if (null === $ this ->database ) {
167- if (null === $ this ->client ) {
168- $ this ->client = new Client (
169- $ this ->config ['host ' ],
170- $ this ->config ['port ' ],
171- $ this ->config ['user ' ],
172- $ this ->config ['password ' ]
173- );
191+ if (!$ this ->client || $ this ->client ->getDriver () instanceof QueryDriverInterface) {
192+ $ this ->getDb ()->writePoints ($ points , Database::PRECISION_MILLISECONDS , $ this ->config ['retentionPolicy ' ]);
193+ } else {
194+ // Code below mirrors what `writePoints` method of Database does.
195+ try {
196+ $ parameters = [
197+ 'url ' => sprintf ('write?db=%s&precision=%s ' , $ this ->config ['db ' ], Database::PRECISION_MILLISECONDS ),
198+ 'database ' => $ this ->config ['db ' ],
199+ 'method ' => 'post ' ,
200+ ];
201+ if (null !== $ this ->config ['retentionPolicy ' ]) {
202+ $ parameters ['url ' ] .= sprintf ('&rp=%s ' , $ this ->config ['retentionPolicy ' ]);
203+ }
204+
205+ $ this ->client ->write ($ parameters , $ points );
206+ } catch (\Exception $ e ) {
207+ throw new InfluxDBException ($ e ->getMessage (), $ e ->getCode ());
174208 }
209+ }
210+ }
211+
212+ private function getDb (): Database
213+ {
214+ if (null === $ this ->client ) {
215+ $ this ->client = new Client (
216+ $ this ->config ['host ' ],
217+ $ this ->config ['port ' ],
218+ $ this ->config ['user ' ],
219+ $ this ->config ['password ' ]
220+ );
221+ }
175222
223+ if (null === $ this ->database ) {
176224 $ this ->database = $ this ->client ->selectDB ($ this ->config ['db ' ]);
177225 $ this ->database ->create ();
178226 }
@@ -200,6 +248,7 @@ private function parseDsn(string $dsn): array
200248 'measurementSentMessages ' => $ dsn ->getString ('measurementSentMessages ' ),
201249 'measurementConsumedMessages ' => $ dsn ->getString ('measurementConsumedMessages ' ),
202250 'measurementConsumers ' => $ dsn ->getString ('measurementConsumers ' ),
251+ 'retentionPolicy ' => $ dsn ->getString ('retentionPolicy ' ),
203252 ]), function ($ value ) { return null !== $ value ; });
204253 }
205254}
0 commit comments