diff --git a/packages/core-bridge/Cargo.lock b/packages/core-bridge/Cargo.lock index 0fb1f9b6e..07680cbc1 100644 --- a/packages/core-bridge/Cargo.lock +++ b/packages/core-bridge/Cargo.lock @@ -159,6 +159,31 @@ version = "2.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b8e56985ec62d17e9c1001dc89c88ecd7dc08e47eba5ec7c29c7b5eeecde967" +[[package]] +name = "bon" +version = "3.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebeb9aaf9329dff6ceb65c689ca3db33dbf15f324909c60e4e5eef5701ce31b1" +dependencies = [ + "bon-macros", + "rustversion", +] + +[[package]] +name = "bon-macros" +version = "3.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77e9d642a7e3a318e37c2c9427b5a6a48aa1ad55dcd986f3034ab2239045a645" +dependencies = [ + "darling 0.21.3", + "ident_case", + "prettyplease", + "proc-macro2", + "quote", + "rustversion", + "syn", +] + [[package]] name = "bridge-macros" version = "0.1.0" @@ -287,8 +312,18 @@ version = "0.20.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc7f46116c46ff9ab3eb1597a45688b6715c6e628b5c133e288e709a29bcb4ee" dependencies = [ - "darling_core", - "darling_macro", + "darling_core 0.20.11", + "darling_macro 0.20.11", +] + +[[package]] +name = "darling" +version = "0.21.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9cdf337090841a411e2a7f3deb9187445851f91b309c0c0a29e05f74a00a48c0" +dependencies = [ + "darling_core 0.21.3", + "darling_macro 0.21.3", ] [[package]] @@ -305,13 +340,38 @@ dependencies = [ "syn", ] +[[package]] +name = "darling_core" +version = "0.21.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1247195ecd7e3c85f83c8d2a366e4210d588e802133e1e355180a9870b517ea4" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim", + "syn", +] + [[package]] name = "darling_macro" version = "0.20.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc34b93ccb385b40dc71c6fceac4b2ad23662c7eeb248cf10d529b7e055b6ead" dependencies = [ - "darling_core", + "darling_core 0.20.11", + "quote", + "syn", +] + +[[package]] +name = "darling_macro" +version = "0.21.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d38308df82d1080de0afee5d069fa14b0326a88c14f15c5ccda35b4a6c414c81" +dependencies = [ + "darling_core 0.21.3", "quote", "syn", ] @@ -356,7 +416,7 @@ version = "0.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2d5bcf7b024d6835cfb3d473887cd966994907effbe9227e8c8219824d06c4e8" dependencies = [ - "darling", + "darling 0.20.11", "proc-macro2", "quote", "syn", @@ -2423,8 +2483,8 @@ dependencies = [ "async-trait", "backoff", "base64", + "bon", "bytes", - "derive_builder", "derive_more", "dyn-clone", "futures-retry", diff --git a/packages/core-bridge/sdk-core b/packages/core-bridge/sdk-core index 850db67c8..44a6576bb 160000 --- a/packages/core-bridge/sdk-core +++ b/packages/core-bridge/sdk-core @@ -1 +1 @@ -Subproject commit 850db67c87ac9208da53df1cd82f8a36d71c5227 +Subproject commit 44a6576bbaac589b28afa173ca6d60757a4d821d diff --git a/packages/core-bridge/src/client.rs b/packages/core-bridge/src/client.rs index c427ccfb1..85e595403 100644 --- a/packages/core-bridge/src/client.rs +++ b/packages/core-bridge/src/client.rs @@ -656,12 +656,10 @@ where mod config { use std::collections::HashMap; - use anyhow::Context as _; - use temporalio_client::HttpConnectProxyOptions; use temporalio_sdk_core::{ - ClientOptions as CoreClientOptions, ClientOptionsBuilder, - ClientTlsConfig as CoreClientTlsConfig, TlsConfig as CoreTlsConfig, Url, + ClientOptions as CoreClientOptions, ClientTlsOptions as CoreClientTlsOptions, + TlsOptions as CoreTlsOptions, Url, }; use bridge_macros::TryFromJs; @@ -673,7 +671,7 @@ mod config { target_url: Url, client_name: String, client_version: String, - tls: Option, + tls: Option, http_connect_proxy: Option, headers: Option>, api_key: Option, @@ -682,14 +680,14 @@ mod config { #[derive(Debug, Clone, TryFromJs)] #[allow(clippy::struct_field_names)] - struct TlsConfig { + struct TlsOptions { domain: Option, server_root_ca_cert: Option>, - client_tls_config: Option, + client_tls_config: Option, } #[derive(Debug, Clone, TryFromJs)] - struct TlsConfigClientCertPair { + struct TlsOptionsClientCertPair { client_cert: Vec, client_private_key: Vec, } @@ -709,11 +707,9 @@ mod config { impl TryInto for ClientOptions { type Error = BridgeError; fn try_into(self) -> Result { - let mut builder = ClientOptionsBuilder::default(); + let builder = CoreClientOptions::builder(); - if let Some(tls) = self.tls { - builder.tls_cfg(tls.into()); - } + let tls_options = self.tls.map(Into::into); let (ascii_headers, bin_headers) = partition_headers(self.headers); @@ -722,29 +718,29 @@ mod config { .client_name(self.client_name) .client_version(self.client_version) // tls_cfg -- above - .http_connect_proxy(self.http_connect_proxy.map(Into::into)) - .headers(ascii_headers) - .binary_headers(bin_headers) - .api_key(self.api_key) + .maybe_http_connect_proxy(self.http_connect_proxy.map(Into::into)) + .maybe_headers(ascii_headers) + .maybe_binary_headers(bin_headers) + .maybe_api_key(self.api_key) .disable_error_code_metric_tags(self.disable_error_code_metric_tags) + .maybe_tls_options(tls_options) // identity -- skipped: will be set on worker // retry_config -- skipped: worker overrides anyway // override_origin -- skipped: will default to tls_cfg.domain // keep_alive -- skipped: defaults to true; is there any reason to disable this? // skip_get_system_info -- skipped: defaults to false; is there any reason to set this? - .build() - .context("Invalid Client options")?; + .build(); Ok(client_options) } } - impl From for CoreTlsConfig { - fn from(val: TlsConfig) -> Self { + impl From for CoreTlsOptions { + fn from(val: TlsOptions) -> Self { Self { domain: val.domain, server_root_ca_cert: val.server_root_ca_cert, - client_tls_config: val.client_tls_config.map(|pair| CoreClientTlsConfig { + client_tls_options: val.client_tls_config.map(|pair| CoreClientTlsOptions { client_cert: pair.client_cert, client_private_key: pair.client_private_key, }), diff --git a/packages/core-bridge/src/runtime.rs b/packages/core-bridge/src/runtime.rs index 24993b1bd..fae1d1449 100644 --- a/packages/core-bridge/src/runtime.rs +++ b/packages/core-bridge/src/runtime.rs @@ -59,12 +59,13 @@ pub struct Runtime { pub fn runtime_new( bridge_options: config::RuntimeOptions, ) -> BridgeResult> { - let (telemetry_options, metrics_options, logging_options) = bridge_options.try_into()?; + let (telemetry_options, metrics_options, logging_options, worker_heartbeat_interval_millis) = + bridge_options.try_into()?; // Create core runtime which starts tokio multi-thread runtime let runtime_options = RuntimeOptionsBuilder::default() .telemetry_options(telemetry_options) - .heartbeat_interval(None) + .heartbeat_interval(worker_heartbeat_interval_millis.map(Duration::from_millis)) .build() .context("Failed to build runtime options")?; let mut core_runtime = CoreRuntime::new(runtime_options, TokioRuntimeBuilder::default()) @@ -266,6 +267,7 @@ mod config { log_exporter: LogExporterOptions, telemetry: TelemetryOptions, metrics_exporter: Option, + worker_heartbeat_interval_millis: Option, } #[derive(Debug, Clone, TryFromJs)] @@ -322,6 +324,7 @@ mod config { CoreTelemetryOptions, Option, super::BridgeLogExporter, + Option, )> for RuntimeOptions { type Error = BridgeError; @@ -331,8 +334,16 @@ mod config { CoreTelemetryOptions, Option, super::BridgeLogExporter, + Option, )> { - let (telemetry_logger, log_exporter) = match self.log_exporter { + let Self { + log_exporter, + telemetry, + metrics_exporter, + worker_heartbeat_interval_millis, + } = self; + + let (telemetry_logger, log_exporter) = match log_exporter { LogExporterOptions::Console { filter } => ( CoreTelemetryLogger::Console { filter }, BridgeLogExporter::Console, @@ -352,17 +363,21 @@ mod config { let mut telemetry_options = TelemetryOptionsBuilder::default(); let telemetry_options = telemetry_options .logging(telemetry_logger) - .metric_prefix(self.telemetry.metric_prefix) - .attach_service_name(self.telemetry.attach_service_name) + .metric_prefix(telemetry.metric_prefix) + .attach_service_name(telemetry.attach_service_name) .build() .context("Failed to build telemetry options")?; - let metrics_exporter = self - .metrics_exporter + let metrics_exporter = metrics_exporter .map(std::convert::TryInto::try_into) .transpose()?; - Ok((telemetry_options, metrics_exporter, log_exporter)) + Ok(( + telemetry_options, + metrics_exporter, + log_exporter, + worker_heartbeat_interval_millis, + )) } } diff --git a/packages/core-bridge/src/worker.rs b/packages/core-bridge/src/worker.rs index d0d0cdff3..eaa25444c 100644 --- a/packages/core-bridge/src/worker.rs +++ b/packages/core-bridge/src/worker.rs @@ -166,6 +166,9 @@ pub fn worker_complete_workflow_activation( ), } } + CompleteWfError::WorkflowNotEnabled => { + BridgeError::UnexpectedError(err.to_string()) + } }) }) } @@ -225,6 +228,9 @@ pub fn worker_complete_activity_task( field: None, message: format!("Malformed Activity Completion: {reason:?}"), }, + CompleteActivityError::ActivityNotEnabled => { + BridgeError::UnexpectedError(err.to_string()) + } }) }) } @@ -296,7 +302,7 @@ pub fn worker_complete_nexus_task( .await .map_err(|err| match err { CompleteNexusError::NexusNotEnabled => { - BridgeError::UnexpectedError(format!("{err}")) + BridgeError::UnexpectedError(err.to_string()) } CompleteNexusError::MalformedNexusCompletion { reason } => BridgeError::TypeError { field: None, @@ -463,9 +469,10 @@ impl MutableFinalize for HistoryForReplayTunnelHandle {} //////////////////////////////////////////////////////////////////////////////////////////////////// mod config { + use std::collections::HashSet; use std::{sync::Arc, time::Duration}; - use temporalio_common::protos::temporal::api::enums::v1::VersioningBehavior as CoreVersioningBehavior; + use temporalio_common::protos::temporal::api::worker::v1::PluginInfo; use temporalio_common::worker::{ ActivitySlotKind, LocalActivitySlotKind, NexusSlotKind, PollerBehavior as CorePollerBehavior, SlotKind, WorkerConfig, WorkerConfigBuilder, @@ -499,7 +506,7 @@ mod config { workflow_task_poller_behavior: PollerBehavior, activity_task_poller_behavior: PollerBehavior, nexus_task_poller_behavior: PollerBehavior, - enable_non_local_activities: bool, + task_types: WorkerTaskTypes, sticky_queue_schedule_to_start_timeout: Duration, max_cached_workflows: usize, max_heartbeat_throttle_interval: Duration, @@ -507,6 +514,7 @@ mod config { max_activities_per_second: Option, max_task_queue_activities_per_second: Option, shutdown_grace_time: Option, + plugins: Vec, } #[derive(TryFromJs)] @@ -540,6 +548,26 @@ mod config { AutoUpgrade, } + #[derive(TryFromJs)] + #[allow(clippy::struct_excessive_bools)] + pub struct WorkerTaskTypes { + enable_workflows: bool, + enable_local_activities: bool, + enable_remote_activities: bool, + enable_nexus: bool, + } + + impl From<&WorkerTaskTypes> for temporalio_common::worker::WorkerTaskTypes { + fn from(t: &WorkerTaskTypes) -> Self { + Self { + enable_workflows: t.enable_workflows, + enable_local_activities: t.enable_local_activities, + enable_remote_activities: t.enable_remote_activities, + enable_nexus: t.enable_nexus, + } + } + } + impl BridgeWorkerOptions { pub(crate) fn into_core_config(self) -> Result { // Set all other options @@ -566,7 +594,7 @@ mod config { .workflow_task_poller_behavior(self.workflow_task_poller_behavior) .activity_task_poller_behavior(self.activity_task_poller_behavior) .nexus_task_poller_behavior(self.nexus_task_poller_behavior) - .no_remote_activities(!self.enable_non_local_activities) + .task_types(&self.task_types) .sticky_queue_schedule_to_start_timeout(self.sticky_queue_schedule_to_start_timeout) .max_cached_workflows(self.max_cached_workflows) .max_heartbeat_throttle_interval(self.max_heartbeat_throttle_interval) @@ -574,6 +602,15 @@ mod config { .max_task_queue_activities_per_second(self.max_task_queue_activities_per_second) .max_worker_activities_per_second(self.max_activities_per_second) .graceful_shutdown_period(self.shutdown_grace_time) + .plugins( + self.plugins + .into_iter() + .map(|name| PluginInfo { + name, + version: String::new(), + }) + .collect::>(), + ) .build() } } diff --git a/packages/core-bridge/ts/native.ts b/packages/core-bridge/ts/native.ts index 2ddd20a1c..099bf92f6 100644 --- a/packages/core-bridge/ts/native.ts +++ b/packages/core-bridge/ts/native.ts @@ -40,7 +40,7 @@ export type JsonString<_T> = string; // Runtime //////////////////////////////////////////////////////////////////////////////////////////////////// -export declare function newRuntime(telemOptions: RuntimeOptions): Runtime; +export declare function newRuntime(runtimeOptions: RuntimeOptions): Runtime; export declare function runtimeShutdown(runtime: Runtime): void; @@ -52,6 +52,7 @@ export type RuntimeOptions = { logExporter: LogExporterOptions; telemetry: TelemetryOptions; metricsExporter: MetricExporterOptions; + workerHeartbeatIntervalMillis: Option; }; export type TelemetryOptions = { @@ -213,7 +214,12 @@ export interface WorkerOptions { workflowTaskPollerBehavior: PollerBehavior; activityTaskPollerBehavior: PollerBehavior; nexusTaskPollerBehavior: PollerBehavior; - enableNonLocalActivities: boolean; + taskTypes: { + enableWorkflows: boolean; + enableLocalActivities: boolean; + enableRemoteActivities: boolean; + enableNexus: boolean; + }; stickyQueueScheduleToStartTimeout: number; maxCachedWorkflows: number; maxHeartbeatThrottleInterval: number; @@ -221,6 +227,7 @@ export interface WorkerOptions { maxTaskQueueActivitiesPerSecond: Option; maxActivitiesPerSecond: Option; shutdownGraceTime: number; + plugins: string[]; } export type PollerBehavior = diff --git a/packages/test/src/test-bridge.ts b/packages/test/src/test-bridge.ts index df14e0183..7ae3b6a16 100644 --- a/packages/test/src/test-bridge.ts +++ b/packages/test/src/test-bridge.ts @@ -241,6 +241,7 @@ const GenericConfigs = { attachServiceName: false, }, metricsExporter: null, + workerHeartbeatIntervalMillis: null, } satisfies native.RuntimeOptions, }, client: { @@ -298,7 +299,12 @@ const GenericConfigs = { initial: 5, maximum: 100, }, - enableNonLocalActivities: false, + taskTypes: { + enableWorkflows: true, + enableLocalActivities: false, + enableRemoteActivities: false, + enableNexus: false, + }, stickyQueueScheduleToStartTimeout: 1000, maxCachedWorkflows: 1000, maxHeartbeatThrottleInterval: 1000, @@ -306,6 +312,7 @@ const GenericConfigs = { maxTaskQueueActivitiesPerSecond: null, maxActivitiesPerSecond: null, shutdownGraceTime: 1000, + plugins: [], } satisfies native.WorkerOptions, }, ephemeralServer: { diff --git a/packages/test/src/test-local-activities.ts b/packages/test/src/test-local-activities.ts index ae73518b9..7f92bc360 100644 --- a/packages/test/src/test-local-activities.ts +++ b/packages/test/src/test-local-activities.ts @@ -657,3 +657,24 @@ test.serial('retryPolicy is set correctly', async (t) => { t.deepEqual(await executeWorkflow(getRetryPolicyFromActivityInfo, { args: [retryPolicy, false] }), retryPolicy); }); }); + +export async function runLocalActivityWithNonLocalActivitiesDisabled(): Promise { + const { echo } = workflow.proxyLocalActivities({ startToCloseTimeout: '1m' }); + return await echo('hello from local activity'); +} + +test.serial('Local activities work when enableNonLocalActivities is false', async (t) => { + const { executeWorkflow, createWorker } = helpers(t); + const worker = await createWorker({ + activities: { + async echo(message: string): Promise { + return message; + }, + }, + enableNonLocalActivities: false, + }); + await worker.runUntil(async () => { + const result = await executeWorkflow(runLocalActivityWithNonLocalActivitiesDisabled); + t.is(result, 'hello from local activity'); + }); +}); diff --git a/packages/worker/src/runtime-options.ts b/packages/worker/src/runtime-options.ts index d38db5a87..2d4a9cbc4 100644 --- a/packages/worker/src/runtime-options.ts +++ b/packages/worker/src/runtime-options.ts @@ -32,6 +32,14 @@ export interface RuntimeOptions { */ telemetryOptions?: TelemetryOptions; + /** + * Interval for worker heartbeats. Accepted range is between 1s and 60s. `0` disables heartbeating. + * + * @format number of milliseconds or {@link https://www.npmjs.com/package/ms | ms-formatted string} + * @default 60000 (60 seconds) + */ + workerHeartbeatInterval?: Duration; + /** * Automatically shutdown workers on any of these signals. * @@ -359,7 +367,7 @@ export interface PrometheusMetricsExporter { */ export interface CompiledRuntimeOptions { shutdownSignals: NodeJS.Signals[]; - telemetryOptions: native.RuntimeOptions; + runtimeOptions: native.RuntimeOptions; logger: Logger; } @@ -367,10 +375,12 @@ export function compileOptions(options: RuntimeOptions): CompiledRuntimeOptions const { metrics, noTemporalPrefixForMetrics } = options.telemetryOptions ?? {}; // eslint-disable-line deprecation/deprecation const [logger, logExporter] = compileLoggerOptions(options); + const heartbeatMillis = msToNumber(options.workerHeartbeatInterval ?? '60s'); + return { logger, shutdownSignals: options.shutdownSignals ?? ['SIGINT', 'SIGTERM', 'SIGQUIT', 'SIGUSR2'], - telemetryOptions: { + runtimeOptions: { logExporter, telemetry: { metricPrefix: metrics?.metricPrefix ?? (noTemporalPrefixForMetrics ? '' : 'temporal_'), @@ -400,6 +410,7 @@ export function compileOptions(options: RuntimeOptions): CompiledRuntimeOptions globalTags: metrics.globalTags ?? {}, } satisfies native.MetricExporterOptions) : null, + workerHeartbeatIntervalMillis: heartbeatMillis === 0 ? null : heartbeatMillis, }, }; } diff --git a/packages/worker/src/runtime.ts b/packages/worker/src/runtime.ts index 49bfc0f5a..e98227e68 100644 --- a/packages/worker/src/runtime.ts +++ b/packages/worker/src/runtime.ts @@ -50,7 +50,7 @@ export class Runtime { public readonly options: CompiledRuntimeOptions ) { this.logger = options.logger; - this.metricMeter = options.telemetryOptions.metricsExporter + this.metricMeter = options.runtimeOptions.metricsExporter ? MetricMeterWithComposedTags.compose(new RuntimeMetricMeter(this.native), {}, true) : noopMetricMeter; @@ -97,7 +97,7 @@ export class Runtime { */ protected static create(options: RuntimeOptions, instantiator: 'install' | 'instance'): Runtime { const compiledOptions = compileOptions(options); - const runtime = native.newRuntime(compiledOptions.telemetryOptions); + const runtime = native.newRuntime(compiledOptions.runtimeOptions); // Remember the provided options in case Core is reinstantiated after being shut down this.defaultOptions = options; diff --git a/packages/worker/src/worker-options.ts b/packages/worker/src/worker-options.ts index f04771623..e738b6234 100644 --- a/packages/worker/src/worker-options.ts +++ b/packages/worker/src/worker-options.ts @@ -1078,6 +1078,8 @@ function nexusServiceRegistryFromOptions(opts: WorkerOptions): nexus.ServiceRegi } export function toNativeWorkerOptions(opts: CompiledWorkerOptionsWithBuildId): native.WorkerOptions { + const enableWorkflows = opts.workflowBundle !== undefined || opts.workflowsPath !== undefined; + const enableLocalActivities = enableWorkflows && opts.activities.size > 0; return { identity: opts.identity, buildId: opts.buildId, // eslint-disable-line deprecation/deprecation @@ -1090,7 +1092,12 @@ export function toNativeWorkerOptions(opts: CompiledWorkerOptionsWithBuildId): n workflowTaskPollerBehavior: toNativeTaskPollerBehavior(opts.workflowTaskPollerBehavior), activityTaskPollerBehavior: toNativeTaskPollerBehavior(opts.activityTaskPollerBehavior), nexusTaskPollerBehavior: toNativeTaskPollerBehavior(opts.nexusTaskPollerBehavior), - enableNonLocalActivities: opts.enableNonLocalActivities, + taskTypes: { + enableWorkflows, + enableLocalActivities, + enableRemoteActivities: opts.enableNonLocalActivities && opts.activities.size > 0, + enableNexus: opts.nexusServiceRegistry !== undefined, + }, stickyQueueScheduleToStartTimeout: msToNumber(opts.stickyQueueScheduleToStartTimeout), maxCachedWorkflows: opts.maxCachedWorkflows, maxHeartbeatThrottleInterval: msToNumber(opts.maxHeartbeatThrottleInterval), @@ -1098,6 +1105,7 @@ export function toNativeWorkerOptions(opts: CompiledWorkerOptionsWithBuildId): n maxTaskQueueActivitiesPerSecond: opts.maxTaskQueueActivitiesPerSecond ?? null, maxActivitiesPerSecond: opts.maxActivitiesPerSecond ?? null, shutdownGraceTime: msToNumber(opts.shutdownGraceTime), + plugins: opts.plugins?.map((p) => p.name) ?? [], }; }