@@ -79,15 +79,16 @@ pub fn sync_local<V: Value>(db: *mut sqlite::sqlite3, data: &V) -> Result<i64, S
7979 -- 1. Filter oplog by the ops added but not applied yet (oplog b).
8080-- SELECT DISTINCT / UNION is important for cases with many duplicate ids.
8181WITH updated_rows AS (
82- SELECT DISTINCT b.row_type, b.row_id FROM ps_buckets AS buckets
82+ SELECT DISTINCT FALSE as local, b.row_type, b.row_id FROM ps_buckets AS buckets
8383 CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id AND (b.op_id > buckets.last_applied_op)
84- WHERE buckets.priority <= ?
85- UNION SELECT row_type, row_id FROM ps_updated_rows
84+ WHERE buckets.priority <= ?1
85+ UNION SELECT TRUE, row_type, row_id FROM ps_updated_rows
8686)
8787
8888-- 3. Group the objects from different buckets together into a single one (ops).
8989SELECT b.row_type as type,
9090 b.row_id as id,
91+ b.local as local,
9192 r.data as data,
9293 count(r.bucket) as buckets,
9394 /* max() affects which row is used for 'data' */
@@ -97,6 +98,7 @@ FROM updated_rows b
9798 LEFT OUTER JOIN ps_oplog AS r
9899 ON r.row_type = b.row_type
99100 AND r.row_id = b.row_id
101+ AND (SELECT priority FROM ps_buckets WHERE id = r.bucket) <= ?1
100102-- Group for (3)
101103GROUP BY b.row_type, b.row_id" ,
102104 )
@@ -108,11 +110,18 @@ GROUP BY b.row_type, b.row_id",
108110 while statement. step ( ) . into_db_result ( db) ? == ResultCode :: ROW {
109111 let type_name = statement. column_text ( 0 ) ?;
110112 let id = statement. column_text ( 1 ) ?;
111- let buckets = statement. column_int ( 3 ) ?;
112- let data = statement. column_text ( 2 ) ;
113+ let local = statement. column_int ( 2 ) ? == 1 ;
114+ let buckets = statement. column_int ( 4 ) ?;
115+ let data = statement. column_text ( 3 ) ;
113116
114117 let table_name = internal_table_name ( type_name) ;
115118
119+ if local && buckets == 0 && priority == BucketPriority :: HIGHEST {
120+ // These rows are still local and they haven't been uploaded yet (which we allow for
121+ // buckets with priority=0 completing). We should just keep them around.
122+ continue ;
123+ }
124+
116125 if tables. contains ( & table_name) {
117126 let quoted = quote_internal_name ( type_name, false ) ;
118127
@@ -157,20 +166,27 @@ GROUP BY b.row_type, b.row_id",
157166 }
158167
159168 // language=SQLite
160- db. exec_safe (
161- "UPDATE ps_buckets
169+ let updated = db
170+ . prepare_v2 (
171+ "UPDATE ps_buckets
162172 SET last_applied_op = last_op
163- WHERE last_applied_op != last_op" ,
164- )
165- . into_db_result ( db) ?;
166-
167- // language=SQLite
168- db. exec_safe ( "DELETE FROM ps_updated_rows" )
173+ WHERE last_applied_op != last_op AND priority <= ?" ,
174+ )
169175 . into_db_result ( db) ?;
176+ updated. bind_int ( 1 , priority. into ( ) ) ?;
177+ updated. exec ( ) ?;
170178
171- // language=SQLite
172- db. exec_safe ( "insert or replace into ps_kv(key, value) values('last_synced_at', datetime())" )
179+ if priority == BucketPriority :: LOWEST {
180+ // language=SQLite
181+ db. exec_safe ( "DELETE FROM ps_updated_rows" )
182+ . into_db_result ( db) ?;
183+
184+ // language=SQLite
185+ db. exec_safe (
186+ "insert or replace into ps_kv(key, value) values('last_synced_at', datetime())" ,
187+ )
173188 . into_db_result ( db) ?;
189+ }
174190
175191 Ok ( 1 )
176192}
0 commit comments