Skip to content

Commit eefddef

Browse files
committed
Merge remote-tracking branch 'origin/main' into stream-during-snapshot
2 parents 6a5590f + dc696b1 commit eefddef

File tree

25 files changed

+331
-44
lines changed

25 files changed

+331
-44
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/service-image': minor
3+
---
4+
5+
Dynamically load connection modules for reduced memory usage
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
---
2+
'@powersync/service-module-postgres-storage': patch
3+
'@powersync/service-module-mongodb-storage': patch
4+
'@powersync/service-core': patch
5+
'@powersync/service-types': patch
6+
'@powersync/service-image': patch
7+
---
8+
9+
Clear replication errors when any replication progress has been made.

.github/workflows/test.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ jobs:
1919
uses: actions/checkout@v5
2020

2121
- name: Login to Docker Hub
22+
if: github.event_name != 'pull_request'
2223
uses: docker/login-action@v3
2324
with:
2425
username: ${{ secrets.DOCKERHUB_USERNAME }}
@@ -48,6 +49,7 @@ jobs:
4849
- uses: actions/checkout@v5
4950

5051
- name: Login to Docker Hub
52+
if: github.event_name != 'pull_request'
5153
uses: docker/login-action@v3
5254
with:
5355
username: ${{ secrets.DOCKERHUB_USERNAME }}
@@ -95,6 +97,7 @@ jobs:
9597
- uses: actions/checkout@v5
9698

9799
- name: Login to Docker Hub
100+
if: github.event_name != 'pull_request'
98101
uses: docker/login-action@v3
99102
with:
100103
username: ${{ secrets.DOCKERHUB_USERNAME }}
@@ -171,6 +174,7 @@ jobs:
171174
- uses: actions/checkout@v5
172175

173176
- name: Login to Docker Hub
177+
if: github.event_name != 'pull_request'
174178
uses: docker/login-action@v3
175179
with:
176180
username: ${{ secrets.DOCKERHUB_USERNAME }}
@@ -244,6 +248,7 @@ jobs:
244248
- uses: actions/checkout@v4
245249

246250
- name: Login to Docker Hub
251+
if: github.event_name != 'pull_request'
247252
uses: docker/login-action@v3
248253
with:
249254
username: ${{ secrets.DOCKERHUB_USERNAME }}

modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,7 @@ export class MongoBucketStorage
218218
slot_name: slot_name,
219219
last_checkpoint_ts: null,
220220
last_fatal_error: null,
221+
last_fatal_error_ts: null,
221222
last_keepalive_ts: null
222223
};
223224
await this.db.sync_rules.insertOne(doc);

modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ export class MongoBucketBatch
8484
private batch: OperationBatch | null = null;
8585
private write_checkpoint_batch: storage.CustomWriteCheckpointOptions[] = [];
8686
private markRecordUnavailable: BucketStorageMarkRecordUnavailable | undefined;
87+
private clearedError = false;
8788

8889
/**
8990
* Last LSN received associated with a checkpoint.
@@ -242,6 +243,8 @@ export class MongoBucketBatch
242243
let resumeBatch: OperationBatch | null = null;
243244
let transactionSize = 0;
244245

246+
let didFlush = false;
247+
245248
// Now batch according to the sizes
246249
// This is a single batch if storeCurrentData == false
247250
for await (let b of batch.batched(sizes)) {
@@ -291,7 +294,8 @@ export class MongoBucketBatch
291294
if (persistedBatch!.shouldFlushTransaction()) {
292295
// Transaction is getting big.
293296
// Flush, and resume in a new transaction.
294-
await persistedBatch!.flush(this.db, this.session, options);
297+
const { flushedAny } = await persistedBatch!.flush(this.db, this.session, options);
298+
didFlush ||= flushedAny;
295299
persistedBatch = null;
296300
// Computing our current progress is a little tricky here, since
297301
// we're stopping in the middle of a batch.
@@ -302,10 +306,15 @@ export class MongoBucketBatch
302306

303307
if (persistedBatch) {
304308
transactionSize = persistedBatch.currentSize;
305-
await persistedBatch.flush(this.db, this.session, options);
309+
const { flushedAny } = await persistedBatch.flush(this.db, this.session, options);
310+
didFlush ||= flushedAny;
306311
}
307312
}
308313

314+
if (didFlush) {
315+
await this.clearError();
316+
}
317+
309318
return resumeBatch?.hasData() ? resumeBatch : null;
310319
}
311320

@@ -746,9 +755,8 @@ export class MongoBucketBatch
746755
$cond: ['$_can_checkpoint', { $literal: now }, '$last_checkpoint_ts']
747756
},
748757
last_keepalive_ts: { $literal: now },
749-
last_fatal_error: {
750-
$cond: ['$_can_checkpoint', { $literal: null }, '$last_fatal_error']
751-
},
758+
last_fatal_error: { $literal: null },
759+
last_fatal_error_ts: { $literal: null },
752760
keepalive_op: new_keepalive_op,
753761
last_checkpoint: new_last_checkpoint,
754762
// Unset snapshot_lsn on checkpoint
@@ -1143,6 +1151,26 @@ export class MongoBucketBatch
11431151
});
11441152
}
11451153

1154+
protected async clearError(): Promise<void> {
1155+
// No need to clear an error more than once per batch, since an error would always result in restarting the batch.
1156+
if (this.clearedError) {
1157+
return;
1158+
}
1159+
1160+
await this.db.sync_rules.updateOne(
1161+
{
1162+
_id: this.group_id
1163+
},
1164+
{
1165+
$set: {
1166+
last_fatal_error: null,
1167+
last_fatal_error_ts: null
1168+
}
1169+
}
1170+
);
1171+
this.clearedError = true;
1172+
}
1173+
11461174
/**
11471175
* Gets relevant {@link SqlEventDescriptor}s for the given {@link SourceTable}
11481176
*/

modules/module-mongodb-storage/src/storage/implementation/MongoPersistedSyncRulesContent.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ export class MongoPersistedSyncRulesContent implements storage.PersistedSyncRule
1313
public readonly sync_rules_content: string;
1414
public readonly last_checkpoint_lsn: string | null;
1515
public readonly last_fatal_error: string | null;
16+
public readonly last_fatal_error_ts: Date | null;
1617
public readonly last_keepalive_ts: Date | null;
1718
public readonly last_checkpoint_ts: Date | null;
1819
public readonly active: boolean;
@@ -29,6 +30,7 @@ export class MongoPersistedSyncRulesContent implements storage.PersistedSyncRule
2930
// Handle legacy values
3031
this.slot_name = doc.slot_name ?? `powersync_${this.id}`;
3132
this.last_fatal_error = doc.last_fatal_error;
33+
this.last_fatal_error_ts = doc.last_fatal_error_ts;
3234
this.last_checkpoint_ts = doc.last_checkpoint_ts;
3335
this.last_keepalive_ts = doc.last_keepalive_ts;
3436
this.active = doc.state == 'ACTIVE';

modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -646,11 +646,11 @@ export class MongoSyncBucketStorage
646646
},
647647
{
648648
$set: {
649-
last_fatal_error: message
649+
last_fatal_error: message,
650+
last_fatal_error_ts: new Date()
650651
}
651652
}
652653
);
653-
await this.db.notifyCheckpoint();
654654
}
655655

656656
async compact(options?: storage.CompactOptions) {

modules/module-mongodb-storage/src/storage/implementation/PersistedBatch.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -370,12 +370,21 @@ export class PersistedBatch {
370370
}
371371
}
372372

373+
const stats = {
374+
bucketDataCount: this.bucketData.length,
375+
parameterDataCount: this.bucketParameters.length,
376+
currentDataCount: this.currentData.length,
377+
flushedAny: flushedSomething
378+
};
379+
373380
this.bucketData = [];
374381
this.bucketParameters = [];
375382
this.currentData = [];
376383
this.bucketStates.clear();
377384
this.currentSize = 0;
378385
this.debugLastOpId = null;
386+
387+
return stats;
379388
}
380389

381390
private getBucketStateUpdates(): mongo.AnyBulkWriteOperation<BucketStateDocument>[] {

modules/module-mongodb-storage/src/storage/implementation/models.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,8 @@ export interface SyncRuleDocument {
202202
*/
203203
last_fatal_error: string | null;
204204

205+
last_fatal_error_ts: Date | null;
206+
205207
content: string;
206208

207209
lock?: {

modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ export class PostgresBucketBatch
8888
private lastWaitingLogThrottled = 0;
8989
private markRecordUnavailable: BucketStorageMarkRecordUnavailable | undefined;
9090
private needsActivation = true;
91+
private clearedError = false;
9192

9293
constructor(protected options: PostgresBucketBatchOptions) {
9394
super();
@@ -227,7 +228,10 @@ export class PostgresBucketBatch
227228
});
228229
}
229230
}
230-
await persistedBatch.flush(db);
231+
const { flushedAny } = await persistedBatch.flush(db);
232+
if (flushedAny) {
233+
await this.clearError(db);
234+
}
231235
});
232236
}
233237
if (processedCount == 0) {
@@ -677,6 +681,7 @@ export class PostgresBucketBatch
677681

678682
// If set, we need to start a new transaction with this batch.
679683
let resumeBatch: OperationBatch | null = null;
684+
let didFlush = false;
680685

681686
// Now batch according to the sizes
682687
// This is a single batch if storeCurrentData == false
@@ -762,7 +767,8 @@ export class PostgresBucketBatch
762767
}
763768

764769
if (persistedBatch!.shouldFlushTransaction()) {
765-
await persistedBatch!.flush(db);
770+
const { flushedAny } = await persistedBatch!.flush(db);
771+
didFlush ||= flushedAny;
766772
// The operations stored in this batch will be processed in the `resumeBatch`
767773
persistedBatch = null;
768774
// Return the remaining entries for the next resume transaction
@@ -775,10 +781,15 @@ export class PostgresBucketBatch
775781
* The operations were less than the max size if here. Flush now.
776782
* `persistedBatch` will be `null` if the operations should be flushed in a new transaction.
777783
*/
778-
await persistedBatch.flush(db);
784+
const { flushedAny } = await persistedBatch.flush(db);
785+
didFlush ||= flushedAny;
779786
}
780787
}
781788

789+
if (didFlush) {
790+
await this.clearError(db);
791+
}
792+
782793
// Don't return empty batches
783794
if (resumeBatch?.batch.length) {
784795
return resumeBatch;
@@ -1139,6 +1150,24 @@ export class PostgresBucketBatch
11391150
}
11401151
}
11411152

1153+
protected async clearError(
1154+
db: lib_postgres.AbstractPostgresConnection | lib_postgres.DatabaseClient = this.db
1155+
): Promise<void> {
1156+
// No need to clear an error more than once per batch, since an error would always result in restarting the batch.
1157+
if (this.clearedError) {
1158+
return;
1159+
}
1160+
1161+
await db.sql`
1162+
UPDATE sync_rules
1163+
SET
1164+
last_fatal_error = ${{ type: 'varchar', value: null }}
1165+
WHERE
1166+
id = ${{ type: 'int4', value: this.group_id }}
1167+
`.execute();
1168+
this.clearedError = true;
1169+
}
1170+
11421171
private async getLastOpIdSequence(db: lib_postgres.AbstractPostgresConnection) {
11431172
// When no op_id has been generated, last_value = 1 and nextval() will be 1.
11441173
// To cater for this case, we check is_called, and default to 0 if no value has been generated.

0 commit comments

Comments
 (0)