Skip to content

Commit 8c8b6d3

Browse files
committed
Imrpovements after review
1 parent c064236 commit 8c8b6d3

File tree

2 files changed

+34
-32
lines changed

2 files changed

+34
-32
lines changed

src/PostgresLazyTransaction.php

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,8 @@
88
use Amp\Postgres\PostgresTransaction;
99
use Amp\Sql\SqlTransactionError;
1010
use Thesis\MessageBus\Endpoint;
11-
use Thesis\MessageBus\Persistence\Outbox;
12-
use Thesis\MessageBus\Persistence\OutboxAlreadyExists;
1311
use Thesis\MessageBus\Persistence\LazyTransaction;
12+
use Thesis\MessageBus\Persistence\OutboxAlreadyExists;
1413
use Thesis\MessageBus\Persistence\TransactionClosed;
1514

1615
/**
@@ -31,42 +30,41 @@ final class PostgresLazyTransaction implements LazyTransaction
3130
* @param non-empty-string $outboxTable
3231
*/
3332
public function __construct(
34-
private \Closure $begin,
35-
private string $outboxTable,
36-
private Endpoint $endpoint,
33+
private readonly \Closure $begin,
34+
private readonly string $outboxTable,
35+
private readonly Endpoint $endpoint,
3736
) {}
3837

3938
public function recordOutboxes(array $outboxes): void
4039
{
41-
$insertData = [];
42-
43-
$placeholders = [];
40+
$placeholderGroups = [];
41+
$params = [];
4442

4543
foreach ($outboxes as $outbox) {
46-
$insertData[] = $this->endpoint->toString();
47-
$insertData[] = $outbox->incomingMessageId;
48-
$insertData[] = new PostgresByteA(serialize($outbox->commands));
49-
$insertData[] = new PostgresByteA(serialize($outbox->events));
44+
$placeholderGroups[] = '(?, ?, ?, ?, ' . ($outbox->dispatched ? 'now()' : 'null') . ')';
5045

51-
$placeholders[] = '(?, ?, ?, ?,' . ($outbox->dispatched ? 'now()' : 'null') . ')';
46+
$params[] = $this->endpoint->toString();
47+
$params[] = $outbox->incomingMessageId;
48+
$params[] = new PostgresByteA(serialize($outbox->commands));
49+
$params[] = new PostgresByteA(serialize($outbox->events));
5250
}
5351

54-
try {
55-
$placeholders = implode(',', $placeholders);
52+
$placeholders = implode(',', $placeholderGroups);
5653

54+
try {
5755
$result = $this->transaction->execute(
5856
<<<SQL
5957
insert into {$this->outboxTable} (endpoint, incoming_message_id, commands, events, dispatched_at)
6058
values {$placeholders}
6159
on conflict (endpoint, incoming_message_id) do nothing
6260
SQL,
63-
$insertData,
61+
$params,
6462
);
6563
} catch (SqlTransactionError $exception) {
6664
throw new TransactionClosed(previous: $exception);
6765
}
6866

69-
if ($result->getRowCount() !== count($outboxes)) {
67+
if ($result->getRowCount() !== \count($outboxes)) {
7068
throw new OutboxAlreadyExists();
7169
}
7270
}

src/PostgresStorage.php

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,23 +5,21 @@
55
namespace Thesis\MessageBus\Persistence\Postgres;
66

77
use Amp\Postgres\PostgresLink;
8-
use Amp\Postgres\PostgresTransaction as AmphpPostgresTransaction;
8+
use Amp\Postgres\PostgresTransaction;
99
use Thesis\MessageBus\Endpoint;
1010
use Thesis\MessageBus\Envelope;
11+
use Thesis\MessageBus\Persistence\LazyTransaction;
1112
use Thesis\MessageBus\Persistence\Outbox;
1213
use Thesis\MessageBus\Persistence\Storage;
13-
use Thesis\MessageBus\Persistence\LazyTransaction;
1414

1515
/**
1616
* @api
17-
* @implements Storage<AmphpPostgresTransaction>
17+
* @implements Storage<PostgresTransaction>
1818
*/
1919
final class PostgresStorage implements Storage
2020
{
2121
public array $transactionClasses {
22-
get {
23-
return [AmphpPostgresTransaction::class];
24-
}
22+
get => [PostgresTransaction::class];
2523
}
2624

2725
/**
@@ -61,26 +59,30 @@ public function createLazyTransaction(Endpoint $endpoint): LazyTransaction
6159

6260
public function findOutboxes(Endpoint $endpoint, array $incomingMessageIds): array
6361
{
64-
$placeholders = implode(',', array_fill(0, count($incomingMessageIds), '?'));
62+
$placeholders = str_repeat('?, ', \count($incomingMessageIds) - 1) . '?';
6563

6664
$result = $this
6765
->postgres
6866
->execute(
6967
<<<SQL
70-
select incoming_message_id, commands, events, dispatched_at is not null as dispatched
68+
select
69+
incoming_message_id,
70+
commands,
71+
events,
72+
dispatched_at is not null as dispatched
7173
from {$this->outboxTable}
7274
where endpoint = ? and incoming_message_id in ({$placeholders})
7375
SQL,
74-
[$endpoint->toString(), ...$incomingMessageIds],
76+
[
77+
$endpoint->toString(),
78+
...$incomingMessageIds,
79+
],
7580
);
7681

7782
$outboxes = [];
7883

7984
while (null !== $row = $result->fetchRow()) {
80-
/**
81-
* @var array{commands: string, events: string, incoming_message_id: non-empty-string, dispatched: bool} $row
82-
*/
83-
85+
/** @var array{commands: string, events: string, incoming_message_id: non-empty-string, dispatched: bool} $row */
8486
/** @var list<Envelope> $commands */
8587
$commands = unserialize($row['commands']);
8688
/** @var list<Envelope> $events */
@@ -99,13 +101,15 @@ public function findOutboxes(Endpoint $endpoint, array $incomingMessageIds): arr
99101

100102
public function markOutboxesDispatched(Endpoint $endpoint, array $incomingMessageIds): void
101103
{
102-
$placeholders = implode(',', array_fill(0, count($incomingMessageIds), '?'));
104+
$placeholders = str_repeat('?, ', \count($incomingMessageIds) - 1) . '?';
103105

104106
$this->postgres->execute(
105107
<<<SQL
106108
update {$this->outboxTable}
107109
set dispatched_at = now()
108-
where endpoint = ? and incoming_message_id = ({$placeholders}) and dispatched_at is null
110+
where endpoint = ?
111+
and incoming_message_id in ({$placeholders})
112+
and dispatched_at is null
109113
SQL,
110114
[
111115
$endpoint->toString(),

0 commit comments

Comments
 (0)