Skip to content

Commit 4ec18ba

Browse files
committed
Use single supervisor job for sync
1 parent 59cba5c commit 4ec18ba

File tree

1 file changed

+30
-31
lines changed

1 file changed

+30
-31
lines changed

core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt

Lines changed: 30 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import com.powersync.utils.toJsonObject
2727
import kotlinx.coroutines.CoroutineScope
2828
import kotlinx.coroutines.FlowPreview
2929
import kotlinx.coroutines.Job
30+
import kotlinx.coroutines.SupervisorJob
3031
import kotlinx.coroutines.cancelAndJoin
3132
import kotlinx.coroutines.ensureActive
3233
import kotlinx.coroutines.flow.Flow
@@ -96,8 +97,7 @@ internal class PowerSyncDatabaseImpl(
9697

9798
private val mutex = Mutex()
9899
private var syncStream: SyncStream? = null
99-
private var syncJob: Job? = null
100-
private var uploadJob: Job? = null
100+
private var syncSupervisorJob: Job? = null
101101

102102
// This is set in the init
103103
private lateinit var powerSyncVersion: String
@@ -164,9 +164,10 @@ internal class PowerSyncDatabaseImpl(
164164
this.syncStream = stream
165165

166166
val db = this
167-
168-
syncJob =
169-
scope.launch {
167+
val job = SupervisorJob()
168+
syncSupervisorJob = job
169+
scope.launch(job) {
170+
launch {
170171
// Get a global lock for checking mutex maps
171172
val streamMutex = resource.group.syncMutex
172173

@@ -181,7 +182,7 @@ internal class PowerSyncDatabaseImpl(
181182
// (The tryLock should throw if this client already holds the lock).
182183
logger.w(streamConflictMessage)
183184
}
184-
} catch (ex: IllegalStateException) {
185+
} catch (_: IllegalStateException) {
185186
logger.e { "The streaming sync client did not disconnect before connecting" }
186187
}
187188

@@ -200,26 +201,25 @@ internal class PowerSyncDatabaseImpl(
200201
}
201202
}
202203

203-
scope.launch {
204-
syncStream!!.status.asFlow().collect {
205-
currentStatus.update(
206-
connected = it.connected,
207-
connecting = it.connecting,
208-
uploading = it.uploading,
209-
downloading = it.downloading,
210-
lastSyncedAt = it.lastSyncedAt,
211-
hasSynced = it.hasSynced,
212-
uploadError = it.uploadError,
213-
downloadError = it.downloadError,
214-
clearDownloadError = it.downloadError == null,
215-
clearUploadError = it.uploadError == null,
216-
priorityStatusEntries = it.priorityStatusEntries,
217-
)
204+
launch {
205+
stream.status.asFlow().collect {
206+
currentStatus.update(
207+
connected = it.connected,
208+
connecting = it.connecting,
209+
uploading = it.uploading,
210+
downloading = it.downloading,
211+
lastSyncedAt = it.lastSyncedAt,
212+
hasSynced = it.hasSynced,
213+
uploadError = it.uploadError,
214+
downloadError = it.downloadError,
215+
clearDownloadError = it.downloadError == null,
216+
clearUploadError = it.uploadError == null,
217+
priorityStatusEntries = it.priorityStatusEntries,
218+
)
219+
}
218220
}
219-
}
220221

221-
uploadJob =
222-
scope.launch {
222+
launch {
223223
internalDb
224224
.updatesOnTables()
225225
.filter { it.contains(InternalTable.CRUD.toString()) }
@@ -228,6 +228,7 @@ internal class PowerSyncDatabaseImpl(
228228
syncStream!!.triggerCrudUpload()
229229
}
230230
}
231+
}
231232
}
232233

233234
override suspend fun getCrudBatch(limit: Int): CrudBatch? {
@@ -364,12 +365,10 @@ internal class PowerSyncDatabaseImpl(
364365
override suspend fun disconnect() = mutex.withLock { disconnectInternal() }
365366

366367
private suspend fun disconnectInternal() {
367-
if (syncJob != null && syncJob!!.isActive) {
368-
syncJob?.cancelAndJoin()
369-
}
370-
371-
if (uploadJob != null && uploadJob!!.isActive) {
372-
uploadJob?.cancelAndJoin()
368+
val syncJob = syncSupervisorJob
369+
if (syncJob != null && syncJob.isActive) {
370+
syncJob.cancelAndJoin()
371+
syncSupervisorJob = null
373372
}
374373

375374
if (syncStream != null) {
@@ -470,7 +469,7 @@ internal class PowerSyncDatabaseImpl(
470469
/**
471470
* Check that a supported version of the powersync extension is loaded.
472471
*/
473-
private suspend fun checkVersion(powerSyncVersion: String) {
472+
private fun checkVersion(powerSyncVersion: String) {
474473
// Parse version
475474
val versionInts: List<Int> =
476475
try {

0 commit comments

Comments
 (0)