Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion demos/supabase-todolist/lib/powersync.dart
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,9 @@ Future<String> getDatabasePath() async {
return join(dir.path, dbFilename);
}

const options = SyncOptions(syncImplementation: SyncClientImplementation.rust);
const options = SyncOptions(
syncImplementation: SyncClientImplementation.rust,
appMetadata: {'app_version': '1.0.1'});

Future<void> openDatabase() async {
// Open the local database
Expand Down
12 changes: 12 additions & 0 deletions packages/powersync_core/lib/src/sync/options.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@ import 'package:meta/meta.dart';

/// Options that affect how the sync client connects to the sync service.
final class SyncOptions {
/// A map of application metadata that is passed to the PowerSync service.
///
/// Application metadata that will be displayed in PowerSync service logs.
final Map<String, String>? appMetadata;

/// A JSON object that is passed to the sync service and forwarded to sync
/// rules.
///
Expand Down Expand Up @@ -39,19 +44,22 @@ final class SyncOptions {
this.params,
this.syncImplementation = SyncClientImplementation.defaultClient,
this.includeDefaultStreams,
this.appMetadata,
});

SyncOptions _copyWith({
Duration? crudThrottleTime,
Duration? retryDelay,
Map<String, dynamic>? params,
Map<String, String>? appMetadata,
}) {
return SyncOptions(
crudThrottleTime: crudThrottleTime ?? this.crudThrottleTime,
retryDelay: retryDelay,
params: params ?? this.params,
syncImplementation: syncImplementation,
includeDefaultStreams: includeDefaultStreams,
appMetadata: appMetadata ?? this.appMetadata,
);
}
}
Expand Down Expand Up @@ -88,14 +96,18 @@ extension type ResolvedSyncOptions(SyncOptions source) {
Duration? crudThrottleTime,
Duration? retryDelay,
Map<String, dynamic>? params,
Map<String, String>? appMetadata,
}) {
return ResolvedSyncOptions((source ?? SyncOptions())._copyWith(
crudThrottleTime: crudThrottleTime,
retryDelay: retryDelay,
params: params,
appMetadata: appMetadata,
));
}

Map<String, String> get appMetadata => source.appMetadata ?? const {};

Duration get crudThrottleTime =>
source.crudThrottleTime ?? const Duration(milliseconds: 10);

Expand Down
7 changes: 5 additions & 2 deletions packages/powersync_core/lib/src/sync/protocol.dart
Original file line number Diff line number Diff line change
Expand Up @@ -247,15 +247,18 @@ class StreamingSyncRequest {
bool includeChecksum = true;
String clientId;
Map<String, dynamic>? parameters;
Map<String, String>? appMetadata;

StreamingSyncRequest(this.buckets, this.parameters, this.clientId);
StreamingSyncRequest(
this.buckets, this.parameters, this.clientId, this.appMetadata);

Map<String, dynamic> toJson() {
final Map<String, dynamic> json = {
'buckets': buckets,
'include_checksum': includeChecksum,
'raw_data': true,
'client_id': clientId
'client_id': clientId,
'app_metadata': appMetadata,
};

if (parameters != null) {
Expand Down
9 changes: 5 additions & 4 deletions packages/powersync_core/lib/src/sync/streaming_sync.dart
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ import 'package:powersync_core/src/sync/options.dart';
import 'package:powersync_core/src/user_agent/user_agent.dart';
import 'package:sqlite_async/mutex.dart';

import 'bucket_storage.dart';
import '../crud.dart';
import 'bucket_storage.dart';
import 'instruction.dart';
import 'internal_connector.dart';
import 'mutable_sync_status.dart';
import 'protocol.dart';
import 'stream_utils.dart';
import 'sync_status.dart';
import 'protocol.dart';

typedef SubscribedStream = ({String name, String parameters});

Expand Down Expand Up @@ -339,8 +339,8 @@ class StreamingSyncImplementation implements StreamingSync {

Checkpoint? targetCheckpoint;

var requestStream = _streamingSyncRequest(
StreamingSyncRequest(bucketRequests, options.params, clientId!))
var requestStream = _streamingSyncRequest(StreamingSyncRequest(
bucketRequests, options.params, clientId!, options.appMetadata))
.map(ReceivedLine.new);

var merged = addBroadcast(requestStream, _nonLineSyncEvents.stream);
Expand Down Expand Up @@ -633,6 +633,7 @@ final class _ActiveRustStreamingIteration {
await _control(
'start',
convert.json.encode({
'app_metadata': sync.options.appMetadata,
'parameters': sync.options.params,
'schema': convert.json.decode(sync.schemaJson),
'include_defaults': sync.options.includeDefaultStreams,
Expand Down
38 changes: 37 additions & 1 deletion packages/powersync_core/test/sync/stream_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import 'dart:convert';
import 'package:async/async.dart';
import 'package:logging/logging.dart';
import 'package:powersync_core/powersync_core.dart';

import 'package:test/test.dart';

import '../server/sync_server/in_memory_sync_server.dart';
Expand Down Expand Up @@ -260,4 +259,41 @@ void main() {
);
a.unsubscribe();
});

test('passes app metadata to the server (rust)', () async {
options = SyncOptions(
syncImplementation: SyncClientImplementation.rust,
appMetadata: {'foo': 'bar'},
);

await waitForConnection();

final request = await syncService.waitForListener;
expect(
json.decode(await request.readAsString()),
containsPair(
'app_metadata',
containsPair('foo', 'bar'),
),
);
});

test('passes app metadata to the server (legacy sync client)', () async {
options = SyncOptions(
// ignore: deprecated_member_use_from_same_package
syncImplementation: SyncClientImplementation.dart,
appMetadata: {'foo': 'bar'},
);

await waitForConnection();

final request = await syncService.waitForListener;
expect(
json.decode(await request.readAsString()),
containsPair(
'app_metadata',
containsPair('foo', 'bar'),
),
);
});
}