Skip to content

Commit 8cc27bd

Browse files
committed
Improve transaction isolation for postgres storage batches to avoid
concurrency issues.
1 parent b68ffa6 commit 8cc27bd

File tree

1 file changed

+26
-4
lines changed

1 file changed

+26
-4
lines changed

modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,7 @@ export class PostgresBucketBatch
198198
AND source_table = ${{ type: 'varchar', value: sourceTable.id }}
199199
LIMIT
200200
${{ type: 'int4', value: BATCH_LIMIT }}
201+
FOR NO KEY UPDATE
201202
`)) {
202203
lastBatchCount += rows.length;
203204
processedCount += rows.length;
@@ -662,6 +663,7 @@ export class PostgresBucketBatch
662663
AND c.source_key = f.source_key
663664
WHERE
664665
c.group_id = ${{ type: 'int4', value: this.group_id }}
666+
FOR NO KEY UPDATE
665667
`)) {
666668
for (const row of rows) {
667669
const key = cacheKey(row.source_table, row.source_key);
@@ -707,7 +709,8 @@ export class PostgresBucketBatch
707709
) f ON c.source_table = f.source_table_id
708710
AND c.source_key = f.source_key
709711
WHERE
710-
c.group_id = $2;
712+
c.group_id = $2
713+
FOR NO KEY UPDATE;
711714
`,
712715
params: [
713716
{
@@ -1049,6 +1052,7 @@ export class PostgresBucketBatch
10491052
sync_rules
10501053
WHERE
10511054
id = ${{ type: 'int4', value: this.group_id }}
1055+
FOR NO KEY UPDATE;
10521056
`
10531057
.decoded(pick(models.SyncRules, ['state', 'snapshot_done']))
10541058
.first();
@@ -1098,9 +1102,27 @@ export class PostgresBucketBatch
10981102
callback: (tx: lib_postgres.WrappedConnection) => Promise<T>
10991103
): Promise<T> {
11001104
try {
1101-
return await this.db.transaction(async (db) => {
1102-
return await callback(db);
1103-
});
1105+
// Try for up to a minute
1106+
const lastTry = Date.now() + 60_000;
1107+
while (true) {
1108+
try {
1109+
return await this.db.transaction(async (db) => {
1110+
// The isolation level is required to protect against concurrent updates to the same data.
1111+
// In theory the "select ... for update" locks may be able to protect against this, but we
1112+
// still have failing tests if we use that as the only isolation mechanism.
1113+
await db.query('SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;');
1114+
return await callback(db);
1115+
});
1116+
} catch (err) {
1117+
// Serialization failure, retry
1118+
if (err[Symbol.for('pg.ErrorCode')] === '40001' && Date.now() < lastTry) {
1119+
this.logger.warn(`Serialization failure during replication transaction, retrying: ${err.message}`);
1120+
await timers.setTimeout(100 + Math.random() * 200);
1121+
continue;
1122+
}
1123+
throw err;
1124+
}
1125+
}
11041126
} finally {
11051127
await this.db.sql`
11061128
UPDATE sync_rules

0 commit comments

Comments
 (0)