Skip to content

Commit 6788119

Browse files
committed
Fix flushing
1 parent c331ce1 commit 6788119

File tree

2 files changed

+47
-7
lines changed

2 files changed

+47
-7
lines changed

core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt

Lines changed: 45 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import com.powersync.utils.JsonUtil
2727
import dev.mokkery.answering.returns
2828
import dev.mokkery.everySuspend
2929
import dev.mokkery.mock
30-
import kotlinx.coroutines.CompletableDeferred
30+
import dev.mokkery.verify
3131
import kotlinx.coroutines.DelicateCoroutinesApi
3232
import kotlinx.coroutines.channels.Channel
3333
import kotlinx.coroutines.runBlocking
@@ -38,6 +38,7 @@ import kotlin.test.AfterTest
3838
import kotlin.test.BeforeTest
3939
import kotlin.test.Test
4040
import kotlin.test.assertEquals
41+
import kotlin.test.assertFailsWith
4142
import kotlin.test.assertFalse
4243
import kotlin.test.assertNotNull
4344
import kotlin.test.assertTrue
@@ -119,7 +120,7 @@ class SyncIntegrationTest {
119120

120121
@Test
121122
@OptIn(DelicateCoroutinesApi::class)
122-
fun closesResponseStreamOnDisconnect() = runTest {
123+
fun closesResponseStreamOnDatabaseClose() = runTest {
123124
val syncStream = syncStream()
124125
database.connectInternal(syncStream, 1000L)
125126

@@ -132,10 +133,48 @@ class SyncIntegrationTest {
132133
turbine.cancel()
133134
}
134135

135-
// Closing the database should close the channel
136-
val channelClose = CompletableDeferred<Unit>()
137-
syncLines.invokeOnClose { channelClose.complete(Unit) }
138-
channelClose.await()
136+
// Closing the database should have closed the channel
137+
assertTrue { syncLines.isClosedForSend }
138+
}
139+
140+
@Test
141+
@OptIn(DelicateCoroutinesApi::class)
142+
fun cleansResourcesOnDisconnect() = runTest {
143+
val syncStream = syncStream()
144+
database.connectInternal(syncStream, 1000L)
145+
146+
turbineScope(timeout = 10.0.seconds) {
147+
val turbine = database.currentStatus.asFlow().testIn(this)
148+
turbine.waitFor { it.connected }
149+
150+
database.disconnect()
151+
turbine.waitFor { !it.connected }
152+
turbine.cancel()
153+
}
154+
155+
// Disconnecting should have closed the channel
156+
assertTrue { syncLines.isClosedForSend }
157+
158+
// And called invalidateCredentials on the connector
159+
verify { connector.invalidateCredentials() }
160+
}
161+
162+
@Test
163+
fun cannotUpdateSchemaWhileConnected() = runTest {
164+
val syncStream = syncStream()
165+
database.connectInternal(syncStream, 1000L)
166+
167+
turbineScope(timeout = 10.0.seconds) {
168+
val turbine = database.currentStatus.asFlow().testIn(this)
169+
turbine.waitFor { it.connected }
170+
turbine.cancel()
171+
}
172+
173+
assertFailsWith<PowerSyncException>("Cannot update schema while connected") {
174+
database.updateSchema(Schema())
175+
}
176+
177+
database.close()
139178
}
140179

141180
@Test

core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,11 @@ internal class MockSyncService(
4747
val scope = CoroutineScope(context)
4848

4949
return if (data.url.encodedPath == "/sync/stream") {
50-
val job = scope.writer(autoFlush = true) {
50+
val job = scope.writer {
5151
lines.consumeEach {
5252
val serializedLine = JsonUtil.json.encodeToString(it)
5353
channel.writeStringUtf8("$serializedLine\n")
54+
channel.flush()
5455
}
5556
}
5657

0 commit comments

Comments
 (0)