Skip to content

Commit 37e266d

Browse files
[Fix] concurrent connections issue (#101)
1 parent c30e308 commit 37e266d

File tree

7 files changed

+64
-8
lines changed

7 files changed

+64
-8
lines changed

.changeset/lemon-zoos-sort.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@journeyapps/powersync-sdk-web': patch
3+
---
4+
5+
Added some serialization checks for broadcasted logs from shared web worker. Unserializable items will return a warning.

.changeset/lucky-planes-prove.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@journeyapps/powersync-sdk-common': patch
3+
---
4+
5+
Fixed issue where sync stream exceptions would not close previous streaming connections.

.changeset/modern-terms-admire.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@journeyapps/powersync-sdk-web': patch
3+
---
4+
5+
Fixed issue where SyncBucketStorage logs would not be broadcasted from the shared sync worker to individual tabs.

packages/powersync-sdk-common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,10 @@ export abstract class AbstractStreamingSyncImplementation
220220
}
221221
}
222222

223-
connect() {
223+
async connect() {
224+
if (this.abortController) {
225+
await this.disconnect();
226+
}
224227
this.abortController = new AbortController();
225228
this.streamingSync(this.abortController.signal);
226229
return this.waitForStatus({ connected: true });
@@ -231,6 +234,8 @@ export abstract class AbstractStreamingSyncImplementation
231234
throw new Error('Disconnect not possible');
232235
}
233236
this.abortController.abort('Disconnected');
237+
this.abortController = null;
238+
this.updateSyncStatus({ connected: false });
234239
}
235240

236241
/**
@@ -249,7 +254,14 @@ export abstract class AbstractStreamingSyncImplementation
249254
crudUpdate: () => this.triggerCrudUpload()
250255
});
251256

257+
/**
258+
* Create a new abort controller which aborts items downstream.
259+
* This is needed to close any previous connections on exception.
260+
*/
261+
let nestedAbortController = new AbortController();
262+
252263
signal.addEventListener('abort', () => {
264+
nestedAbortController.abort();
253265
this.crudUpdateListener?.();
254266
this.crudUpdateListener = undefined;
255267
this.updateSyncStatus({
@@ -265,7 +277,10 @@ export abstract class AbstractStreamingSyncImplementation
265277
if (signal?.aborted) {
266278
break;
267279
}
268-
await this.streamingSyncIteration(signal);
280+
const { retry } = await this.streamingSyncIteration(nestedAbortController.signal);
281+
if (!retry) {
282+
break;
283+
}
269284
// Continue immediately
270285
} catch (ex) {
271286
this.logger.error(ex);
@@ -274,8 +289,17 @@ export abstract class AbstractStreamingSyncImplementation
274289
});
275290
// On error, wait a little before retrying
276291
await this.delayRetry();
292+
} finally {
293+
// Abort any open network requests. Create a new nested controller for retry.
294+
nestedAbortController.abort();
295+
nestedAbortController = new AbortController();
277296
}
278297
}
298+
299+
// Mark as disconnected if here
300+
if (this.abortController) {
301+
await this.disconnect();
302+
}
279303
}
280304

281305
protected async streamingSyncIteration(signal: AbortSignal, progress?: () => void): Promise<{ retry?: boolean }> {

packages/powersync-sdk-web/src/db/sync/SharedWebStreamingSyncImplementation.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -165,8 +165,6 @@ export class SharedWebStreamingSyncImplementation extends WebStreamingSyncImplem
165165
data: {}
166166
};
167167

168-
(localStorage as any).setItem('posting close' + Math.random(), `$}`);
169-
170168
this.messagePort.postMessage(closeMessagePayload);
171169

172170
// Release the proxy

packages/powersync-sdk-web/src/worker/sync/BroadcastLogger.ts

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,17 +40,17 @@ export class BroadcastLogger implements ILogger {
4040

4141
log(...x: any[]): void {
4242
console.log(...x);
43-
this.clients.forEach((p) => p.clientProvider.log(...x));
43+
this.sanitizeArgs(x, (params) => this.clients.forEach((p) => p.clientProvider.log(...params)));
4444
}
4545

4646
warn(...x: any[]): void {
4747
console.warn(...x);
48-
this.clients.forEach((p) => p.clientProvider.warn(...x));
48+
this.sanitizeArgs(x, (params) => this.clients.forEach((p) => p.clientProvider.warn(...params)));
4949
}
5050

5151
error(...x: any[]): void {
5252
console.error(...x);
53-
this.clients.forEach((p) => p.clientProvider.error(...x));
53+
this.sanitizeArgs(x, (params) => this.clients.forEach((p) => p.clientProvider.error(...params)));
5454
}
5555

5656
time(label: string): void {
@@ -76,4 +76,22 @@ export class BroadcastLogger implements ILogger {
7676
// Levels are not adjustable on this level.
7777
return true;
7878
}
79+
80+
/**
81+
* Guards against any logging errors.
82+
* We don't want a logging exception to cause further issues upstream
83+
*/
84+
private sanitizeArgs(x: any[], handler: (...params: any[]) => void) {
85+
const sanitizedParams = x.map((param) => {
86+
try {
87+
// Try and clone here first. If it fails it won't be passable over a MessagePort
88+
return structuredClone(x);
89+
} catch (ex) {
90+
console.error(ex);
91+
return 'Could not serialize log params. Check shared worker logs for more details.';
92+
}
93+
});
94+
95+
return handler(...sanitizedParams);
96+
}
7997
}

packages/powersync-sdk-web/src/worker/sync/SharedSyncImplementation.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,8 @@ export class SharedSyncImplementation
125125
flags: { enableMultiTabs: true },
126126
logger: this.broadCastLogger
127127
}),
128-
new Mutex()
128+
new Mutex(),
129+
this.broadCastLogger
129130
),
130131
remote: new WebRemote({
131132
fetchCredentials: async () => {

0 commit comments

Comments
 (0)