Skip to content

Commit 5b20a72

Browse files
committed
Add flow for crud transactions
1 parent 799174a commit 5b20a72

File tree

6 files changed

+129
-50
lines changed

6 files changed

+129
-50
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
# Changelog
22

3+
## 1.5.0 (unreleased)
4+
5+
* Add `PowerSyncDatabase.getCrudTransactions()`, returning a flow of transactions. This is useful
6+
to upload multiple transactions in a batch.
7+
38
## 1.4.0
49

510
* Added the ability to log PowerSync service HTTP request information via specifying a

core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package com.powersync
33
import app.cash.turbine.turbineScope
44
import co.touchlab.kermit.ExperimentalKermitApi
55
import com.powersync.db.ActiveDatabaseGroup
6+
import com.powersync.db.crud.CrudEntry
7+
import com.powersync.db.crud.CrudTransaction
68
import com.powersync.db.schema.Schema
79
import com.powersync.testutils.UserRow
810
import com.powersync.testutils.databaseTest
@@ -17,6 +19,7 @@ import kotlinx.coroutines.CompletableDeferred
1719
import kotlinx.coroutines.Dispatchers
1820
import kotlinx.coroutines.async
1921
import kotlinx.coroutines.delay
22+
import kotlinx.coroutines.flow.takeWhile
2023
import kotlinx.coroutines.runBlocking
2124
import kotlinx.coroutines.withContext
2225
import kotlin.test.Test
@@ -427,6 +430,35 @@ class DatabaseTest {
427430
database.getNextCrudTransaction() shouldBe null
428431
}
429432

433+
@Test
434+
fun testCrudTransactions() =
435+
databaseTest {
436+
suspend fun insertInTransaction(size: Int) {
437+
database.writeTransaction { tx ->
438+
repeat(size) {
439+
tx.execute("INSERT INTO users (id, name, email) VALUES (uuid(), null, null)")
440+
}
441+
}
442+
}
443+
444+
insertInTransaction(5)
445+
insertInTransaction(10)
446+
insertInTransaction(15)
447+
448+
val batch = mutableListOf<CrudEntry>()
449+
var lastTx: CrudTransaction? = null
450+
database.getCrudTransactions().takeWhile { batch.size < 10 }.collect {
451+
batch.addAll(it.crud)
452+
lastTx = it
453+
}
454+
455+
batch shouldHaveSize 15
456+
lastTx!!.complete(null)
457+
458+
val finalTx = database.getNextCrudTransaction()
459+
finalTx!!.crud shouldHaveSize 15
460+
}
461+
430462
@Test
431463
fun testCrudBatch() =
432464
databaseTest {

core/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import com.powersync.db.schema.Schema
99
import com.powersync.sync.SyncOptions
1010
import com.powersync.sync.SyncStatus
1111
import com.powersync.utils.JsonParam
12+
import kotlinx.coroutines.flow.Flow
13+
import kotlinx.coroutines.flow.firstOrNull
1214
import kotlin.coroutines.cancellation.CancellationException
1315

1416
/**
@@ -123,7 +125,7 @@ public interface PowerSyncDatabase : Queries {
123125
*
124126
* Returns null if there is no data to upload.
125127
*
126-
* Use this from the [PowerSyncBackendConnector.uploadData]` callback.
128+
* Use this from the [PowerSyncBackendConnector.uploadData] callback.
127129
*
128130
* Once the data have been successfully uploaded, call [CrudTransaction.complete] before
129131
* requesting the next transaction.
@@ -132,7 +134,41 @@ public interface PowerSyncDatabase : Queries {
132134
* All data for the transaction is loaded into memory.
133135
*/
134136
@Throws(PowerSyncException::class, CancellationException::class)
135-
public suspend fun getNextCrudTransaction(): CrudTransaction?
137+
public suspend fun getNextCrudTransaction(): CrudTransaction? = getCrudTransactions().firstOrNull()
138+
139+
/**
140+
* Obtains a flow of transactions with local data against the database.
141+
142+
* This is typically used from the [PowerSyncBackendConnector.uploadData] callback.
143+
* Each entry emitted by the returned flow is a full transaction containing all local writes
144+
* made while that transaction was active.
145+
*
146+
* Unlike [getNextCrudTransaction], which always returns the oldest transaction that hasn't
147+
* been [CrudTransaction.complete]d yet, this flow can be used to collect multiple transactions.
148+
* Calling [CrudTransaction.complete] will mark _all_ transactions emitted by the flow until
149+
* that point as completed.
150+
*
151+
* This can be used to upload multiple transactions in a single batch, e.g with:
152+
*
153+
* ```Kotlin
154+
* val batch = mutableListOf<CrudEntry>()
155+
* var lastTx: CrudTransaction? = null
156+
*
157+
* database.getCrudTransactions().takeWhile { batch.size < 10 }.collect {
158+
* batch.addAll(it.crud)
159+
* lastTx = it
160+
* }
161+
*
162+
* if (batch.isNotEmpty()) {
163+
* uploadChanges(batch)
164+
* lastTx!!.complete(null)
165+
* }
166+
* ````
167+
*
168+
* If there is no local data to upload, returns an empty flow.
169+
*/
170+
@Throws(PowerSyncException::class, CancellationException::class)
171+
public suspend fun getCrudTransactions(): Flow<CrudTransaction>
136172

137173
/**
138174
* Convenience method to get the current version of PowerSync.

core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.powersync.bucket
22

3+
import com.powersync.db.SqlCursor
34
import com.powersync.db.crud.CrudEntry
45
import com.powersync.db.internal.PowerSyncTransaction
56
import com.powersync.db.schema.SerializableSchema
@@ -19,15 +20,12 @@ internal interface BucketStorage {
1920

2021
fun nextCrudItem(transaction: PowerSyncTransaction): CrudEntry?
2122

22-
fun getCrudItemsByTransactionId(
23-
transactionId: Int,
24-
transaction: PowerSyncTransaction,
25-
): List<CrudEntry>
26-
2723
suspend fun hasCrud(): Boolean
2824

2925
fun hasCrud(transaction: PowerSyncTransaction): Boolean
3026

27+
fun mapCrudEntry(row: SqlCursor): CrudEntry
28+
3129
suspend fun updateLocalTarget(checkpointCallback: suspend () -> String): Boolean
3230

3331
suspend fun hasCompletedSync(): Boolean

core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt

Lines changed: 7 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -35,32 +35,21 @@ internal class BucketStorageImpl(
3535
return id ?: throw IllegalStateException("Client ID not found")
3636
}
3737

38-
override suspend fun nextCrudItem(): CrudEntry? = db.getOptional(sql = nextCrudQuery, mapper = crudEntryMapper)
38+
override suspend fun nextCrudItem(): CrudEntry? = db.getOptional(sql = nextCrudQuery, mapper = ::mapCrudEntry)
3939

4040
override fun nextCrudItem(transaction: PowerSyncTransaction): CrudEntry? =
41-
transaction.getOptional(sql = nextCrudQuery, mapper = crudEntryMapper)
42-
43-
override fun getCrudItemsByTransactionId(
44-
transactionId: Int,
45-
transaction: PowerSyncTransaction,
46-
): List<CrudEntry> =
47-
transaction.getAll(
48-
sql = transactionCrudQuery,
49-
parameters = listOf(transactionId),
50-
mapper = crudEntryMapper,
51-
)
41+
transaction.getOptional(sql = nextCrudQuery, mapper = ::mapCrudEntry)
5242

5343
private val nextCrudQuery = "SELECT id, tx_id, data FROM ${InternalTable.CRUD} ORDER BY id ASC LIMIT 1"
54-
private val transactionCrudQuery = "SELECT id, tx_id, data FROM ${InternalTable.CRUD} WHERE tx_id = ? ORDER BY id ASC"
55-
private val crudEntryMapper: (SqlCursor) -> CrudEntry = { cursor ->
44+
45+
override fun mapCrudEntry(row: SqlCursor): CrudEntry =
5646
CrudEntry.fromRow(
5747
CrudRow(
58-
id = cursor.getString(0)!!,
59-
txId = cursor.getString(1)?.toInt(),
60-
data = cursor.getString(2)!!,
48+
id = row.getString(0)!!,
49+
txId = row.getString(1)?.toInt(),
50+
data = row.getString(2)!!,
6151
),
6252
)
63-
}
6453

6554
override suspend fun hasCrud(): Boolean {
6655
val res = db.getOptional(sql = hasCrudQuery, mapper = hasCrudMapper)

core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt

Lines changed: 44 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -276,34 +276,53 @@ internal class PowerSyncDatabaseImpl(
276276
})
277277
}
278278

279-
override suspend fun getNextCrudTransaction(): CrudTransaction? {
280-
waitReady()
281-
return internalDb.readTransaction { transaction ->
282-
val entry =
283-
bucketStorage.nextCrudItem(transaction)
284-
?: return@readTransaction null
285-
286-
val txId = entry.transactionId
287-
val entries: List<CrudEntry> =
288-
if (txId == null) {
289-
listOf(entry)
290-
} else {
291-
bucketStorage.getCrudItemsByTransactionId(
292-
transactionId = txId,
293-
transaction = transaction,
294-
)
279+
override suspend fun getCrudTransactions(): Flow<CrudTransaction> =
280+
flow {
281+
waitReady()
282+
var lastItemId = -1
283+
284+
// Note: We try to avoid filtering on tx_id here because there's no index on that column.
285+
// Starting at the first entry we want and then joining by rowid is more efficient. This is
286+
// sound because there can't be concurrent write transactions, so transaction ids are
287+
// increasing when we iterate over rowids.
288+
val query =
289+
"""
290+
WITH RECURSIVE crud_entries AS (
291+
SELECT id, tx_id, data FROM ps_crud WHERE id = (SELECT min(id) FROM ps_crud WHERE id > ?)
292+
UNION ALL
293+
SELECT ps_crud.id, ps_crud.tx_id, ps_crud.data FROM ps_crud
294+
INNER JOIN crud_entries ON crud_entries.id + 1 = rowid
295+
WHERE crud_entries.tx_id = ps_crud.tx_id
296+
)
297+
SELECT * FROM crud_entries;
298+
""".trimIndent()
299+
300+
// TODO: Map the entire flow in a read transaction after we have a driver implementation
301+
// that allows suspending transactions.
302+
while (true) {
303+
val items = getAll(query, listOf(lastItemId), bucketStorage::mapCrudEntry)
304+
if (items.isEmpty()) {
305+
break
295306
}
296307

297-
return@readTransaction CrudTransaction(
298-
crud = entries,
299-
transactionId = txId,
300-
complete = { writeCheckpoint ->
301-
logger.i { "[CrudTransaction::complete] Completing transaction with checkpoint $writeCheckpoint" }
302-
handleWriteCheckpoint(entries.last().clientId, writeCheckpoint)
303-
},
304-
)
308+
val txId = items[0].transactionId
309+
val lastId = items.last().clientId
310+
311+
lastItemId = lastId
312+
emit(
313+
CrudTransaction(
314+
crud = items,
315+
transactionId = items[0].transactionId,
316+
complete = { writeCheckpoint ->
317+
logger.i {
318+
"[CrudTransaction::complete] Completing transaction $txId (client ids until <=$lastId) with checkpoint $writeCheckpoint"
319+
}
320+
handleWriteCheckpoint(lastId, writeCheckpoint)
321+
},
322+
),
323+
)
324+
}
305325
}
306-
}
307326

308327
override suspend fun getPowerSyncVersion(): String {
309328
// The initialization sets powerSyncVersion.

0 commit comments

Comments
 (0)