Skip to content

Commit 2ce69aa

Browse files
committed
Add sync stream test
1 parent 95acd57 commit 2ce69aa

File tree

6 files changed

+225
-20
lines changed

6 files changed

+225
-20
lines changed

core/build.gradle.kts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,8 @@ kotlin {
235235
implementation(libs.test.coroutines)
236236
implementation(libs.test.turbine)
237237
implementation(libs.kermit.test)
238+
implementation(libs.ktor.client.mock)
239+
implementation(libs.test.turbine)
238240
}
239241

240242
// We're putting the native libraries into our JAR, so integration tests for the JVM can run as part of the unit

core/src/commonMain/kotlin/com/powersync/bucket/BucketPriority.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@ import kotlin.jvm.JvmInline
66
@JvmInline
77
@Serializable
88
public value class BucketPriority(private val priorityCode: Int): Comparable<BucketPriority> {
9+
init {
10+
require(priorityCode >= 0)
11+
}
12+
913
override fun compareTo(other: BucketPriority): Int {
1014
return other.priorityCode.compareTo(priorityCode)
1115
}

core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,9 @@ import com.powersync.bucket.WriteCheckpointResponse
1010
import com.powersync.connectors.PowerSyncBackendConnector
1111
import com.powersync.db.crud.CrudEntry
1212
import com.powersync.utils.JsonUtil
13-
import io.ktor.client.HttpClient
13+
import io.ktor.client.*
1414
import io.ktor.client.call.body
15+
import io.ktor.client.engine.*
1516
import io.ktor.client.plugins.HttpTimeout
1617
import io.ktor.client.plugins.contentnegotiation.ContentNegotiation
1718
import io.ktor.client.plugins.timeout
@@ -45,6 +46,7 @@ internal class SyncStream(
4546
private val retryDelayMs: Long = 5000L,
4647
private val logger: Logger,
4748
private val params: JsonObject,
49+
httpEngine: HttpClientEngine? = null,
4850
) {
4951
private var isUploadingCrud = AtomicBoolean(false)
5052

@@ -55,12 +57,25 @@ internal class SyncStream(
5557

5658
private var clientId: String? = null
5759

58-
private val httpClient: HttpClient =
59-
HttpClient {
60+
private val httpClient: HttpClient
61+
62+
init {
63+
fun HttpClientConfig<*>.configureClient() {
6064
install(HttpTimeout)
6165
install(ContentNegotiation)
6266
}
6367

68+
httpClient = if (httpEngine == null) {
69+
HttpClient {
70+
configureClient()
71+
}
72+
} else {
73+
HttpClient(httpEngine) {
74+
configureClient()
75+
}
76+
}
77+
}
78+
6479
fun invalidateCredentials() {
6580
connector.invalidateCredentials()
6681
}
@@ -239,16 +254,18 @@ internal class SyncStream(
239254
)
240255

241256
streamingSyncRequest(req).collect { value ->
242-
state = handleInstruction(value, state)
257+
val line = JsonUtil.json.decodeFromString<SyncLine>(value)
258+
state = handleInstruction(line, value, state)
243259
}
244260

245261
return state
246262
}
247263

248264
private suspend fun handleInstruction(
265+
line: SyncLine,
249266
jsonString: String,
250267
state: SyncStreamState,
251-
): SyncStreamState = when (val line = JsonUtil.json.decodeFromString<SyncLine>(jsonString)) {
268+
): SyncStreamState = when (line) {
252269
is SyncLine.FullCheckpoint -> handleStreamingSyncCheckpoint(line, state)
253270
is SyncLine.CheckpointDiff -> handleStreamingSyncCheckpointDiff(line, state)
254271
is SyncLine.CheckpointComplete -> handleStreamingSyncCheckpointComplete(state)

core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt

Lines changed: 142 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,40 @@
11
package com.powersync.sync
22

3+
import app.cash.turbine.turbineScope
34
import co.touchlab.kermit.Logger
45
import co.touchlab.kermit.Severity
56
import co.touchlab.kermit.TestConfig
67
import co.touchlab.kermit.TestLogWriter
8+
import com.powersync.bucket.*
9+
import com.powersync.bucket.BucketChecksum
710
import com.powersync.bucket.BucketStorage
11+
import com.powersync.bucket.Checkpoint
12+
import com.powersync.bucket.OplogEntry
813
import com.powersync.connectors.PowerSyncBackendConnector
914
import com.powersync.connectors.PowerSyncCredentials
1015
import com.powersync.db.crud.CrudEntry
1116
import com.powersync.db.crud.UpdateType
17+
import com.powersync.testutils.MockSyncService
18+
import com.powersync.testutils.waitFor
19+
import com.powersync.utils.JsonUtil
20+
import dev.mokkery.*
1221
import dev.mokkery.answering.returns
13-
import dev.mokkery.everySuspend
14-
import dev.mokkery.mock
15-
import dev.mokkery.verify
22+
import dev.mokkery.matcher.any
23+
import dev.mokkery.verify.VerifyMode.Companion.order
24+
import io.ktor.client.engine.mock.*
25+
import kotlinx.coroutines.channels.Channel
1626
import kotlinx.coroutines.delay
27+
import kotlinx.coroutines.flow.receiveAsFlow
1728
import kotlinx.coroutines.launch
1829
import kotlinx.coroutines.test.runTest
1930
import kotlinx.coroutines.withTimeout
31+
import kotlinx.serialization.encodeToString
2032
import kotlinx.serialization.json.JsonObject
2133
import kotlin.test.BeforeTest
2234
import kotlin.test.Test
2335
import kotlin.test.assertContains
2436
import kotlin.test.assertEquals
37+
import kotlin.time.Duration.Companion.seconds
2538

2639
@OptIn(co.touchlab.kermit.ExperimentalKermitApi::class)
2740
class SyncStreamTest {
@@ -39,11 +52,33 @@ class SyncStreamTest {
3952
logWriterList = listOf(testLogWriter),
4053
),
4154
)
55+
private val assertNoHttpEngine = MockEngine { request ->
56+
error("Unexpected HTTP request: $request")
57+
}
4258

4359
@BeforeTest
4460
fun setup() {
45-
bucketStorage = mock<BucketStorage>()
46-
connector = mock<PowerSyncBackendConnector>()
61+
bucketStorage = mock<BucketStorage> {
62+
everySuspend { getClientId() } returns "test-client-id"
63+
everySuspend { getBucketStates() } returns emptyList()
64+
everySuspend { removeBuckets(any()) } returns Unit
65+
everySuspend { setTargetCheckpoint(any()) } returns Unit
66+
everySuspend { saveSyncData(any()) } returns Unit
67+
everySuspend { syncLocalDatabase(any(), any()) } returns SyncLocalDatabaseResult(
68+
ready = true,
69+
checkpointValid = true,
70+
checkpointFailures = emptyList()
71+
)
72+
}
73+
connector =
74+
mock<PowerSyncBackendConnector> {
75+
everySuspend { getCredentialsCached() } returns
76+
PowerSyncCredentials(
77+
token = "test-token",
78+
userId = "test-user",
79+
endpoint = "https://test.com",
80+
)
81+
}
4782
}
4883

4984
@Test
@@ -58,6 +93,7 @@ class SyncStreamTest {
5893
SyncStream(
5994
bucketStorage = bucketStorage,
6095
connector = connector,
96+
httpEngine = assertNoHttpEngine,
6197
uploadCrud = {},
6298
logger = logger,
6399
params = JsonObject(emptyMap()),
@@ -92,6 +128,7 @@ class SyncStreamTest {
92128
SyncStream(
93129
bucketStorage = bucketStorage,
94130
connector = connector,
131+
httpEngine = assertNoHttpEngine,
95132
uploadCrud = { },
96133
retryDelayMs = 10,
97134
logger = logger,
@@ -126,20 +163,11 @@ class SyncStreamTest {
126163
everySuspend { getBucketStates() } returns emptyList()
127164
}
128165

129-
connector =
130-
mock<PowerSyncBackendConnector> {
131-
everySuspend { getCredentialsCached() } returns
132-
PowerSyncCredentials(
133-
token = "test-token",
134-
userId = "test-user",
135-
endpoint = "https://test.com",
136-
)
137-
}
138-
139166
syncStream =
140167
SyncStream(
141168
bucketStorage = bucketStorage,
142169
connector = connector,
170+
httpEngine = assertNoHttpEngine,
143171
uploadCrud = { },
144172
retryDelayMs = 10,
145173
logger = logger,
@@ -166,4 +194,103 @@ class SyncStreamTest {
166194
// Clean up
167195
job.cancel()
168196
}
197+
198+
@Test
199+
fun testPartialSync() = runTest {
200+
// TODO: It would be neat if we could use in-memory sqlite instances instead of mocking everything
201+
// Revisit https://github.com/powersync-ja/powersync-kotlin/pull/117/files at some point
202+
val syncLines = Channel<SyncLine>()
203+
val client = MockSyncService.client(this, syncLines.receiveAsFlow())
204+
205+
syncStream = SyncStream(
206+
bucketStorage = bucketStorage,
207+
connector = connector,
208+
httpEngine = client,
209+
uploadCrud = { },
210+
retryDelayMs = 10,
211+
logger = logger,
212+
params = JsonObject(emptyMap()),
213+
)
214+
215+
val job = launch { syncStream.streamingSync() }
216+
var operationId = 1
217+
218+
suspend fun pushData(priority: Int) {
219+
val id = operationId++
220+
221+
syncLines.send(SyncLine.SyncDataBucket(
222+
bucket = "prio$priority",
223+
data = listOf(OplogEntry(
224+
checksum = (priority + 10).toLong(),
225+
data = JsonUtil.json.encodeToString(mapOf("foo" to "bar")),
226+
op = OpType.PUT,
227+
opId = id.toString(),
228+
rowId = "prio$priority",
229+
rowType = "customers"
230+
)),
231+
after = null,
232+
nextAfter = null,
233+
))
234+
}
235+
236+
turbineScope(timeout=10.0.seconds) {
237+
val turbine = syncStream.status.asFlow().testIn(this)
238+
turbine.waitFor { it.connected }
239+
resetCalls(bucketStorage)
240+
241+
// Start a sync flow
242+
syncLines.send(SyncLine.FullCheckpoint(Checkpoint(
243+
lastOpId = "4",
244+
checksums = buildList {
245+
for (priority in 0..3) {
246+
add(BucketChecksum(
247+
bucket = "prio$priority",
248+
priority = BucketPriority(priority),
249+
checksum = 10 + priority,
250+
))
251+
}
252+
}
253+
)))
254+
255+
// Emit a partial sync complete for each priority but the last.
256+
for (priorityNo in 0..<3) {
257+
val priority = BucketPriority(priorityNo)
258+
pushData(priorityNo)
259+
syncLines.send(SyncLine.CheckpointPartiallyComplete(
260+
lastOpId = operationId.toString(),
261+
priority = priority,
262+
))
263+
264+
turbine.waitFor { it.priorityStatusFor(priority).hasSynced == true }
265+
266+
verifySuspend(order) {
267+
if (priorityNo == 0) {
268+
bucketStorage.removeBuckets(any())
269+
bucketStorage.setTargetCheckpoint(any())
270+
}
271+
272+
bucketStorage.saveSyncData(any())
273+
bucketStorage.syncLocalDatabase(any(), priority)
274+
}
275+
}
276+
277+
// Then complete the sync
278+
pushData(3)
279+
syncLines.send(SyncLine.CheckpointComplete(
280+
lastOpId = operationId.toString(),
281+
))
282+
283+
turbine.waitFor { it.hasSynced == true }
284+
verifySuspend {
285+
bucketStorage.saveSyncData(any())
286+
bucketStorage.syncLocalDatabase(any(), null)
287+
}
288+
289+
turbine.cancel()
290+
}
291+
292+
verifyNoMoreCalls(bucketStorage)
293+
job.cancel()
294+
syncLines.close()
295+
}
169296
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package com.powersync.testutils
2+
3+
import app.cash.turbine.ReceiveTurbine
4+
import com.powersync.sync.SyncLine
5+
import com.powersync.sync.SyncStatusData
6+
import com.powersync.utils.JsonUtil
7+
import io.ktor.client.engine.*
8+
import io.ktor.client.engine.mock.*
9+
import io.ktor.client.request.*
10+
import io.ktor.utils.io.*
11+
import kotlinx.coroutines.CoroutineScope
12+
import kotlinx.coroutines.flow.Flow
13+
import kotlinx.coroutines.launch
14+
import kotlinx.serialization.encodeToString
15+
16+
internal class MockSyncService private constructor(private val scope: CoroutineScope, private val lines: Flow<SyncLine>) {
17+
private fun handleRequest(scope: MockRequestHandleScope, request: HttpRequestData): HttpResponseData {
18+
return if (request.url.encodedPath == "/sync/stream") {
19+
val channel = ByteChannel(autoFlush = true)
20+
this.scope.launch {
21+
lines.collect {
22+
val serializedLine = JsonUtil.json.encodeToString(it)
23+
channel.writeStringUtf8("$serializedLine\n")
24+
}
25+
}
26+
27+
scope.respond(channel)
28+
} else {
29+
scope.respondBadRequest()
30+
}
31+
}
32+
33+
companion object {
34+
fun client(scope: CoroutineScope, lines: Flow<SyncLine>): HttpClientEngine {
35+
val service = MockSyncService(scope, lines)
36+
return MockEngine { request ->
37+
service.handleRequest(this, request)
38+
}
39+
}
40+
}
41+
}
42+
43+
public suspend inline fun ReceiveTurbine<SyncStatusData>.waitFor(matcher: (SyncStatusData) -> Boolean) {
44+
while (true) {
45+
val item = awaitItem()
46+
if (matcher(item)) {
47+
break
48+
}
49+
50+
item.anyError?.let {
51+
error("Unexpected error in $item")
52+
}
53+
}
54+
}

gradle/libs.versions.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ ktor-client-core = { module = "io.ktor:ktor-client-core", version.ref = "ktor" }
7272
ktor-client-ios = { module = "io.ktor:ktor-client-darwin", version.ref = "ktor" }
7373
ktor-client-okhttp = { module = "io.ktor:ktor-client-okhttp", version.ref = "ktor" }
7474
ktor-client-contentnegotiation = { module = "io.ktor:ktor-client-content-negotiation", version.ref = "ktor" }
75+
ktor-client-mock = { module = "io.ktor:ktor-client-mock", version.ref = "ktor" }
7576
ktor-serialization-json = { module = "io.ktor:ktor-serialization-kotlinx-json", version.ref = "ktor" }
7677
kotlinx-coroutines-core = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-core", version.ref = "coroutines" }
7778

0 commit comments

Comments
 (0)