@@ -4,6 +4,7 @@ import 'package:http/http.dart' as http;
44import 'package:powersync/src/abort_controller.dart' ;
55import 'package:powersync/src/exceptions.dart' ;
66import 'package:powersync/src/log_internal.dart' ;
7+ import 'package:sqlite_async/mutex.dart' ;
78
89import 'bucket_storage.dart' ;
910import 'connector.dart' ;
@@ -37,14 +38,22 @@ class StreamingSyncImplementation {
3738
3839 SyncStatus lastStatus = const SyncStatus ();
3940
41+ final Mutex syncMutex, crudMutex;
42+
4043 StreamingSyncImplementation (
4144 {required this .adapter,
4245 required this .credentialsCallback,
4346 this .invalidCredentialsCallback,
4447 required this .uploadCrud,
4548 required this .updateStream,
4649 required this .retryDelay,
47- required http.Client client}) {
50+ required http.Client client,
51+
52+ /// A unique identifier for this streaming sync implementation
53+ /// A good value is typically the DB file path which it will mutate when syncing.
54+ String ? identifier = "unknown" })
55+ : syncMutex = Mutex (identifier: "sync-${identifier }" ),
56+ crudMutex = Mutex (identifier: "crud-${identifier }" ) {
4857 _client = client;
4958 statusStream = _statusStreamController.stream;
5059 }
@@ -65,8 +74,10 @@ class StreamingSyncImplementation {
6574 await invalidCredentialsCallback !();
6675 invalidCredentials = false ;
6776 }
68- await streamingSyncIteration (abortController: abortController);
69- // Continue immediately
77+ // Protect sync iterations with exclusivity (if a valid Mutex is provided)
78+ await syncMutex.lock (
79+ () => streamingSyncIteration (abortController: abortController),
80+ timeout: retryDelay);
7081 } catch (e, stacktrace) {
7182 final message = _syncErrorMessage (e);
7283 isolateLogger.warning ('Sync error: $message ' , e, stacktrace);
@@ -110,21 +121,23 @@ class StreamingSyncImplementation {
110121 }
111122
112123 Future <bool > uploadCrudBatch () async {
113- if ((await adapter.hasCrud ())) {
114- _updateStatus (uploading: true );
115- await uploadCrud ();
116- return false ;
117- } else {
118- // This isolate is the only one triggering
119- final updated = await adapter.updateLocalTarget (() async {
120- return getWriteCheckpoint ();
121- });
122- if (updated) {
123- _localPingController.add (null );
124- }
124+ return crudMutex.lock (() async {
125+ if ((await adapter.hasCrud ())) {
126+ _updateStatus (uploading: true );
127+ await uploadCrud ();
128+ return false ;
129+ } else {
130+ // This isolate is the only one triggering
131+ final updated = await adapter.updateLocalTarget (() async {
132+ return getWriteCheckpoint ();
133+ });
134+ if (updated) {
135+ _localPingController.add (null );
136+ }
125137
126- return true ;
127- }
138+ return true ;
139+ }
140+ }, timeout: retryDelay);
128141 }
129142
130143 Future <String > getWriteCheckpoint () async {
0 commit comments