11import 'dart:async' ;
22import 'dart:convert' as convert;
33
4+ import 'package:async/async.dart' ;
45import 'package:http/http.dart' as http;
56import 'package:powersync/src/abort_controller.dart' ;
67import 'package:powersync/src/exceptions.dart' ;
@@ -27,6 +28,11 @@ class StreamingSyncImplementation {
2728
2829 final Future <void > Function () uploadCrud;
2930
31+ // An internal controller which is used to trigger CRUD uploads internally
32+ // e.g. when reconnecting.
33+ final StreamController <Null > _internalCrudTriggerController =
34+ StreamController <Null >.broadcast ();
35+
3036 final Stream crudUpdateTriggerStream;
3137
3238 final StreamController <SyncStatus > _statusStreamController =
@@ -83,6 +89,7 @@ class StreamingSyncImplementation {
8389 // However, we still need to close the underlying stream explicitly, otherwise
8490 // the break will wait for the next line of data received on the stream.
8591 _localPingController.add (null );
92+ await _internalCrudTriggerController.close ();
8693 // According to the documentation, the behavior is undefined when calling
8794 // close() while requests are pending. However, this is no other
8895 // known way to cancel open streams, and this appears to end the stream with
@@ -92,6 +99,9 @@ class StreamingSyncImplementation {
9299 if (_safeToClose) {
93100 _client.close ();
94101 }
102+
103+ await _internalCrudTriggerController.close ();
104+
95105 // wait for completeAbort() to be called
96106 await future;
97107
@@ -155,7 +165,8 @@ class StreamingSyncImplementation {
155165 Future <void > crudLoop () async {
156166 await uploadAllCrud ();
157167
158- await for (var _ in crudUpdateTriggerStream) {
168+ await for (var _ in StreamGroup .merge (
169+ [crudUpdateTriggerStream, _internalCrudTriggerController.stream])) {
159170 if (_abort? .aborted == true ) {
160171 break ;
161172 }
@@ -298,6 +309,9 @@ class StreamingSyncImplementation {
298309 Future <void >? credentialsInvalidation;
299310 bool haveInvalidated = false ;
300311
312+ // Trigger a CRUD upload on reconnect
313+ _internalCrudTriggerController.add (null );
314+
301315 await for (var line in merged) {
302316 if (aborted) {
303317 break ;
0 commit comments