Skip to content

Commit bdc6f7d

Browse files
authored
Merge pull request #20 from powersync-ja/feat/rust-sync-implementation
(feat) Added rust sync implementation
2 parents fb7e312 + 848d78c commit bdc6f7d

File tree

27 files changed

+1602
-367
lines changed

27 files changed

+1602
-367
lines changed

.github/workflows/test.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ jobs:
1717
with:
1818
dotnet-version: '8.0'
1919

20+
- name: Install MAUI Workloads
21+
run: dotnet workload restore
22+
2023
- name: Download PowerSync extension
2124
run: dotnet run --project Tools/Setup
2225

Directory.build.props

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
<?xml version="1.0" encoding="utf-8"?>
22
<Project>
3+
<PropertyGroup>
4+
<MSBuildWarningsAsMessages>$(MSBuildWarningsAsMessages);NETSDK1202</MSBuildWarningsAsMessages>
5+
</PropertyGroup>
36
<ItemGroup>
47
<Compile Include="$(MSBuildThisFileDirectory)IsExternalInit.cs" Visible="false" />
58
</ItemGroup>

PowerSync/PowerSync.Common/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
# PowerSync.Common Changelog
22

3+
## 0.0.5-alpha.1
4+
- Using the latest (0.4.9) version of the core extension, it introduces support for the Rust Sync implementation and also makes it the default - users can still opt out and use the legacy C# sync implementation as option when calling `connect()`.
5+
36
## 0.0.4-alpha.1
47
- Fixed MAUI issues related to extension loading when installing package outside of the monorepo.
58

PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,11 @@ public interface IPowerSyncDatabase : IEventStream<PowerSyncDBEvent>
6060

6161
Task<NonQueryResult> Execute(string query, object[]? parameters = null);
6262

63-
Task<T[]> GetAll<T>(string sql, params object[]? parameters);
63+
Task<T[]> GetAll<T>(string sql, object[]? parameters = null);
6464

65-
Task<T?> GetOptional<T>(string sql, params object[]? parameters);
65+
Task<T?> GetOptional<T>(string sql, object[]? parameters = null);
6666

67-
Task<T> Get<T>(string sql, params object[]? parameters);
67+
Task<T> Get<T>(string sql, object[]? parameters = null);
6868

6969
Task<T> ReadLock<T>(Func<ILockContext, Task<T>> fn, DBLockOptions? options = null);
7070

@@ -88,7 +88,7 @@ public class PowerSyncDatabase : EventStream<PowerSyncDBEvent>, IPowerSyncDataba
8888
private static readonly int DEFAULT_WATCH_THROTTLE_MS = 30;
8989
private static readonly Regex POWERSYNC_TABLE_MATCH = new Regex(@"(^ps_data__|^ps_data_local__)", RegexOptions.Compiled);
9090

91-
public bool Closed;
91+
public new bool Closed;
9292
public bool Ready;
9393

9494
protected Task isReadyTask;
@@ -236,7 +236,7 @@ protected async Task UpdateHasSynced()
236236
);
237237

238238
DateTime? lastCompleteSync = null;
239-
239+
240240
// TODO: Will be altered/extended when reporting individual sync priority statuses is supported
241241
foreach (var result in results)
242242
{

PowerSync/PowerSync.Common/Client/Sync/Bucket/BucketStorageAdapter.cs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,17 @@ namespace PowerSync.Common.Client.Sync.Bucket;
88
using PowerSync.Common.Utils;
99
using Newtonsoft.Json;
1010

11+
public static class PowerSyncControlCommand
12+
{
13+
public const string PROCESS_TEXT_LINE = "line_text";
14+
public const string PROCESS_BSON_LINE = "line_binary";
15+
public const string STOP = "stop";
16+
public const string START = "start";
17+
public const string NOTIFY_TOKEN_REFRESHED = "refreshed_token";
18+
public const string NOTIFY_CRUD_UPLOAD_COMPLETED = "completed_upload";
19+
public const string UPDATE_SUBSCRIPTIONS = "update_subscriptions";
20+
}
21+
1122
public class Checkpoint
1223
{
1324
[JsonProperty("last_op_id")]
@@ -113,4 +124,9 @@ public interface IBucketStorageAdapter : IEventStream<BucketStorageEvent>
113124
/// Get a unique client ID.
114125
/// </summary>
115126
Task<string> GetClientId();
127+
128+
/// <summary>
129+
/// Invokes the `powersync_control` function for the sync client.
130+
/// </summary>
131+
Task<string> Control(string op, object? payload);
116132
}

PowerSync/PowerSync.Common/Client/Sync/Bucket/SqliteBucketStorage.cs

Lines changed: 42 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ await db.WriteTransaction(async tx =>
111111
foreach (var b in batch.Buckets)
112112
{
113113
var result = await tx.Execute("INSERT INTO powersync_operations(op, data) VALUES(?, ?)",
114-
["save", JsonConvert.SerializeObject(new { buckets = new[] { b.ToJSON() } })]);
114+
["save", JsonConvert.SerializeObject(new { buckets = new[] { JsonConvert.DeserializeObject(b.ToJSON()) } })]);
115115
logger.LogDebug("saveSyncData {message}", JsonConvert.SerializeObject(result));
116116
count += b.Data.Length;
117117
}
@@ -314,7 +314,7 @@ public async Task<bool> UpdateLocalTarget(Func<Task<string>> callback)
314314
}
315315

316316
var rs = await db.GetAll<SequenceResult>(
317-
"SELECT seq FROM sqlite_sequence WHERE name = 'ps_crud'"
317+
"SELECT seq FROM main.sqlite_sequence WHERE name = 'ps_crud'"
318318
);
319319

320320
if (rs.Length == 0)
@@ -338,7 +338,7 @@ public async Task<bool> UpdateLocalTarget(Func<Task<string>> callback)
338338
}
339339

340340
var rsAfter = await tx.GetAll<SequenceResult>(
341-
"SELECT seq FROM sqlite_sequence WHERE name = 'ps_crud'"
341+
"SELECT seq FROM main.sqlite_sequence WHERE name = 'ps_crud'"
342342
);
343343

344344
if (rsAfter.Length == 0)
@@ -351,16 +351,18 @@ public async Task<bool> UpdateLocalTarget(Func<Task<string>> callback)
351351

352352
if (seqAfter != seqBefore)
353353
{
354-
logger.LogDebug("[updateLocalTarget] seqAfter ({seqAfter}) != seqBefore ({seqBefore})", seqAfter, seqBefore);
354+
logger.LogDebug("[updateLocalTarget] seqAfter ({seqAfter}) != seqBefore ({seqBefore})", seqAfter,
355+
seqBefore);
355356
return false;
356357
}
357358

358359
var response = await tx.Execute(
359-
"UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name='$local'",
360-
[opId]
361-
);
360+
"UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name='$local'",
361+
[opId]
362+
);
362363

363-
logger.LogDebug("[updateLocalTarget] Response from updating target_op: {response}", JsonConvert.SerializeObject(response));
364+
logger.LogDebug("[updateLocalTarget] Response from updating target_op: {response}",
365+
JsonConvert.SerializeObject(response));
364366
return true;
365367
});
366368
}
@@ -388,33 +390,33 @@ public async Task<bool> UpdateLocalTarget(Func<Task<string>> callback)
388390
var last = all[all.Length - 1];
389391

390392
return new CrudBatch(
391-
Crud: all,
392-
HaveMore: true,
393-
CompleteCallback: async (string? writeCheckpoint) =>
394-
{
395-
await db.WriteTransaction(async tx =>
393+
Crud: all,
394+
HaveMore: true,
395+
CompleteCallback: async (string? writeCheckpoint) =>
396396
{
397-
await tx.Execute("DELETE FROM ps_crud WHERE id <= ?", [last.ClientId]);
398-
399-
if (!string.IsNullOrEmpty(writeCheckpoint))
397+
await db.WriteTransaction(async tx =>
400398
{
401-
var crudResult = await tx.GetAll<object>("SELECT 1 FROM ps_crud LIMIT 1");
402-
if (crudResult?.Length > 0)
399+
await tx.Execute("DELETE FROM ps_crud WHERE id <= ?", [last.ClientId]);
400+
401+
if (!string.IsNullOrEmpty(writeCheckpoint))
402+
{
403+
var crudResult = await tx.GetAll<object>("SELECT 1 FROM ps_crud LIMIT 1");
404+
if (crudResult?.Length > 0)
405+
{
406+
await tx.Execute(
407+
"UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name='$local'",
408+
[writeCheckpoint]);
409+
}
410+
}
411+
else
403412
{
404413
await tx.Execute(
405414
"UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name='$local'",
406-
[writeCheckpoint]);
415+
[GetMaxOpId()]);
407416
}
408-
}
409-
else
410-
{
411-
await tx.Execute(
412-
"UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name='$local'",
413-
[GetMaxOpId()]);
414-
}
415-
});
416-
}
417-
);
417+
});
418+
}
419+
);
418420
}
419421

420422
public async Task<CrudEntry?> NextCrudItem()
@@ -434,4 +436,15 @@ public async Task SetTargetCheckpoint(Checkpoint checkpoint)
434436
// No Op
435437
await Task.CompletedTask;
436438
}
439+
440+
record ControlResult(string? r);
441+
442+
public async Task<string> Control(string op, object? payload = null)
443+
{
444+
return await db.WriteTransaction(async tx =>
445+
{
446+
var result = await tx.Get<ControlResult>("SELECT powersync_control(?, ?) AS r", [op, payload]);
447+
return result.r!;
448+
});
449+
}
437450
}
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
using Newtonsoft.Json.Linq;
2+
using Newtonsoft.Json;
3+
4+
namespace PowerSync.Common.Client.Sync.Stream;
5+
6+
/// <summary>
7+
/// An internal instruction emitted by the sync client in the core extension in response to the
8+
/// SDK passing sync data into the extension.
9+
/// </summary>
10+
public abstract class Instruction
11+
{
12+
13+
public static Instruction[] ParseInstructions(string rawResponse)
14+
{
15+
var jsonArray = JArray.Parse(rawResponse);
16+
List<Instruction> instructions = [];
17+
18+
foreach (JObject item in jsonArray)
19+
{
20+
var instruction = ParseInstruction(item);
21+
if (instruction == null)
22+
{
23+
throw new JsonSerializationException("Failed to parse instruction from JSON.");
24+
}
25+
instructions.Add(instruction);
26+
}
27+
28+
return instructions.ToArray();
29+
}
30+
31+
public static Instruction? ParseInstruction(JObject json)
32+
{
33+
if (json.ContainsKey("LogLine"))
34+
return json["LogLine"]!.ToObject<LogLine>();
35+
if (json.ContainsKey("UpdateSyncStatus"))
36+
return json["UpdateSyncStatus"]!.ToObject<UpdateSyncStatus>();
37+
if (json.ContainsKey("EstablishSyncStream"))
38+
return json["EstablishSyncStream"]!.ToObject<EstablishSyncStream>();
39+
if (json.ContainsKey("FetchCredentials"))
40+
return json["FetchCredentials"]!.ToObject<FetchCredentials>();
41+
if (json.ContainsKey("CloseSyncStream"))
42+
return new CloseSyncStream();
43+
if (json.ContainsKey("FlushFileSystem"))
44+
return new FlushFileSystem();
45+
if (json.ContainsKey("DidCompleteSync"))
46+
return new DidCompleteSync();
47+
48+
throw new JsonSerializationException("Unknown Instruction type.");
49+
}
50+
}
51+
52+
public class LogLine : Instruction
53+
{
54+
[JsonProperty("severity")]
55+
public string Severity { get; set; } = null!; // "DEBUG", "INFO", "WARNING"
56+
57+
[JsonProperty("line")]
58+
public string Line { get; set; } = null!;
59+
}
60+
61+
public class EstablishSyncStream : Instruction
62+
{
63+
[JsonProperty("request")]
64+
public StreamingSyncRequest Request { get; set; } = null!;
65+
}
66+
67+
public class UpdateSyncStatus : Instruction
68+
{
69+
[JsonProperty("status")]
70+
public CoreSyncStatus Status { get; set; } = null!;
71+
}
72+
73+
public class CoreSyncStatus
74+
{
75+
[JsonProperty("connected")]
76+
public bool Connected { get; set; }
77+
78+
[JsonProperty("connecting")]
79+
public bool Connecting { get; set; }
80+
81+
[JsonProperty("priority_status")]
82+
public List<SyncPriorityStatus> PriorityStatus { get; set; } = [];
83+
84+
[JsonProperty("downloading")]
85+
public DownloadProgress? Downloading { get; set; }
86+
}
87+
88+
public class SyncPriorityStatus
89+
{
90+
[JsonProperty("priority")]
91+
public int Priority { get; set; }
92+
93+
[JsonProperty("last_synced_at")]
94+
public long LastSyncedAt { get; set; }
95+
96+
[JsonProperty("has_synced")]
97+
public bool? HasSynced { get; set; }
98+
}
99+
100+
public class DownloadProgress
101+
{
102+
[JsonProperty("buckets")]
103+
public Dictionary<string, BucketProgress> Buckets { get; set; } = null!;
104+
}
105+
106+
public class BucketProgress
107+
{
108+
[JsonProperty("priority")]
109+
public int Priority { get; set; }
110+
111+
[JsonProperty("at_last")]
112+
public int AtLast { get; set; }
113+
114+
[JsonProperty("since_last")]
115+
public int SinceLast { get; set; }
116+
117+
[JsonProperty("target_count")]
118+
public int TargetCount { get; set; }
119+
}
120+
121+
public class FetchCredentials : Instruction
122+
{
123+
[JsonProperty("did_expire")]
124+
public bool DidExpire { get; set; }
125+
}
126+
127+
public class CloseSyncStream : Instruction { }
128+
public class FlushFileSystem : Instruction { }
129+
public class DidCompleteSync : Instruction { }

PowerSync/PowerSync.Common/Client/Sync/Stream/Remote.cs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,37 @@ public async Task<T> Get<T>(string path, Dictionary<string, string>? headers = n
124124
return JsonConvert.DeserializeObject<T>(responseData)!;
125125
}
126126

127+
/// <summary>
128+
/// Posts to the stream endpoint and returns a raw NDJSON stream that can be read line by line.
129+
/// </summary>
130+
public async Task<Stream> PostStreamRaw(SyncStreamOptions options)
131+
{
132+
var requestMessage = await BuildRequest(HttpMethod.Post, options.Path, options.Data, options.Headers);
133+
var response = await httpClient.SendAsync(requestMessage, HttpCompletionOption.ResponseHeadersRead, options.CancellationToken);
134+
135+
if (response.Content == null)
136+
{
137+
throw new HttpRequestException($"HTTP {response.StatusCode}: No content");
138+
}
139+
140+
if (response.StatusCode == System.Net.HttpStatusCode.Unauthorized)
141+
{
142+
InvalidateCredentials();
143+
}
144+
145+
if (!response.IsSuccessStatusCode)
146+
{
147+
var errorText = await response.Content.ReadAsStringAsync();
148+
throw new HttpRequestException($"HTTP {response.StatusCode}: {errorText}");
149+
}
150+
151+
return await response.Content.ReadAsStreamAsync();
152+
}
153+
154+
155+
/// <summary>
156+
/// Originally used for the C# streaming sync implementation.
157+
/// </summary>
127158
public async IAsyncEnumerable<StreamingSyncLine?> PostStream(SyncStreamOptions options)
128159
{
129160
using var requestMessage = await BuildRequest(HttpMethod.Post, options.Path, options.Data, options.Headers);

0 commit comments

Comments
 (0)