From daccc8dc4463954afd076b6a663dff4f5ac59e46 Mon Sep 17 00:00:00 2001 From: Umberto Sgueglia Date: Wed, 3 Dec 2025 17:52:44 +0100 Subject: [PATCH 1/2] feat: add cdp_metrics_sink --- .../pipes/cdp_dashboard_metrics_sink.pipe | 62 +++++++++++++++++++ 1 file changed, 62 insertions(+) create mode 100644 services/libs/tinybird/pipes/cdp_dashboard_metrics_sink.pipe diff --git a/services/libs/tinybird/pipes/cdp_dashboard_metrics_sink.pipe b/services/libs/tinybird/pipes/cdp_dashboard_metrics_sink.pipe new file mode 100644 index 0000000000..cd13a7ad02 --- /dev/null +++ b/services/libs/tinybird/pipes/cdp_dashboard_metrics_sink.pipe @@ -0,0 +1,62 @@ +DESCRIPTION > + Global metrics for activities, organizations, and members used for CDP dashboard. + +NODE activityRelations_metrics +SQL > + SELECT + count() AS activities_total, + countIf(createdAt >= now() - INTERVAL 30 DAY) AS activities_last_30_days + FROM activityRelations_deduplicated_ds +END + +NODE organizations_metrics +SQL > + SELECT + count() AS organizations_total, + countIf(createdAt >= now() - INTERVAL 30 DAY) AS organizations_last_30_days + FROM organizations FINAL +END + +NODE members_metrics +SQL > + SELECT + count() AS members_total, + countIf(joinedAt >= now() - INTERVAL 30 DAY) AS members_last_30_days + FROM members FINAL +END + +NODE merge_results +SQL > +SELECT + -- activity + (SELECT activities_total + FROM activityRelations_metrics) AS activities_total, + (SELECT activities_last_30_days + FROM activityRelations_metrics) AS activities_last_30_days, + + -- organizations + (SELECT organizations_total + FROM organizations_metrics) AS organizations_total, + (SELECT organizations_last_30_days + FROM organizations_metrics) AS organizations_last_30_days, + + -- members + (SELECT members_total + FROM members_metrics) AS members_total, + (SELECT members_last_30_days + FROM members_metrics) AS members_last_30_days +END + +NODE cdp_dashboard_full_metrics +SQL > + SELECT * + FROM merge_results +END + +TYPE SINK +EXPORT_SERVICE kafka +EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming +EXPORT_SCHEDULE 30 0 * * * +EXPORT_FORMAT csv +EXPORT_STRATEGY @new +EXPORT_KAFKA_TOPIC cdp_dashboard_metrics_sink From 5489c255b0ac1d3af1c26c6e251cee30c45ff838 Mon Sep 17 00:00:00 2001 From: Umberto Sgueglia Date: Wed, 3 Dec 2025 17:53:41 +0100 Subject: [PATCH 2/2] feat: add cdp_metrics_sink --- .../pipes/cdp_dashboard_metrics_sink.pipe | 61 +++++++------------ 1 file changed, 23 insertions(+), 38 deletions(-) diff --git a/services/libs/tinybird/pipes/cdp_dashboard_metrics_sink.pipe b/services/libs/tinybird/pipes/cdp_dashboard_metrics_sink.pipe index cd13a7ad02..0b9bb487df 100644 --- a/services/libs/tinybird/pipes/cdp_dashboard_metrics_sink.pipe +++ b/services/libs/tinybird/pipes/cdp_dashboard_metrics_sink.pipe @@ -1,57 +1,42 @@ DESCRIPTION > - Global metrics for activities, organizations, and members used for CDP dashboard. + Global metrics for activities, organizations, and members used for CDP dashboard. NODE activityRelations_metrics SQL > - SELECT - count() AS activities_total, - countIf(createdAt >= now() - INTERVAL 30 DAY) AS activities_last_30_days - FROM activityRelations_deduplicated_ds -END + SELECT + count() AS activities_total, + countIf(createdAt >= now() - INTERVAL 30 DAY) AS activities_last_30_days + FROM activityRelations_deduplicated_ds NODE organizations_metrics SQL > - SELECT - count() AS organizations_total, - countIf(createdAt >= now() - INTERVAL 30 DAY) AS organizations_last_30_days - FROM organizations FINAL -END + SELECT + count() AS organizations_total, + countIf(createdAt >= now() - INTERVAL 30 DAY) AS organizations_last_30_days + FROM organizations FINAL NODE members_metrics SQL > - SELECT - count() AS members_total, - countIf(joinedAt >= now() - INTERVAL 30 DAY) AS members_last_30_days - FROM members FINAL -END + SELECT + count() AS members_total, countIf(joinedAt >= now() - INTERVAL 30 DAY) AS members_last_30_days + FROM members FINAL NODE merge_results SQL > -SELECT - -- activity - (SELECT activities_total - FROM activityRelations_metrics) AS activities_total, - (SELECT activities_last_30_days - FROM activityRelations_metrics) AS activities_last_30_days, - - -- organizations - (SELECT organizations_total - FROM organizations_metrics) AS organizations_total, - (SELECT organizations_last_30_days - FROM organizations_metrics) AS organizations_last_30_days, - - -- members - (SELECT members_total - FROM members_metrics) AS members_total, - (SELECT members_last_30_days - FROM members_metrics) AS members_last_30_days -END + SELECT + -- activity + (SELECT activities_total FROM activityRelations_metrics) AS activities_total, + (SELECT activities_last_30_days FROM activityRelations_metrics) AS activities_last_30_days, + -- organizations + (SELECT organizations_total FROM organizations_metrics) AS organizations_total, + (SELECT organizations_last_30_days FROM organizations_metrics) AS organizations_last_30_days, + -- members + (SELECT members_total FROM members_metrics) AS members_total, + (SELECT members_last_30_days FROM members_metrics) AS members_last_30_days NODE cdp_dashboard_full_metrics SQL > - SELECT * - FROM merge_results -END + SELECT * FROM merge_results TYPE SINK EXPORT_SERVICE kafka