Commit cbae24e

Fix checkpoints during uploads not being applied
1 parent ad5e6b3 commit cbae24e

6 files changed: +196 −37 lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -3,6 +3,7 @@
 ## 1.0.0-BETA29 (unreleased)

 * Fix potential race condition between jobs in `connect()` and `disconnect()`.
+* Fix race condition causing data received during uploads not to be applied.

 ## 1.0.0-BETA28

core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt

Lines changed: 107 additions & 2 deletions
@@ -11,6 +11,8 @@ import com.powersync.bucket.BucketPriority
 import com.powersync.bucket.Checkpoint
 import com.powersync.bucket.OpType
 import com.powersync.bucket.OplogEntry
+import com.powersync.bucket.WriteCheckpointData
+import com.powersync.bucket.WriteCheckpointResponse
 import com.powersync.connectors.PowerSyncBackendConnector
 import com.powersync.connectors.PowerSyncCredentials
 import com.powersync.db.PowerSyncDatabaseImpl
@@ -24,11 +26,15 @@ import com.powersync.testutils.factory
 import com.powersync.testutils.generatePrintLogWriter
 import com.powersync.testutils.waitFor
 import com.powersync.utils.JsonUtil
+import dev.mokkery.answering.calls
 import dev.mokkery.answering.returns
 import dev.mokkery.everySuspend
+import dev.mokkery.matcher.any
 import dev.mokkery.mock
 import dev.mokkery.verify
+import kotlinx.coroutines.CompletableDeferred
 import kotlinx.coroutines.DelicateCoroutinesApi
+import kotlinx.coroutines.GlobalScope
 import kotlinx.coroutines.channels.Channel
 import kotlinx.coroutines.runBlocking
 import kotlinx.coroutines.test.runTest
@@ -61,6 +67,7 @@ class SyncIntegrationTest {
     private lateinit var database: PowerSyncDatabaseImpl
     private lateinit var connector: PowerSyncBackendConnector
     private lateinit var syncLines: Channel<SyncLine>
+    private lateinit var checkpointResponse: () -> WriteCheckpointResponse

     @BeforeTest
     fun setup() {
@@ -79,6 +86,7 @@
             everySuspend { invalidateCredentials() } returns Unit
         }
         syncLines = Channel()
+        checkpointResponse = { WriteCheckpointResponse(WriteCheckpointData("1000")) }

         runBlocking {
             database.disconnectAndClear(true)
@@ -100,16 +108,18 @@
             dbFilename = "testdb",
         ) as PowerSyncDatabaseImpl

+    @OptIn(DelicateCoroutinesApi::class)
     private fun syncStream(): SyncStream {
-        val client = MockSyncService(syncLines)
+        val client = MockSyncService(syncLines, { checkpointResponse() })
         return SyncStream(
             bucketStorage = database.bucketStorage,
             connector = connector,
             httpEngine = client,
-            uploadCrud = { },
+            uploadCrud = { connector.uploadData(database) },
             retryDelayMs = 10,
             logger = logger,
             params = JsonObject(emptyMap()),
+            scope = GlobalScope,
         )
     }
@@ -529,4 +539,99 @@
         database.close()
         syncLines.close()
     }
+
+    @Test
+    fun `handles checkpoints during uploads`() = runTest {
+        database.connectInternal(syncStream(), 1000L)
+
+        suspend fun expectUserRows(amount: Int) {
+            val row = database.get("SELECT COUNT(*) FROM users") { it.getLong(0)!! }
+            assertEquals(amount, row.toInt())
+        }
+
+        val completeUpload = CompletableDeferred<Unit>()
+        val uploadStarted = CompletableDeferred<Unit>()
+        everySuspend { connector.uploadData(any()) } calls { (db: PowerSyncDatabase) ->
+            val batch = db.getCrudBatch()
+            if (batch == null) return@calls
+
+            uploadStarted.complete(Unit)
+            completeUpload.await()
+            batch.complete.invoke(null)
+        }
+
+        // Trigger an upload (adding a keep-alive sync line because the execute could start before the database is fully
+        // connected).
+        database.execute("INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", listOf("local", "local@example.org"))
+        syncLines.send(SyncLine.KeepAlive(1234))
+        expectUserRows(1)
+        uploadStarted.await()
+
+        // Pretend that the connector takes forever in uploadData, but the data gets uploaded before the method returns.
+        syncLines.send(SyncLine.FullCheckpoint(Checkpoint(
+            writeCheckpoint = "1",
+            lastOpId = "2",
+            checksums = listOf(BucketChecksum("a", checksum = 0))
+        )))
+        turbineScope {
+            val turbine = database.currentStatus.asFlow().testIn(this)
+            turbine.waitFor { it.downloading }
+            turbine.cancel()
+        }
+
+        syncLines.send(SyncLine.SyncDataBucket(
+            bucket = "a",
+            data = listOf(
+                OplogEntry(
+                    checksum = 0,
+                    opId = "1",
+                    op = OpType.PUT,
+                    rowId = "1",
+                    rowType = "users",
+                    data = """{"id": "test1", "name": "from local", "email": ""}"""
+                ),
+                OplogEntry(
+                    checksum = 0,
+                    opId = "2",
+                    op = OpType.PUT,
+                    rowId = "2",
+                    rowType = "users",
+                    data = """{"id": "test1", "name": "additional entry", "email": ""}"""
+                ),
+            ),
+            after = null,
+            nextAfter = null,
+            hasMore = false,
+        ))
+        syncLines.send(SyncLine.CheckpointComplete(lastOpId = "2"))
+
+        // Despite receiving a valid checkpoint with two rows, it should not be visible because we have local data.
+        waitFor {
+            assertNotNull(
+                logWriter.logs.find {
+                    it.message.contains("Could not apply checkpoint due to local data")
+                },
+            )
+        }
+        expectUserCount(1)
+
+        // Mark the upload as completed, this should trigger a write_checkpoint.json request
+        val requestedCheckpoint = CompletableDeferred<Unit>()
+        checkpointResponse = {
+            requestedCheckpoint.complete(Unit)
+            WriteCheckpointResponse(WriteCheckpointData(""))
+        }
+        completeUpload.complete(Unit)
+        requestedCheckpoint.await()
+
+        // This should apply the checkpoint
+        turbineScope {
+            val turbine = database.currentStatus.asFlow().testIn(this)
+            turbine.waitFor { !it.downloading }
+            turbine.cancel()
+        }

+        // Meaning that the two rows are now visible
+        expectUserCount(2)
+    }
 }
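
The mocked `uploadData` above deliberately stalls so that a checkpoint arrives while an upload is still in flight. For reference, a real connector walks through the same batch lifecycle that the mock imitates; the sketch below is not part of the commit, the backend call is left as a placeholder, and only the `getCrudBatch()`/`complete()` usage mirrors what the test exercises.

```kotlin
import com.powersync.PowerSyncDatabase

// Hypothetical connector-side helper (not from this commit). It shows the lifecycle the mocked
// uploadData stands in for: fetch one CRUD batch, upload it, then complete it. Completing the
// batch is what later allows the write_checkpoint.json request asserted in the test.
suspend fun uploadNextBatch(db: PowerSyncDatabase): Boolean {
    val batch = db.getCrudBatch() ?: return false // nothing queued locally
    // ... send batch.crud to the backend here (a real connector performs an HTTP call) ...
    batch.complete(null) // marks the batch as uploaded and removes it from the local queue
    return true
}
```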

core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt

Lines changed: 2 additions & 1 deletion
@@ -150,6 +150,7 @@ internal class PowerSyncDatabaseImpl(
                 retryDelayMs = retryDelayMs,
                 logger = logger,
                 params = params.toJsonObject(),
+                scope = scope,
             ),
             crudThrottleMs,
         )
@@ -222,7 +223,7 @@
                 .filter { it.contains(InternalTable.CRUD.toString()) }
                 .throttle(crudThrottleMs)
                 .collect {
-                    stream.triggerCrudUpload()
+                    stream.triggerCrudUploadAsync().join()
                 }
         }
     }
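
With `triggerCrudUploadAsync()` returning a `Job`, the throttled collector above joins each upload before handling the next table-change notification. A standalone sketch of that launch-and-join pattern, with illustrative names rather than the SDK's:

```kotlin
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.Flow

// Illustrative only: `tableChanges` stands in for the throttled flow of CRUD table updates and
// `triggerUploadAsync` for the Job-returning trigger introduced in this commit.
suspend fun collectCrudChanges(tableChanges: Flow<Unit>, triggerUploadAsync: () -> Job) {
    tableChanges.collect {
        // Launch the upload on the sync stream's own scope, then wait for it so the collector
        // applies backpressure instead of stacking overlapping uploads for every notification.
        triggerUploadAsync().join()
    }
}
```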

core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt

Lines changed: 64 additions & 32 deletions
@@ -1,7 +1,7 @@
 package com.powersync.sync

 import co.touchlab.kermit.Logger
-import co.touchlab.stately.concurrency.AtomicBoolean
+import co.touchlab.stately.concurrency.AtomicReference
 import com.powersync.bucket.BucketChecksum
 import com.powersync.bucket.BucketRequest
 import com.powersync.bucket.BucketStorage
@@ -29,9 +29,13 @@ import io.ktor.http.contentType
 import io.ktor.utils.io.ByteReadChannel
 import io.ktor.utils.io.readUTF8Line
 import kotlinx.coroutines.CancellationException
+import kotlinx.coroutines.CompletableDeferred
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.Job
 import kotlinx.coroutines.delay
 import kotlinx.coroutines.flow.Flow
 import kotlinx.coroutines.flow.flow
+import kotlinx.coroutines.launch
 import kotlinx.datetime.Clock
 import kotlinx.serialization.encodeToString
 import kotlinx.serialization.json.JsonObject
@@ -43,9 +47,10 @@ internal class SyncStream(
     private val retryDelayMs: Long = 5000L,
     private val logger: Logger,
     private val params: JsonObject,
+    private val scope: CoroutineScope,
     httpEngine: HttpClientEngine? = null,
 ) {
-    private var isUploadingCrud = AtomicBoolean(false)
+    private var isUploadingCrud = AtomicReference<PendingCrudUpload?>(null)

     /**
      * The current sync status. This instance is updated as changes occur
@@ -116,13 +121,18 @@
         }
     }

-    suspend fun triggerCrudUpload() {
-        if (!status.connected || isUploadingCrud.value) {
-            return
+    fun triggerCrudUploadAsync(): Job = scope.launch {
+        val thisIteration = PendingCrudUpload(CompletableDeferred())
+        try {
+            if (!status.connected || !isUploadingCrud.compareAndSet(null, thisIteration)) {
+                return@launch
+            }
+
+            uploadAllCrud()
+        } finally {
+            isUploadingCrud.set(null)
+            thisIteration.done.complete(Unit)
         }
-        isUploadingCrud.value = true
-        uploadAllCrud()
-        isUploadingCrud.value = false
     }

     private suspend fun uploadAllCrud() {
@@ -153,8 +163,13 @@
                 break
             }
         } catch (e: Exception) {
-            logger.e { "Error uploading crud: ${e.message}" }
             status.update(uploading = false, uploadError = e)
+
+            if (e is CancellationException) {
+                throw e
+            }
+
+            logger.e { "Error uploading crud: ${e.message}" }
             delay(retryDelayMs)
             break
         }
@@ -237,7 +252,6 @@
             validatedCheckpoint = null,
             appliedCheckpoint = null,
             bucketSet = initialBuckets.keys.toMutableSet(),
-            retry = false,
         )

         bucketEntries.forEach { entry ->
@@ -253,7 +267,12 @@

         streamingSyncRequest(req).collect { value ->
             val line = JsonUtil.json.decodeFromString<SyncLine>(value)
+
             state = handleInstruction(line, value, state)
+
+            if (state.abortIteration) {
+                return@collect
+            }
         }

         status.update(downloading = false)
@@ -314,30 +333,40 @@
     }

     private suspend fun handleStreamingSyncCheckpointComplete(state: SyncStreamState): SyncStreamState {
-        val result = bucketStorage.syncLocalDatabase(state.targetCheckpoint!!)
+        val checkpoint = state.targetCheckpoint!!
+        var result = bucketStorage.syncLocalDatabase(checkpoint)
+        val pending = isUploadingCrud.get()
+
         if (!result.checkpointValid) {
             // This means checksums failed. Start again with a new checkpoint.
             // TODO: better back-off
             delay(50)
-            state.retry = true
+            state.abortIteration = true
             // TODO handle retries
             return state
-        } else if (!result.ready) {
-            // Checksums valid, but need more data for a consistent checkpoint.
-            // Continue waiting.
-            // landing here the whole time
-        } else {
-            state.appliedCheckpoint = state.targetCheckpoint!!.clone()
-            logger.i { "validated checkpoint ${state.appliedCheckpoint}" }
+        } else if (!result.ready && pending != null) {
+            // We have pending entries in the local upload queue or are waiting to confirm a write checkpoint, which
+            // prevented this checkpoint from applying. Wait for that to complete and try again.
+            logger.d { "Could not apply checkpoint due to local data. Waiting for in-progress upload before retrying." }
+            pending.done.await()
+
+            result = bucketStorage.syncLocalDatabase(checkpoint)
         }

-        state.validatedCheckpoint = state.targetCheckpoint
-        status.update(
-            lastSyncedAt = Clock.System.now(),
-            downloading = false,
-            hasSynced = true,
-            clearDownloadError = true,
-        )
+        if (result.checkpointValid && result.ready) {
+            state.appliedCheckpoint = checkpoint.clone()
+            logger.i { "validated checkpoint ${state.appliedCheckpoint}" }
+
+            state.validatedCheckpoint = state.targetCheckpoint
+            status.update(
+                lastSyncedAt = Clock.System.now(),
+                downloading = false,
+                hasSynced = true,
+                clearDownloadError = true,
+            )
+        } else {
+            logger.d { "Could not apply checkpoint. Waiting for next sync complete line" }
+        }

         return state
     }
@@ -352,12 +381,12 @@
             // This means checksums failed. Start again with a new checkpoint.
             // TODO: better back-off
             delay(50)
-            state.retry = true
+            state.abortIteration = true
             // TODO handle retries
             return state
         } else if (!result.ready) {
-            // Checksums valid, but need more data for a consistent checkpoint.
-            // Continue waiting.
+            // Checkpoint is valid, but we have local data preventing this to be published. We'll try to resolve this
+            // once we have a complete checkpoint if the problem persists.
         } else {
             logger.i { "validated partial checkpoint ${state.appliedCheckpoint} up to priority of $priority" }
         }
@@ -441,10 +470,11 @@
             // Connection would be closed automatically right after this
            logger.i { "Token expiring reconnect" }
            connector.invalidateCredentials()
-            state.retry = true
+            state.abortIteration = true
            return state
         }
-        triggerCrudUpload()
+        // Don't await the upload job, we can keep receiving sync lines
+        triggerCrudUploadAsync()
         return state
     }
 }
@@ -454,5 +484,7 @@ internal data class SyncStreamState(
     var validatedCheckpoint: Checkpoint?,
     var appliedCheckpoint: Checkpoint?,
     var bucketSet: MutableSet<String>?,
-    var retry: Boolean,
+    var abortIteration: Boolean = false
 )
+
+private class PendingCrudUpload(val done: CompletableDeferred<Unit>)
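
Taken together, the change replaces the boolean upload flag with a single-flight guard: `compareAndSet` ensures at most one upload runs at a time, and the `CompletableDeferred` lets `handleStreamingSyncCheckpointComplete` wait for the in-flight upload before re-running `syncLocalDatabase`. The distilled sketch below uses illustrative names (not the SDK's) and performs the compare-and-set before entering the `try` block, so the cleanup only runs for the coroutine that actually won the slot.

```kotlin
import co.touchlab.stately.concurrency.AtomicReference
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch

private class PendingUpload(val done: CompletableDeferred<Unit> = CompletableDeferred())

// Illustrative single-flight uploader: at most one `uploadAll` runs at a time, and other
// coroutines can suspend until the current run (if any) has finished.
class SingleFlightUploader(
    private val scope: CoroutineScope,
    private val uploadAll: suspend () -> Unit,
) {
    private val current = AtomicReference<PendingUpload?>(null)

    fun triggerAsync(): Job =
        scope.launch {
            val mine = PendingUpload()
            // If another upload is already running, return without touching shared state.
            if (!current.compareAndSet(null, mine)) return@launch
            try {
                uploadAll()
            } finally {
                current.set(null)
                mine.done.complete(Unit) // wake anyone waiting on this run
            }
        }

    // Called when a checkpoint cannot be applied because of local data: wait for the in-flight
    // upload (if any) to finish; the caller then re-checks whether the checkpoint applies.
    suspend fun awaitInFlightUpload() {
        current.get()?.done?.await()
    }
}
```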
