Skip to content

Commit 1b66145

Browse files
[Fix] CRUD upload retries (#205)
1 parent f0cef67 commit 1b66145

File tree

4 files changed

+186
-38
lines changed

4 files changed

+186
-38
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/common': patch
3+
---
4+
5+
Fixed CRUD uploads which would not retry after failing until the connection status was updated. A failed CRUD operation should not change the status to `connected: false`.

packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -221,13 +221,18 @@ export abstract class AbstractStreamingSyncImplementation
221221
}
222222
} catch (ex) {
223223
this.updateSyncStatus({
224-
connected: false,
225224
dataFlow: {
226225
uploading: false
227226
}
228227
});
229228
await this.delayRetry();
230-
break;
229+
if (!this.isConnected) {
230+
// Exit the upload loop if the sync stream is no longer connected
231+
break;
232+
}
233+
this.logger.debug(
234+
`Caught exception when uploading. Upload will retry after a delay. Exception: ${ex.message}`
235+
);
231236
} finally {
232237
this.updateSyncStatus({
233238
dataFlow: {

packages/web/tests/stream.test.ts

Lines changed: 162 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
import _ from 'lodash';
22
import Logger from 'js-logger';
3-
import { beforeAll, describe, expect, it } from 'vitest';
3+
import { beforeAll, describe, expect, it, vi } from 'vitest';
44
import { v4 as uuid } from 'uuid';
55
import { AbstractPowerSyncDatabase, Schema, SyncStatusOptions, TableV2, column } from '@powersync/common';
66
import { MockRemote, MockStreamOpenFactory, TestConnector } from './utils/MockStreamOpenFactory';
77

8+
const UPLOAD_TIMEOUT_MS = 3000;
9+
810
export async function waitForConnectionStatus(
911
db: AbstractPowerSyncDatabase,
1012
statusCheck: SyncStatusOptions = { connected: true }
@@ -30,7 +32,9 @@ export async function generateConnectedDatabase({ useWebWorker } = { useWebWorke
3032
* Required since we cannot extend multiple classes.
3133
*/
3234
const callbacks: Map<string, () => void> = new Map();
33-
const remote = new MockRemote(new TestConnector(), () => callbacks.forEach((c) => c()));
35+
const connector = new TestConnector();
36+
const uploadSpy = vi.spyOn(connector, 'uploadData');
37+
const remote = new MockRemote(connector, () => callbacks.forEach((c) => c()));
3438

3539
const users = new TableV2({
3640
name: column.text
@@ -47,6 +51,8 @@ export async function generateConnectedDatabase({ useWebWorker } = { useWebWorke
4751
enableMultiTabs: false,
4852
useWebWorker
4953
},
54+
// Makes tests faster
55+
crudUploadThrottleMs: 0,
5056
schema
5157
},
5258
remote
@@ -62,69 +68,192 @@ export async function generateConnectedDatabase({ useWebWorker } = { useWebWorke
6268
});
6369
});
6470

65-
const streamOpened = waitForStream();
71+
const connect = async () => {
72+
const streamOpened = waitForStream();
73+
74+
const connectedPromise = powersync.connect(connector);
6675

67-
const connectedPromise = powersync.connect(new TestConnector());
76+
await streamOpened;
6877

69-
await streamOpened;
78+
remote.streamController?.enqueue(new TextEncoder().encode('{"token_expires_in":3426}\n'));
7079

71-
remote.streamController?.enqueue(new TextEncoder().encode('{"token_expires_in":3426}\n'));
80+
// Wait for connected to be true
81+
await connectedPromise;
82+
};
7283

73-
// Wait for connected to be true
74-
await connectedPromise;
84+
await connect();
7585

7686
return {
87+
connector,
88+
connect,
7789
factory,
7890
powersync,
7991
remote,
92+
uploadSpy,
8093
waitForStream
8194
};
8295
}
8396

84-
describe('Stream test', () => {
97+
describe('Streaming', () => {
8598
/**
8699
* Declares a test to be executed with different generated db functions
87100
*/
88-
const itWithGenerators = async (name: string, test: (func: () => any) => Promise<void>) => {
101+
const itWithGenerators = async (
102+
name: string,
103+
test: (createConnectedDatabase: () => ReturnType<typeof generateConnectedDatabase>) => Promise<void>
104+
) => {
89105
const funcWithWebWorker = generateConnectedDatabase;
90106
const funcWithoutWebWorker = () => generateConnectedDatabase({ useWebWorker: false });
91107

92108
it(`${name} - with web worker`, () => test(funcWithWebWorker));
93109
it(`${name} - without web worker`, () => test(funcWithoutWebWorker));
94110
};
95111

96-
describe('With Web Worker', () => {
97-
beforeAll(() => Logger.useDefaults());
112+
beforeAll(() => Logger.useDefaults());
113+
114+
itWithGenerators('PowerSync reconnect on closed stream', async (createConnectedDatabase) => {
115+
const { powersync, waitForStream, remote } = await createConnectedDatabase();
116+
expect(powersync.connected).toBe(true);
117+
118+
// Close the stream
119+
const newStream = waitForStream();
120+
remote.streamController?.close();
121+
122+
// A new stream should be requested
123+
await newStream;
124+
125+
await powersync.disconnectAndClear();
126+
await powersync.close();
127+
});
128+
129+
itWithGenerators('PowerSync reconnect multiple connect calls', async (createConnectedDatabase) => {
130+
// This initially performs a connect call
131+
const { powersync, waitForStream } = await createConnectedDatabase();
132+
expect(powersync.connected).toBe(true);
133+
134+
// Call connect again, a new stream should be requested
135+
const newStream = waitForStream();
136+
powersync.connect(new TestConnector());
98137

99-
itWithGenerators('PowerSync reconnect on closed stream', async (func) => {
100-
const { powersync, waitForStream, remote } = await func();
101-
expect(powersync.connected).toBe(true);
138+
// A new stream should be requested
139+
await newStream;
102140

103-
// Close the stream
104-
const newStream = waitForStream();
105-
remote.streamController?.close();
141+
await powersync.disconnectAndClear();
142+
await powersync.close();
143+
});
144+
145+
itWithGenerators('Should trigger upload connector when connected', async (createConnectedDatabase) => {
146+
const { powersync, uploadSpy } = await createConnectedDatabase();
147+
expect(powersync.connected).toBe(true);
148+
149+
// do something which should trigger an upload
150+
await powersync.execute('INSERT INTO users (id, name) VALUES (uuid(), ?)', ['name']);
151+
// It should try and upload
152+
await vi.waitFor(
153+
() => {
154+
// to-have-been-called seems to not work after failing the first check
155+
expect(uploadSpy.mock.calls.length).equals(1);
156+
},
157+
{
158+
timeout: UPLOAD_TIMEOUT_MS
159+
}
160+
);
106161

107-
// A new stream should be requested
108-
await newStream;
162+
await powersync.disconnectAndClear();
163+
await powersync.close();
164+
});
165+
166+
itWithGenerators('Should retry failed uploads when connected', async (createConnectedDatabase) => {
167+
const { powersync, uploadSpy } = await createConnectedDatabase();
168+
expect(powersync.connected).toBe(true);
109169

110-
await powersync.disconnectAndClear();
111-
await powersync.close();
170+
let uploadCounter = 0;
171+
// This test will throw an exception a few times before uploading
172+
const throwCounter = 2;
173+
uploadSpy.mockImplementation(async (db) => {
174+
if (uploadCounter++ < throwCounter) {
175+
throw new Error('No uploads yet');
176+
}
177+
// Now actually do the upload
178+
const tx = await db.getNextCrudTransaction();
179+
await tx?.complete();
112180
});
113181

114-
itWithGenerators('PowerSync reconnect multiple connect calls', async (func) => {
115-
// This initially performs a connect call
116-
const { powersync, waitForStream } = await func();
117-
expect(powersync.connected).toBe(true);
182+
// do something which should trigger an upload
183+
await powersync.execute('INSERT INTO users (id, name) VALUES (uuid(), ?)', ['name']);
118184

119-
// Call connect again, a new stream should be requested
120-
const newStream = waitForStream();
121-
powersync.connect(new TestConnector());
185+
// It should try and upload
186+
await vi.waitFor(
187+
() => {
188+
// to-have-been-called seems to not work after failing a check
189+
expect(uploadSpy.mock.calls.length).equals(throwCounter + 1);
190+
},
191+
{
192+
timeout: UPLOAD_TIMEOUT_MS
193+
}
194+
);
122195

123-
// A new stream should be requested
124-
await newStream;
196+
await powersync.disconnectAndClear();
197+
await powersync.close();
198+
});
199+
200+
itWithGenerators('Should upload after reconnecting', async (createConnectedDatabase) => {
201+
const { connect, powersync, uploadSpy } = await createConnectedDatabase();
202+
expect(powersync.connected).toBe(true);
203+
204+
await powersync.disconnect();
205+
206+
// do something (offline) which should trigger an upload
207+
await powersync.execute('INSERT INTO users (id, name) VALUES (uuid(), ?)', ['name']);
208+
209+
await connect();
210+
211+
// It should try and upload
212+
await vi.waitFor(
213+
() => {
214+
// to-have-been-called seems to not work after failing a check
215+
expect(uploadSpy.mock.calls.length).equals(1);
216+
},
217+
{
218+
timeout: UPLOAD_TIMEOUT_MS
219+
}
220+
);
221+
222+
await powersync.disconnectAndClear();
223+
await powersync.close();
224+
});
125225

126-
await powersync.disconnectAndClear();
127-
await powersync.close();
226+
itWithGenerators('Should update status when uploading', async (createConnectedDatabase) => {
227+
const { powersync, uploadSpy } = await createConnectedDatabase();
228+
expect(powersync.connected).toBe(true);
229+
230+
let uploadStartedPromise = new Promise<void>((resolve) => {
231+
uploadSpy.mockImplementation(async (db) => {
232+
resolve();
233+
// Now actually do the upload
234+
const tx = await db.getNextCrudTransaction();
235+
await tx?.complete();
236+
});
128237
});
238+
239+
// do something which should trigger an upload
240+
await powersync.execute('INSERT INTO users (id, name) VALUES (uuid(), ?)', ['name']);
241+
242+
await uploadStartedPromise;
243+
expect(powersync.currentStatus.dataFlowStatus.uploading).true;
244+
245+
// Status should update after uploads are completed
246+
await vi.waitFor(
247+
() => {
248+
// to-have-been-called seems to not work after failing a check
249+
expect(powersync.currentStatus.dataFlowStatus.uploading).false;
250+
},
251+
{
252+
timeout: UPLOAD_TIMEOUT_MS
253+
}
254+
);
255+
256+
await powersync.disconnectAndClear();
257+
await powersync.close();
129258
});
130259
});

packages/web/tests/utils/MockStreamOpenFactory.ts

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ export class TestConnector implements PowerSyncBackendConnector {
2727
};
2828
}
2929
async uploadData(database: AbstractPowerSyncDatabase): Promise<void> {
30-
return;
30+
const tx = await database.getNextCrudTransaction();
31+
await tx?.complete();
3132
}
3233
}
3334

@@ -49,8 +50,16 @@ export class MockRemote extends AbstractRemote {
4950
post(path: string, data: any, headers?: Record<string, string> | undefined): Promise<any> {
5051
throw new Error('Method not implemented.');
5152
}
52-
get(path: string, headers?: Record<string, string> | undefined): Promise<any> {
53-
throw new Error('Method not implemented.');
53+
async get(path: string, headers?: Record<string, string> | undefined): Promise<any> {
54+
// mock a response for write checkpoint API
55+
if (path.includes('checkpoint')) {
56+
return {
57+
data: {
58+
write_checkpoint: '1000'
59+
}
60+
};
61+
}
62+
throw new Error('Not implemented');
5463
}
5564
async postStreaming(
5665
path: string,

0 commit comments

Comments
 (0)