Skip to content

Commit 90e34d7

Browse files
Sync with latest MessageBus (#2)
* Compatibility with updated MessageBus * Review * Improved insert * Types of row * Transaction class * Removing no needed types * Imrpovements after review --------- Co-authored-by: Valentin Udaltsov <udaltsov.valentin@gmail.com>
1 parent 46e726a commit 90e34d7

File tree

4 files changed

+221
-255
lines changed

4 files changed

+221
-255
lines changed

composer.lock

Lines changed: 79 additions & 141 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/PostgresLazyTransaction.php

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Thesis\MessageBus\Persistence\Postgres;
6+
7+
use Amp\Postgres\PostgresByteA;
8+
use Amp\Postgres\PostgresTransaction;
9+
use Amp\Sql\SqlTransactionError;
10+
use Thesis\MessageBus\Endpoint;
11+
use Thesis\MessageBus\Persistence\LazyTransaction;
12+
use Thesis\MessageBus\Persistence\OutboxAlreadyExists;
13+
use Thesis\MessageBus\Persistence\TransactionClosed;
14+
15+
/**
16+
* @internal
17+
* @psalm-internal Thesis\MessageBus\Persistence\Postgres
18+
* @implements LazyTransaction<PostgresTransaction>
19+
*/
20+
final class PostgresLazyTransaction implements LazyTransaction
21+
{
22+
private ?PostgresTransaction $postgresTransaction = null;
23+
24+
public object $transaction {
25+
get => $this->postgresTransaction ??= ($this->begin)();
26+
}
27+
28+
/**
29+
* @param \Closure(): PostgresTransaction $begin
30+
* @param non-empty-string $outboxTable
31+
*/
32+
public function __construct(
33+
private readonly \Closure $begin,
34+
private readonly string $outboxTable,
35+
private readonly Endpoint $endpoint,
36+
) {}
37+
38+
public function recordOutboxes(array $outboxes): void
39+
{
40+
$placeholderGroups = [];
41+
$params = [];
42+
43+
foreach ($outboxes as $outbox) {
44+
$placeholderGroups[] = '(?, ?, ?, ?, ' . ($outbox->dispatched ? 'now()' : 'null') . ')';
45+
46+
$params[] = $this->endpoint->toString();
47+
$params[] = $outbox->incomingMessageId;
48+
$params[] = new PostgresByteA(serialize($outbox->commands));
49+
$params[] = new PostgresByteA(serialize($outbox->events));
50+
}
51+
52+
$placeholders = implode(',', $placeholderGroups);
53+
54+
try {
55+
$result = $this->transaction->execute(
56+
<<<SQL
57+
insert into {$this->outboxTable} (endpoint, incoming_message_id, commands, events, dispatched_at)
58+
values {$placeholders}
59+
on conflict (endpoint, incoming_message_id) do nothing
60+
SQL,
61+
$params,
62+
);
63+
} catch (SqlTransactionError $exception) {
64+
throw new TransactionClosed(previous: $exception);
65+
}
66+
67+
if ($result->getRowCount() !== \count($outboxes)) {
68+
throw new OutboxAlreadyExists();
69+
}
70+
}
71+
72+
public function commitIfBegun(): void
73+
{
74+
try {
75+
$this->postgresTransaction?->commit();
76+
} catch (SqlTransactionError $exception) {
77+
throw new TransactionClosed(previous: $exception);
78+
}
79+
}
80+
81+
public function rollbackIfBegun(): void
82+
{
83+
try {
84+
$this->postgresTransaction?->rollback();
85+
} catch (SqlTransactionError $exception) {
86+
throw new TransactionClosed(previous: $exception);
87+
}
88+
}
89+
}

0 commit comments

Comments
 (0)