Skip to content

Commit dc696b1

Browse files
authored
Clear replication errors when any replication progress has been made (#421)
* MongoDB storage - clear errors on flush.
* Postgres storage - clear error on flush.
* Record timestamp for replication errors.
* Changeset.
1 parent 5b6a544 commit dc696b1

File tree

14 files changed

+128
-23
lines changed

14 files changed

+128
-23
lines changed
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
---
2+
'@powersync/service-module-postgres-storage': patch
3+
'@powersync/service-module-mongodb-storage': patch
4+
'@powersync/service-core': patch
5+
'@powersync/service-types': patch
6+
'@powersync/service-image': patch
7+
---
8+
9+
Clear replication errors when any replication progress has been made.

modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,7 @@ export class MongoBucketStorage
218218
slot_name: slot_name,
219219
last_checkpoint_ts: null,
220220
last_fatal_error: null,
221+
last_fatal_error_ts: null,
221222
last_keepalive_ts: null
222223
};
223224
await this.db.sync_rules.insertOne(doc);

modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ export class MongoBucketBatch
8282
private batch: OperationBatch | null = null;
8383
private write_checkpoint_batch: storage.CustomWriteCheckpointOptions[] = [];
8484
private markRecordUnavailable: BucketStorageMarkRecordUnavailable | undefined;
85+
private clearedError = false;
8586

8687
/**
8788
* Last LSN received associated with a checkpoint.
@@ -243,6 +244,8 @@ export class MongoBucketBatch
243244
let resumeBatch: OperationBatch | null = null;
244245
let transactionSize = 0;
245246

247+
let didFlush = false;
248+
246249
// Now batch according to the sizes
247250
// This is a single batch if storeCurrentData == false
248251
for await (let b of batch.batched(sizes)) {
@@ -292,7 +295,8 @@ export class MongoBucketBatch
292295
if (persistedBatch!.shouldFlushTransaction()) {
293296
// Transaction is getting big.
294297
// Flush, and resume in a new transaction.
295-
await persistedBatch!.flush(this.db, this.session, options);
298+
const { flushedAny } = await persistedBatch!.flush(this.db, this.session, options);
299+
didFlush ||= flushedAny;
296300
persistedBatch = null;
297301
// Computing our current progress is a little tricky here, since
298302
// we're stopping in the middle of a batch.
@@ -303,10 +307,15 @@ export class MongoBucketBatch
303307

304308
if (persistedBatch) {
305309
transactionSize = persistedBatch.currentSize;
306-
await persistedBatch.flush(this.db, this.session, options);
310+
const { flushedAny } = await persistedBatch.flush(this.db, this.session, options);
311+
didFlush ||= flushedAny;
307312
}
308313
}
309314

315+
if (didFlush) {
316+
await this.clearError();
317+
}
318+
310319
return resumeBatch?.hasData() ? resumeBatch : null;
311320
}
312321

@@ -714,6 +723,7 @@ export class MongoBucketBatch
714723
last_keepalive_ts: now,
715724
snapshot_done: true,
716725
last_fatal_error: null,
726+
last_fatal_error_ts: null,
717727
keepalive_op: null
718728
};
719729

@@ -848,6 +858,7 @@ export class MongoBucketBatch
848858
last_checkpoint_lsn: lsn,
849859
snapshot_done: true,
850860
last_fatal_error: null,
861+
last_fatal_error_ts: null,
851862
last_keepalive_ts: new Date()
852863
},
853864
$unset: { snapshot_lsn: 1 }
@@ -1075,6 +1086,26 @@ export class MongoBucketBatch
10751086
});
10761087
}
10771088

1089+
protected async clearError(): Promise<void> {
1090+
// No need to clear an error more than once per batch, since an error would always result in restarting the batch.
1091+
if (this.clearedError) {
1092+
return;
1093+
}
1094+
1095+
await this.db.sync_rules.updateOne(
1096+
{
1097+
_id: this.group_id
1098+
},
1099+
{
1100+
$set: {
1101+
last_fatal_error: null,
1102+
last_fatal_error_ts: null
1103+
}
1104+
}
1105+
);
1106+
this.clearedError = true;
1107+
}
1108+
10781109
/**
10791110
* Gets relevant {@link SqlEventDescriptor}s for the given {@link SourceTable}
10801111
*/

modules/module-mongodb-storage/src/storage/implementation/MongoPersistedSyncRulesContent.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ export class MongoPersistedSyncRulesContent implements storage.PersistedSyncRule
1313
public readonly sync_rules_content: string;
1414
public readonly last_checkpoint_lsn: string | null;
1515
public readonly last_fatal_error: string | null;
16+
public readonly last_fatal_error_ts: Date | null;
1617
public readonly last_keepalive_ts: Date | null;
1718
public readonly last_checkpoint_ts: Date | null;
1819
public readonly active: boolean;
@@ -29,6 +30,7 @@ export class MongoPersistedSyncRulesContent implements storage.PersistedSyncRule
2930
// Handle legacy values
3031
this.slot_name = doc.slot_name ?? `powersync_${this.id}`;
3132
this.last_fatal_error = doc.last_fatal_error;
33+
this.last_fatal_error_ts = doc.last_fatal_error_ts;
3234
this.last_checkpoint_ts = doc.last_checkpoint_ts;
3335
this.last_keepalive_ts = doc.last_keepalive_ts;
3436
this.active = doc.state == 'ACTIVE';

modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ import { MongoParameterCompactor } from './MongoParameterCompactor.js';
3939
import { MongoWriteCheckpointAPI } from './MongoWriteCheckpointAPI.js';
4040
import { idPrefixFilter, mapOpEntry, readSingleBatch, setSessionSnapshotTime } from '../../utils/util.js';
4141

42-
4342
export interface MongoSyncBucketStorageOptions {
4443
checksumOptions?: MongoChecksumOptions;
4544
}
@@ -648,11 +647,11 @@ export class MongoSyncBucketStorage
648647
},
649648
{
650649
$set: {
651-
last_fatal_error: message
650+
last_fatal_error: message,
651+
last_fatal_error_ts: new Date()
652652
}
653653
}
654654
);
655-
await this.db.notifyCheckpoint();
656655
}
657656

658657
async compact(options?: storage.CompactOptions) {

modules/module-mongodb-storage/src/storage/implementation/PersistedBatch.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -351,12 +351,21 @@ export class PersistedBatch {
351351
}
352352
}
353353

354+
const stats = {
355+
bucketDataCount: this.bucketData.length,
356+
parameterDataCount: this.bucketParameters.length,
357+
currentDataCount: this.currentData.length,
358+
flushedAny: flushedSomething
359+
};
360+
354361
this.bucketData = [];
355362
this.bucketParameters = [];
356363
this.currentData = [];
357364
this.bucketStates.clear();
358365
this.currentSize = 0;
359366
this.debugLastOpId = null;
367+
368+
return stats;
360369
}
361370

362371
private getBucketStateUpdates(): mongo.AnyBulkWriteOperation<BucketStateDocument>[] {

modules/module-mongodb-storage/src/storage/implementation/models.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,8 @@ export interface SyncRuleDocument {
196196
*/
197197
last_fatal_error: string | null;
198198

199+
last_fatal_error_ts: Date | null;
200+
199201
content: string;
200202

201203
lock?: {

modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ export class PostgresBucketBatch
7777
private lastWaitingLogThrottled = 0;
7878
private markRecordUnavailable: BucketStorageMarkRecordUnavailable | undefined;
7979
private needsActivation = true;
80+
private clearedError = false;
8081

8182
constructor(protected options: PostgresBucketBatchOptions) {
8283
super();
@@ -213,7 +214,10 @@ export class PostgresBucketBatch
213214
});
214215
}
215216
}
216-
await persistedBatch.flush(db);
217+
const { flushedAny } = await persistedBatch.flush(db);
218+
if (flushedAny) {
219+
await this.clearError(db);
220+
}
217221
});
218222
}
219223
if (processedCount == 0) {
@@ -570,6 +574,7 @@ export class PostgresBucketBatch
570574

571575
// If set, we need to start a new transaction with this batch.
572576
let resumeBatch: OperationBatch | null = null;
577+
let didFlush = false;
573578

574579
// Now batch according to the sizes
575580
// This is a single batch if storeCurrentData == false
@@ -654,7 +659,8 @@ export class PostgresBucketBatch
654659
}
655660

656661
if (persistedBatch!.shouldFlushTransaction()) {
657-
await persistedBatch!.flush(db);
662+
const { flushedAny } = await persistedBatch!.flush(db);
663+
didFlush ||= flushedAny;
658664
// The operations stored in this batch will be processed in the `resumeBatch`
659665
persistedBatch = null;
660666
// Return the remaining entries for the next resume transaction
@@ -667,10 +673,15 @@ export class PostgresBucketBatch
667673
* The operations were less than the max size if here. Flush now.
668674
* `persistedBatch` will be `null` if the operations should be flushed in a new transaction.
669675
*/
670-
await persistedBatch.flush(db);
676+
const { flushedAny } = await persistedBatch.flush(db);
677+
didFlush ||= flushedAny;
671678
}
672679
}
673680

681+
if (didFlush) {
682+
await this.clearError(db);
683+
}
684+
674685
// Don't return empty batches
675686
if (resumeBatch?.batch.length) {
676687
return resumeBatch;
@@ -1006,6 +1017,24 @@ export class PostgresBucketBatch
10061017
}
10071018
}
10081019

1020+
protected async clearError(
1021+
db: lib_postgres.AbstractPostgresConnection | lib_postgres.DatabaseClient = this.db
1022+
): Promise<void> {
1023+
// No need to clear an error more than once per batch, since an error would always result in restarting the batch.
1024+
if (this.clearedError) {
1025+
return;
1026+
}
1027+
1028+
await db.sql`
1029+
UPDATE sync_rules
1030+
SET
1031+
last_fatal_error = ${{ type: 'varchar', value: null }}
1032+
WHERE
1033+
id = ${{ type: 'int4', value: this.group_id }}
1034+
`.execute();
1035+
this.clearedError = true;
1036+
}
1037+
10091038
private async getLastOpIdSequence(db: lib_postgres.AbstractPostgresConnection) {
10101039
// When no op_id has been generated, last_value = 1 and nextval() will be 1.
10111040
// To cater for this case, we check is_called, and default to 0 if no value has been generated.

modules/module-postgres-storage/src/storage/batch/PostgresPersistedBatch.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,13 @@ export class PostgresPersistedBatch {
236236
}
237237

238238
async flush(db: lib_postgres.WrappedConnection) {
239+
const stats = {
240+
bucketDataCount: this.bucketDataInserts.length,
241+
parameterDataCount: this.parameterDataInserts.length,
242+
currentDataCount: this.currentDataInserts.size + this.currentDataDeletes.length
243+
};
244+
const flushedAny = stats.bucketDataCount > 0 || stats.parameterDataCount > 0 || stats.currentDataCount > 0;
245+
239246
logger.info(
240247
`powersync_${this.group_id} Flushed ${this.bucketDataInserts.length} + ${this.parameterDataInserts.length} + ${
241248
this.currentDataInserts.size + this.currentDataDeletes.length
@@ -251,6 +258,11 @@ export class PostgresPersistedBatch {
251258
this.currentDataDeletes = [];
252259
this.currentDataInserts = new Map();
253260
this.currentSize = 0;
261+
262+
return {
263+
...stats,
264+
flushedAny
265+
};
254266
}
255267

256268
protected async flushBucketData(db: lib_postgres.WrappedConnection) {

packages/service-core/src/api/diagnostics.ts

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { logger } from '@powersync/lib-services-framework';
22
import { DEFAULT_TAG, SourceTableInterface, SqlSyncRules } from '@powersync/service-sync-rules';
3-
import { SyncRulesStatus, TableInfo } from '@powersync/service-types';
3+
import { ReplicationError, SyncRulesStatus, TableInfo } from '@powersync/service-types';
44

55
import * as storage from '../storage/storage-index.js';
66
import { RouteAPI } from './RouteAPI.js';
@@ -39,6 +39,7 @@ export async function getSyncRulesStatus(
3939
const include_content = options.include_content ?? false;
4040
const live_status = options.live_status ?? false;
4141
const check_connection = options.check_connection ?? false;
42+
const now = new Date().toISOString();
4243

4344
let rules: SqlSyncRules;
4445
let persisted: storage.PersistedSyncRules;
@@ -49,7 +50,7 @@ export async function getSyncRulesStatus(
4950
return {
5051
content: include_content ? sync_rules.sync_rules_content : undefined,
5152
connections: [],
52-
errors: [{ level: 'fatal', message: e.message }]
53+
errors: [{ level: 'fatal', message: e.message, ts: now }]
5354
};
5455
}
5556

@@ -99,7 +100,7 @@ export async function getSyncRulesStatus(
99100
data_queries: false,
100101
parameter_queries: false,
101102
replication_id: [],
102-
errors: [{ level: 'fatal', message: 'connection failed' }]
103+
errors: [{ level: 'fatal', message: 'connection failed', ts: now }]
103104
};
104105
} else {
105106
const source: SourceTableInterface = {
@@ -115,21 +116,26 @@ export async function getSyncRulesStatus(
115116
data_queries: syncData,
116117
parameter_queries: syncParameters,
117118
replication_id: [],
118-
errors: [{ level: 'fatal', message: 'connection failed' }]
119+
errors: [{ level: 'fatal', message: 'connection failed', ts: now }]
119120
};
120121
}
121122
});
122123
}
123124

124125
const errors = tables_flat.flatMap((info) => info.errors);
125126
if (sync_rules.last_fatal_error) {
126-
errors.push({ level: 'fatal', message: sync_rules.last_fatal_error });
127+
errors.push({
128+
level: 'fatal',
129+
message: sync_rules.last_fatal_error,
130+
ts: sync_rules.last_fatal_error_ts?.toISOString()
131+
});
127132
}
128133
errors.push(
129134
...rules.errors.map((e) => {
130135
return {
131136
level: e.type,
132-
message: e.message
137+
message: e.message,
138+
ts: now
133139
};
134140
})
135141
);
@@ -140,7 +146,8 @@ export async function getSyncRulesStatus(
140146
if (sync_rules.last_checkpoint_ts == null && sync_rules.last_keepalive_ts == null) {
141147
errors.push({
142148
level: 'warning',
143-
message: 'No checkpoint found, cannot calculate replication lag'
149+
message: 'No checkpoint found, cannot calculate replication lag',
150+
ts: now
144151
});
145152
} else {
146153
const lastTime = Math.max(
@@ -155,12 +162,14 @@ export async function getSyncRulesStatus(
155162
if (lagSeconds > 15 * 60) {
156163
errors.push({
157164
level: 'fatal',
158-
message: `No replicated commit in more than ${lagSeconds}s`
165+
message: `No replicated commit in more than ${lagSeconds}s`,
166+
ts: now
159167
});
160168
} else if (lagSeconds > 5 * 60) {
161169
errors.push({
162170
level: 'warning',
163-
message: `No replicated commit in more than ${lagSeconds}s`
171+
message: `No replicated commit in more than ${lagSeconds}s`,
172+
ts: now
164173
});
165174
}
166175
}
@@ -186,9 +195,9 @@ export async function getSyncRulesStatus(
186195
};
187196
}
188197

189-
function deduplicate(errors: { level: 'warning' | 'fatal'; message: string }[]) {
198+
function deduplicate(errors: ReplicationError[]): ReplicationError[] {
190199
let seen = new Set<string>();
191-
let result: { level: 'warning' | 'fatal'; message: string }[] = [];
200+
let result: ReplicationError[] = [];
192201
for (let error of errors) {
193202
const key = JSON.stringify(error);
194203
if (seen.has(key)) {

0 commit comments

Comments (0)