1+ namespace PowerSync . Common . Tests . Client ;
2+
3+ using System . Diagnostics ;
4+ using PowerSync . Common . Client ;
5+
6+ public class PowerSyncDatabaseTransactionTests : IAsyncLifetime
7+ {
8+ private PowerSyncDatabase db = default ! ;
9+
10+
11+ public async Task InitializeAsync ( )
12+ {
13+ db = new PowerSyncDatabase ( new PowerSyncDatabaseOptions
14+ {
15+ Database = new SQLOpenOptions { DbFilename = "powersyncDataBaseTransactions.db" } ,
16+ Schema = TestData . appSchema ,
17+ } ) ;
18+ await db . Init ( ) ;
19+ }
20+
21+ public async Task DisposeAsync ( )
22+ {
23+ await db . DisconnectAndClear ( ) ;
24+ await db . Close ( ) ;
25+ }
26+
27+ private record IdResult ( string id ) ;
28+ private record AssetResult ( string id , string description , string ? make = null ) ;
29+ private record CountResult ( int count ) ;
30+
31+ [ Fact ]
32+ public async Task SimpleReadTransactionTest ( )
33+ {
34+ await db . Execute ( "INSERT INTO assets(id) VALUES(?)" , [ "O3" ] ) ;
35+
36+ var result = await db . Database . ReadTransaction ( async tx =>
37+ {
38+ return await tx . GetAll < IdResult > ( "SELECT * FROM assets" ) ;
39+ } ) ;
40+
41+ Assert . Single ( result ) ;
42+ }
43+
44+ [ Fact ]
45+ public async Task ManualCommitTest ( )
46+ {
47+ await db . WriteTransaction ( async tx =>
48+ {
49+ await tx . Execute ( "INSERT INTO assets(id) VALUES(?)" , [ "O4" ] ) ;
50+ await tx . Commit ( ) ;
51+ } ) ;
52+
53+ var result = await db . GetAll < IdResult > ( "SELECT * FROM assets WHERE id = ?" , [ "O4" ] ) ;
54+
55+ Assert . Single ( result ) ;
56+ Assert . Equal ( "O4" , result . First ( ) . id ) ;
57+ }
58+
59+ [ Fact ]
60+ public async Task AutoCommitTest ( )
61+ {
62+ await db . WriteTransaction ( async tx =>
63+ {
64+ await tx . Execute ( "INSERT INTO assets(id) VALUES(?)" , [ "O41" ] ) ;
65+ } ) ;
66+
67+ var result = await db . GetAll < IdResult > ( "SELECT * FROM assets WHERE id = ?" , [ "O41" ] ) ;
68+
69+ Assert . Single ( result ) ;
70+ Assert . Equal ( "O41" , result . First ( ) . id ) ;
71+ }
72+
73+ [ Fact ]
74+ public async Task ManualRollbackTest ( )
75+ {
76+ await db . WriteTransaction ( async tx =>
77+ {
78+ await tx . Execute ( "INSERT INTO assets(id) VALUES(?)" , [ "O5" ] ) ;
79+ await tx . Rollback ( ) ;
80+ } ) ;
81+
82+ var result = await db . GetAll < object > ( "SELECT * FROM assets" ) ;
83+ Assert . Empty ( result ) ;
84+ }
85+
86+ [ Fact ]
87+ public async Task AutoRollbackTest ( )
88+ {
89+ bool exceptionThrown = false ;
90+ try
91+ {
92+ await db . WriteTransaction ( async tx =>
93+ {
94+ // This should throw an exception
95+ await tx . Execute ( "INSERT INTO assets(id) VALUES_SYNTAX_ERROR(?)" , [ "O5" ] ) ;
96+ } ) ;
97+ }
98+ catch ( Exception ex )
99+ {
100+ Assert . Contains ( "near \" VALUES_SYNTAX_ERROR\" : syntax error" , ex . Message ) ;
101+ exceptionThrown = true ;
102+ }
103+
104+ var result = await db . GetAll < IdResult > ( "SELECT * FROM assets" ) ;
105+ Assert . Empty ( result ) ;
106+ Assert . True ( exceptionThrown ) ;
107+ }
108+
109+ [ Fact ]
110+ public async Task WriteTransactionWithReturnTest ( )
111+ {
112+ var result = await db . WriteTransaction ( async tx =>
113+ {
114+ await tx . Execute ( "INSERT INTO assets(id) VALUES(?)" , [ "O5" ] ) ;
115+ return await tx . GetAll < IdResult > ( "SELECT * FROM assets" ) ;
116+ } ) ;
117+
118+ Assert . Single ( result ) ;
119+ Assert . Equal ( "O5" , result . First ( ) . id ) ;
120+ }
121+
122+
123+ [ Fact ]
124+ public async Task WriteTransactionNestedQueryTest ( )
125+ {
126+ await db . WriteTransaction ( async tx =>
127+ {
128+ await tx . Execute ( "INSERT INTO assets(id) VALUES(?)" , [ "O6" ] ) ;
129+
130+ var txQuery = await tx . GetAll < IdResult > ( "SELECT * FROM assets" ) ;
131+ Assert . Single ( txQuery ) ;
132+
133+ var dbQuery = await db . GetAll < IdResult > ( "SELECT * FROM assets" ) ;
134+ Assert . Empty ( dbQuery ) ;
135+ } ) ;
136+ }
137+
138+ [ Fact ]
139+ public async Task ReadLockShouldBeReadOnlyTest ( )
140+ {
141+ string id = Guid . NewGuid ( ) . ToString ( ) ;
142+ bool exceptionThrown = false ;
143+
144+ try
145+ {
146+ await db . ReadLock < object > ( async context =>
147+ {
148+ return await context . Execute (
149+ "INSERT INTO assets (id) VALUES (?)" ,
150+ [ id ]
151+ ) ;
152+ } ) ;
153+
154+ // If no exception is thrown, fail the test
155+ throw new Exception ( "Did not throw" ) ;
156+ }
157+ catch ( Exception ex )
158+ {
159+ Assert . Contains ( "attempt to write a readonly database" , ex . Message ) ;
160+ exceptionThrown = true ;
161+ }
162+
163+ Assert . True ( exceptionThrown ) ;
164+ }
165+
166+ [ Fact ]
167+ public async Task ReadLocksShouldQueueIfExceedNumberOfConnectionsTest ( )
168+ {
169+ string id = Guid . NewGuid ( ) . ToString ( ) ;
170+
171+ await db . Execute (
172+ "INSERT INTO assets (id) VALUES (?)" ,
173+ [ id ]
174+ ) ;
175+
176+ int numberOfReads = 20 ;
177+ var tasks = Enumerable . Range ( 0 , numberOfReads )
178+ . Select ( _ => db . ReadLock ( async context =>
179+ {
180+ return await context . GetAll < AssetResult > ( "SELECT id FROM assets WHERE id = ?" , [ id ] ) ;
181+ } ) )
182+ . ToArray ( ) ;
183+
184+ var lockResults = await Task . WhenAll ( tasks ) ;
185+
186+ var ids = lockResults . Select ( r => r . FirstOrDefault ( ) ? . id ) . ToList ( ) ;
187+
188+ Assert . All ( ids , n => Assert . Equal ( id , n ) ) ;
189+ }
190+
191+ [ Fact ( Timeout = 2000 ) ]
192+ public async Task ShouldBeAbleToReadWhileAWriteIsRunningTest ( )
193+ {
194+ var tcs = new TaskCompletionSource ( ) ;
195+
196+ // This wont resolve or free until another connection free's it
197+ var writeTask = db . WriteLock ( async context =>
198+ {
199+ await tcs . Task ; // Wait until read lock signals to proceed
200+ } ) ;
201+
202+ var readTask = db . ReadLock ( async context =>
203+ {
204+ // Read logic could execute here while writeLock is still open
205+ tcs . SetResult ( ) ;
206+ await Task . CompletedTask ;
207+ return 42 ;
208+ } ) ;
209+
210+ var result = await readTask ;
211+ await writeTask ;
212+
213+ Assert . Equal ( 42 , result ) ;
214+ }
215+
216+ [ Fact ( Timeout = 2000 ) ]
217+ public async Task ShouldQueueSimultaneousExecutionsTest ( )
218+ {
219+ var order = new List < int > ( ) ;
220+ var operationCount = 5 ;
221+
222+ await db . WriteLock ( async context =>
223+ {
224+ var tasks = Enumerable . Range ( 0 , operationCount )
225+ . Select ( async index =>
226+ {
227+ await context . Execute ( "SELECT * FROM assets" ) ;
228+ order . Add ( index ) ;
229+ } )
230+ . ToArray ( ) ;
231+
232+ await Task . WhenAll ( tasks ) ;
233+ } ) ;
234+
235+ var expectedOrder = Enumerable . Range ( 0 , operationCount ) . ToList ( ) ;
236+ Assert . Equal ( expectedOrder , order ) ;
237+ }
238+
239+ [ Fact ( Timeout = 2000 ) ]
240+ public async Task ShouldCallUpdateHookOnChangesTest ( )
241+ {
242+ var cts = new CancellationTokenSource ( ) ;
243+ var result = new TaskCompletionSource ( ) ;
244+
245+ db . OnChange ( new WatchOnChangeHandler
246+ {
247+ OnChange = ( x ) =>
248+ {
249+ result . SetResult ( ) ;
250+ cts . Cancel ( ) ;
251+ return Task . CompletedTask ;
252+ }
253+ } , new SQLWatchOptions
254+ {
255+ Tables = [ "assets" ] ,
256+ Signal = cts . Token
257+ } ) ;
258+ await db . Execute ( "INSERT INTO assets (id) VALUES(?)" , [ "099-onchange" ] ) ;
259+
260+ await result . Task ;
261+ }
262+
263+ [ Fact ( Timeout = 2000 ) ]
264+ public async Task ShouldReflectWriteTransactionUpdatesOnReadConnectionsTest ( )
265+ {
266+ var watched = new TaskCompletionSource ( ) ;
267+
268+ var cts = new CancellationTokenSource ( ) ;
269+ db . Watch ( "SELECT COUNT(*) as count FROM assets" , null , new WatchHandler < CountResult >
270+ {
271+ OnResult = ( x ) =>
272+ {
273+ if ( x . First ( ) . count == 1 )
274+ {
275+ watched . SetResult ( ) ;
276+ cts . Cancel ( ) ;
277+ }
278+ }
279+ } , new SQLWatchOptions
280+ {
281+
282+ Signal = cts . Token
283+ } ) ;
284+
285+ await db . WriteTransaction ( async tx =>
286+ {
287+ await tx . Execute ( "INSERT INTO assets (id) VALUES(?)" , [ "099-watch" ] ) ;
288+ } ) ;
289+
290+ await watched . Task ;
291+ }
292+
293+ [ Fact ( Timeout = 2000 ) ]
294+ public async Task ShouldReflectWriteLockUpdatesOnReadConnectionsTest ( )
295+ {
296+ var numberOfUsers = 10_000 ;
297+
298+ var watched = new TaskCompletionSource ( ) ;
299+
300+ var cts = new CancellationTokenSource ( ) ;
301+ db . Watch ( "SELECT COUNT(*) as count FROM assets" , null , new WatchHandler < CountResult >
302+ {
303+ OnResult = ( x ) =>
304+ {
305+ if ( x . First ( ) . count == numberOfUsers )
306+ {
307+ watched . SetResult ( ) ;
308+ cts . Cancel ( ) ;
309+ }
310+ }
311+ } , new SQLWatchOptions
312+ {
313+ Signal = cts . Token
314+ } ) ;
315+
316+ await db . WriteLock ( async tx =>
317+ {
318+ await tx . Execute ( "BEGIN" ) ;
319+ for ( var i = 0 ; i < numberOfUsers ; i ++ )
320+ {
321+ await tx . Execute ( "INSERT INTO assets (id) VALUES(?)" , [ "0" + i + "-writelock" ] ) ;
322+ }
323+ await tx . Execute ( "COMMIT" ) ;
324+ } ) ;
325+
326+ await watched . Task ;
327+ }
328+
329+ [ Fact ( Timeout = 5000 ) ]
330+ public async Task Insert10000Records_ShouldCompleteWithinTimeLimitTest ( )
331+ {
332+ var random = new Random ( ) ;
333+ var stopwatch = Stopwatch . StartNew ( ) ;
334+
335+ for ( int i = 0 ; i < 10000 ; ++ i )
336+ {
337+ int n = random . Next ( 0 , 100000 ) ;
338+ await db . Execute (
339+ "INSERT INTO assets(id, description) VALUES(?, ?)" ,
340+ [ i + 1 , n ]
341+ ) ;
342+ }
343+
344+ await db . Execute ( "PRAGMA wal_checkpoint(RESTART)" ) ;
345+
346+ stopwatch . Stop ( ) ;
347+ var duration = stopwatch . ElapsedMilliseconds ;
348+
349+ Assert . True ( duration < 2000 , $ "Test took too long: { duration } ms") ;
350+ }
351+
352+ [ Fact ( Timeout = 5000 ) ]
353+ public async Task TestConcurrentReadsTest ( )
354+ {
355+ await db . Execute ( "INSERT INTO assets(id) VALUES(?)" , [ "O6-conccurent-1" ] ) ;
356+ var tcs = new TaskCompletionSource < bool > ( ) ;
357+
358+ // Start a long-running write transaction
359+ var transactionTask = Task . Run ( async ( ) =>
360+ {
361+ await db . WriteTransaction ( async tx =>
362+ {
363+ await tx . Execute ( "INSERT INTO assets(id) VALUES(?)" , [ "O6-conccurent-2" ] ) ;
364+ await tcs . Task ;
365+ await tx . Commit ( ) ;
366+ } ) ;
367+ } ) ;
368+
369+ // Try and read while the write transaction is still open
370+ var result = await db . GetAll < object > ( "SELECT * FROM assets" ) ;
371+ Assert . Single ( result ) ; // The transaction is not commited yet, we should only read 1 user
372+
373+ // Let the transaction complete
374+ tcs . SetResult ( true ) ;
375+ await transactionTask ;
376+
377+ // Read again after the transaction is committed
378+ var afterTx = await db . GetAll < object > ( "SELECT * FROM assets" ) ;
379+ Assert . Equal ( 2 , afterTx . Length ) ;
380+ }
381+ }
0 commit comments