Skip to content

Commit c331ce1

Browse files
committed
Fix some tests
1 parent 631db90 commit c331ce1

File tree

5 files changed

+97
-50
lines changed

5 files changed

+97
-50
lines changed

core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,9 @@ import com.powersync.utils.JsonUtil
2727
import dev.mokkery.answering.returns
2828
import dev.mokkery.everySuspend
2929
import dev.mokkery.mock
30-
import kotlinx.coroutines.CoroutineScope
30+
import kotlinx.coroutines.CompletableDeferred
31+
import kotlinx.coroutines.DelicateCoroutinesApi
3132
import kotlinx.coroutines.channels.Channel
32-
import kotlinx.coroutines.flow.receiveAsFlow
3333
import kotlinx.coroutines.runBlocking
3434
import kotlinx.coroutines.test.runTest
3535
import kotlinx.serialization.encodeToString
@@ -99,8 +99,8 @@ class SyncIntegrationTest {
9999
dbFilename = "testdb",
100100
) as PowerSyncDatabaseImpl
101101

102-
private fun CoroutineScope.syncStream(): SyncStream {
103-
val client = MockSyncService.client(this, syncLines.receiveAsFlow())
102+
private fun syncStream(): SyncStream {
103+
val client = MockSyncService(syncLines)
104104
return SyncStream(
105105
bucketStorage = database.bucketStorage,
106106
connector = connector,
@@ -117,6 +117,27 @@ class SyncIntegrationTest {
117117
assertEquals(amount, users.size, "Expected $amount users, got $users")
118118
}
119119

120+
@Test
121+
@OptIn(DelicateCoroutinesApi::class)
122+
fun closesResponseStreamOnDisconnect() = runTest {
123+
val syncStream = syncStream()
124+
database.connectInternal(syncStream, 1000L)
125+
126+
turbineScope(timeout = 10.0.seconds) {
127+
val turbine = database.currentStatus.asFlow().testIn(this)
128+
turbine.waitFor { it.connected }
129+
130+
database.close()
131+
turbine.waitFor { !it.connected }
132+
turbine.cancel()
133+
}
134+
135+
// Closing the database should close the channel
136+
val channelClose = CompletableDeferred<Unit>()
137+
syncLines.invokeOnClose { channelClose.complete(Unit) }
138+
channelClose.await()
139+
}
140+
120141
@Test
121142
fun testPartialSync() =
122143
runTest {

core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,6 @@ internal class PowerSyncDatabaseImpl(
9696
override val currentStatus: SyncStatus = SyncStatus()
9797

9898
private val mutex = Mutex()
99-
private var syncStream: SyncStream? = null
10099
private var syncSupervisorJob: Job? = null
101100

102101
// This is set in the init
@@ -123,7 +122,7 @@ internal class PowerSyncDatabaseImpl(
123122
override suspend fun updateSchema(schema: Schema) =
124123
runWrappedSuspending {
125124
mutex.withLock {
126-
if (this.syncStream != null) {
125+
if (this.syncSupervisorJob != null) {
127126
throw PowerSyncException(
128127
"Cannot update schema while connected",
129128
cause = Exception("PowerSync client is already connected"),
@@ -161,8 +160,6 @@ internal class PowerSyncDatabaseImpl(
161160
stream: SyncStream,
162161
crudThrottleMs: Long,
163162
) {
164-
this.syncStream = stream
165-
166163
val db = this
167164
val job = SupervisorJob(scope.coroutineContext[Job])
168165
syncSupervisorJob = job
@@ -195,7 +192,7 @@ internal class PowerSyncDatabaseImpl(
195192
// We have a lock if we reached here
196193
try {
197194
ensureActive()
198-
syncStream!!.streamingSync()
195+
stream.streamingSync()
199196
} finally {
200197
streamMutex.unlock(db)
201198
}
@@ -225,10 +222,16 @@ internal class PowerSyncDatabaseImpl(
225222
.filter { it.contains(InternalTable.CRUD.toString()) }
226223
.throttle(crudThrottleMs)
227224
.collect {
228-
syncStream!!.triggerCrudUpload()
225+
stream.triggerCrudUpload()
229226
}
230227
}
231228
}
229+
230+
job.invokeOnCompletion {
231+
if (it is DisconnectRequestedException) {
232+
stream.invalidateCredentials()
233+
}
234+
}
232235
}
233236

234237
override suspend fun getCrudBatch(limit: Int): CrudBatch? {
@@ -367,16 +370,12 @@ internal class PowerSyncDatabaseImpl(
367370
private suspend fun disconnectInternal() {
368371
val syncJob = syncSupervisorJob
369372
if (syncJob != null && syncJob.isActive) {
370-
syncJob.cancel(CancellationException("disconnect() called"))
373+
// Using this exception type will also make the sync job invalidate credentials.
374+
syncJob.cancel(DisconnectRequestedException)
371375
syncJob.join()
372376
syncSupervisorJob = null
373377
}
374378

375-
if (syncStream != null) {
376-
syncStream?.invalidateCredentials()
377-
syncStream = null
378-
}
379-
380379
currentStatus.update(
381380
connected = false,
382381
connecting = false,
@@ -488,3 +487,5 @@ internal class PowerSyncDatabaseImpl(
488487
}
489488
}
490489
}
490+
491+
internal object DisconnectRequestedException: CancellationException("disconnect() called")

core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import com.powersync.connectors.PowerSyncBackendConnector
55
import kotlinx.coroutines.flow.MutableStateFlow
66
import kotlinx.coroutines.flow.SharedFlow
77
import kotlinx.coroutines.flow.asSharedFlow
8+
import kotlinx.coroutines.flow.cancel
89
import kotlinx.datetime.Instant
910

1011
@ConsistentCopyVisibility

core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ class SyncStreamTest {
210210
// TODO: It would be neat if we could use in-memory sqlite instances instead of mocking everything
211211
// Revisit https://github.com/powersync-ja/powersync-kotlin/pull/117/files at some point
212212
val syncLines = Channel<SyncLine>()
213-
val client = MockSyncService.client(this, syncLines.receiveAsFlow())
213+
val client = MockSyncService(syncLines)
214214

215215
syncStream =
216216
SyncStream(

core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt

Lines changed: 57 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -4,53 +4,77 @@ import app.cash.turbine.ReceiveTurbine
44
import com.powersync.sync.SyncLine
55
import com.powersync.sync.SyncStatusData
66
import com.powersync.utils.JsonUtil
7-
import io.ktor.client.engine.HttpClientEngine
8-
import io.ktor.client.engine.mock.MockEngine
9-
import io.ktor.client.engine.mock.MockRequestHandleScope
10-
import io.ktor.client.engine.mock.respond
11-
import io.ktor.client.engine.mock.respondBadRequest
7+
import io.ktor.client.engine.HttpClientEngineBase
8+
import io.ktor.client.engine.HttpClientEngineCapability
9+
import io.ktor.client.engine.HttpClientEngineConfig
10+
import io.ktor.client.engine.callContext
11+
import io.ktor.client.plugins.HttpTimeoutCapability
1212
import io.ktor.client.request.HttpRequestData
1313
import io.ktor.client.request.HttpResponseData
14-
import io.ktor.utils.io.ByteChannel
14+
import io.ktor.http.HttpProtocolVersion
15+
import io.ktor.http.HttpStatusCode
16+
import io.ktor.http.headersOf
17+
import io.ktor.util.date.GMTDate
18+
import io.ktor.utils.io.InternalAPI
1519
import io.ktor.utils.io.writeStringUtf8
20+
import io.ktor.utils.io.writer
1621
import kotlinx.coroutines.CoroutineScope
17-
import kotlinx.coroutines.flow.Flow
18-
import kotlinx.coroutines.launch
22+
import kotlinx.coroutines.channels.ReceiveChannel
23+
import kotlinx.coroutines.channels.consumeEach
1924
import kotlinx.serialization.encodeToString
2025

21-
internal class MockSyncService private constructor(
22-
private val scope: CoroutineScope,
23-
private val lines: Flow<SyncLine>,
24-
) {
25-
private fun handleRequest(
26-
scope: MockRequestHandleScope,
27-
request: HttpRequestData,
28-
): HttpResponseData =
29-
if (request.url.encodedPath == "/sync/stream") {
30-
val channel = ByteChannel(autoFlush = true)
31-
this.scope.launch {
32-
lines.collect {
26+
/**
27+
* A mock HTTP engine providing sync lines read from a coroutines [ReceiveChannel].
28+
*
29+
* Note that we can't trivially use ktor's `MockEngine` here because that engine requires a non-suspending handler
30+
* function which makes it very hard to cancel the channel when the sync client closes the request stream. That is
31+
* precisely what we may want to test though.
32+
*/
33+
internal class MockSyncService(
34+
private val lines: ReceiveChannel<SyncLine>,
35+
) : HttpClientEngineBase("sync-service") {
36+
37+
override val config: HttpClientEngineConfig
38+
get() = Config
39+
40+
override val supportedCapabilities: Set<HttpClientEngineCapability<out Any>> = setOf(
41+
HttpTimeoutCapability,
42+
)
43+
44+
@OptIn(InternalAPI::class)
45+
override suspend fun execute(data: HttpRequestData): HttpResponseData {
46+
val context = callContext()
47+
val scope = CoroutineScope(context)
48+
49+
return if (data.url.encodedPath == "/sync/stream") {
50+
val job = scope.writer(autoFlush = true) {
51+
lines.consumeEach {
3352
val serializedLine = JsonUtil.json.encodeToString(it)
3453
channel.writeStringUtf8("$serializedLine\n")
3554
}
3655
}
3756

38-
scope.respond(channel)
57+
HttpResponseData(
58+
HttpStatusCode.OK,
59+
GMTDate(),
60+
headersOf(),
61+
HttpProtocolVersion.HTTP_1_1,
62+
job.channel,
63+
context,
64+
)
3965
} else {
40-
scope.respondBadRequest()
41-
}
42-
43-
companion object {
44-
fun client(
45-
scope: CoroutineScope,
46-
lines: Flow<SyncLine>,
47-
): HttpClientEngine {
48-
val service = MockSyncService(scope, lines)
49-
return MockEngine { request ->
50-
service.handleRequest(this, request)
51-
}
66+
HttpResponseData(
67+
HttpStatusCode.BadRequest,
68+
GMTDate(),
69+
headersOf(),
70+
HttpProtocolVersion.HTTP_1_1,
71+
"",
72+
context,
73+
)
5274
}
5375
}
76+
77+
private object Config : HttpClientEngineConfig()
5478
}
5579

5680
suspend inline fun ReceiveTurbine<SyncStatusData>.waitFor(matcher: (SyncStatusData) -> Boolean) {

0 commit comments

Comments
 (0)