diff --git a/rclrs/src/qos.rs b/rclrs/src/qos.rs index dcaa0b30..ba056a63 100644 --- a/rclrs/src/qos.rs +++ b/rclrs/src/qos.rs @@ -1,6 +1,6 @@ use std::time::Duration; -use crate::rcl_bindings::*; +use crate::{log_error, rcl_bindings::*}; /// The `HISTORY` DDS QoS policy. /// @@ -61,6 +61,9 @@ pub enum QoSReliabilityPolicy { Reliable = 1, /// Send messages but do not guarantee delivery. BestEffort = 2, + /// Use the highest level of service among the majority of endpoints currently available. + #[cfg(not(ros_distro = "humble"))] + BestAvailable = 4, } /// The `DURABILITY` DDS QoS policy. @@ -88,6 +91,9 @@ pub enum QoSDurabilityPolicy { TransientLocal = 1, /// Do not retain/request old messages. Volatile = 2, + /// Use the highest level of service among the majority of endpoints currently available. + #[cfg(not(ros_distro = "humble"))] + BestAvailable = 4, } /// The `LIVELINESS` DDS QoS policy. @@ -113,6 +119,9 @@ pub enum QoSLivelinessPolicy { /// on the topic or an explicit signal from the application to assert liveliness on the topic /// will mark the topic as being alive. ManualByTopic = 3, + /// Use the highest level of service among the majority of endpoints currently available. + #[cfg(not(ros_distro = "humble"))] + BestAvailable = 5, } /// A duration that can take two special values: System default and infinite. @@ -206,6 +215,44 @@ impl From for rmw_qos_profile_t { } } +impl From<&rmw_qos_profile_t> for QoSProfile { + fn from(profile: &rmw_qos_profile_t) -> Self { + let history = match profile.history { + rmw_qos_history_policy_e::RMW_QOS_POLICY_HISTORY_SYSTEM_DEFAULT => { + QoSHistoryPolicy::SystemDefault { + depth: profile.depth as u32, + } + } + rmw_qos_history_policy_e::RMW_QOS_POLICY_HISTORY_KEEP_LAST => { + QoSHistoryPolicy::KeepLast { + depth: profile.depth as u32, + } + } + rmw_qos_history_policy_e::RMW_QOS_POLICY_HISTORY_KEEP_ALL => QoSHistoryPolicy::KeepAll, + rmw_qos_history_policy_e::RMW_QOS_POLICY_HISTORY_UNKNOWN => { + log_error!( + "QoSProfile.conversion", + "History policy is unknown. Converting as KeepLast.", + ); + QoSHistoryPolicy::KeepLast { + depth: profile.depth as u32, + } + } + }; + + QoSProfile { + history, + reliability: (&profile.reliability).into(), + durability: (&profile.durability).into(), + deadline: (&profile.deadline).into(), + lifespan: (&profile.lifespan).into(), + liveliness: (&profile.liveliness).into(), + liveliness_lease: (&profile.liveliness_lease_duration).into(), + avoid_ros_namespace_conventions: profile.avoid_ros_namespace_conventions, + } + } +} + impl QoSProfile { /// Sets the QoS profile history to [QoSHistoryPolicy::KeepLast] with the specified depth. pub fn keep_last(mut self, depth: u32) -> Self { @@ -300,6 +347,24 @@ impl QoSProfile { pub fn action_status_default() -> Self { QOS_PROFILE_ACTION_STATUS_DEFAULT } + + /// Match the highest level service of the majority of endpoints available + /// when the primitive (publisher or subscription) is created. + /// + /// From rclcpp documentation: + /// > The middleware is not expected to update policies after creating a + /// > subscription or publisher, even if one or more policies are incompatible + /// > with newly discovered endpoints. Therefore, this profile should be used + /// > with care since non-deterministic behavior can occur due to races with + /// > discovery. + #[cfg(not(ros_distro = "humble"))] + pub fn best_available() -> Self { + unsafe { + // SAFETY: There are no preconditions for using this static const + // global variable + (&rmw_qos_profile_best_available).into() + } + } } impl From for rmw_qos_history_policy_t { @@ -328,6 +393,37 @@ impl From for rmw_qos_reliability_policy_t { QoSReliabilityPolicy::BestEffort => { rmw_qos_reliability_policy_t::RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT } + #[cfg(not(ros_distro = "humble"))] + QoSReliabilityPolicy::BestAvailable => { + rmw_qos_reliability_policy_t::RMW_QOS_POLICY_RELIABILITY_BEST_AVAILABLE + } + } + } +} + +impl From<&rmw_qos_reliability_policy_t> for QoSReliabilityPolicy { + fn from(policy: &rmw_qos_reliability_policy_t) -> Self { + match policy { + rmw_qos_reliability_policy_e::RMW_QOS_POLICY_RELIABILITY_SYSTEM_DEFAULT => { + QoSReliabilityPolicy::SystemDefault + } + rmw_qos_reliability_policy_e::RMW_QOS_POLICY_RELIABILITY_RELIABLE => { + QoSReliabilityPolicy::Reliable + } + rmw_qos_reliability_policy_e::RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT => { + QoSReliabilityPolicy::BestEffort + } + #[cfg(not(ros_distro = "humble"))] + rmw_qos_reliability_policy_e::RMW_QOS_POLICY_RELIABILITY_BEST_AVAILABLE => { + QoSReliabilityPolicy::BestAvailable + } + rmw_qos_reliability_policy_e::RMW_QOS_POLICY_RELIABILITY_UNKNOWN => { + log_error!( + "QoSReliabilityPolicy.conversion", + "Reliability policy is unknown. Converting as SystemDefault", + ); + QoSReliabilityPolicy::SystemDefault + } } } } @@ -344,6 +440,37 @@ impl From for rmw_qos_durability_policy_t { QoSDurabilityPolicy::Volatile => { rmw_qos_durability_policy_t::RMW_QOS_POLICY_DURABILITY_VOLATILE } + #[cfg(not(ros_distro = "humble"))] + QoSDurabilityPolicy::BestAvailable => { + rmw_qos_durability_policy_t::RMW_QOS_POLICY_DURABILITY_BEST_AVAILABLE + } + } + } +} + +impl From<&rmw_qos_durability_policy_t> for QoSDurabilityPolicy { + fn from(policy: &rmw_qos_durability_policy_t) -> Self { + match policy { + rmw_qos_durability_policy_e::RMW_QOS_POLICY_DURABILITY_SYSTEM_DEFAULT => { + QoSDurabilityPolicy::SystemDefault + } + rmw_qos_durability_policy_e::RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL => { + QoSDurabilityPolicy::TransientLocal + } + rmw_qos_durability_policy_e::RMW_QOS_POLICY_DURABILITY_VOLATILE => { + QoSDurabilityPolicy::Volatile + } + #[cfg(not(ros_distro = "humble"))] + rmw_qos_durability_policy_e::RMW_QOS_POLICY_DURABILITY_BEST_AVAILABLE => { + QoSDurabilityPolicy::BestAvailable + } + rmw_qos_durability_policy_e::RMW_QOS_POLICY_DURABILITY_UNKNOWN => { + log_error!( + "QoSDurabilityPolicy.conversion", + "Durability policy is unknown. Converting as SystemDefault.", + ); + QoSDurabilityPolicy::SystemDefault + } } } } @@ -360,6 +487,44 @@ impl From for rmw_qos_liveliness_policy_t { QoSLivelinessPolicy::ManualByTopic => { rmw_qos_liveliness_policy_t::RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC } + #[cfg(not(ros_distro = "humble"))] + QoSLivelinessPolicy::BestAvailable => { + rmw_qos_liveliness_policy_t::RMW_QOS_POLICY_LIVELINESS_BEST_AVAILABLE + } + } + } +} + +impl From<&rmw_qos_liveliness_policy_t> for QoSLivelinessPolicy { + fn from(value: &rmw_qos_liveliness_policy_t) -> Self { + match value { + rmw_qos_liveliness_policy_e::RMW_QOS_POLICY_LIVELINESS_SYSTEM_DEFAULT => { + QoSLivelinessPolicy::SystemDefault + } + rmw_qos_liveliness_policy_e::RMW_QOS_POLICY_LIVELINESS_AUTOMATIC => { + QoSLivelinessPolicy::Automatic + } + rmw_qos_liveliness_policy_e::RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC => { + QoSLivelinessPolicy::ManualByTopic + } + #[cfg(not(ros_distro = "humble"))] + rmw_qos_liveliness_policy_e::RMW_QOS_POLICY_LIVELINESS_BEST_AVAILABLE => { + QoSLivelinessPolicy::BestAvailable + } + rmw_qos_liveliness_policy_e::RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_NODE => { + log_error!( + "QoSLivelinessPolicy.conversion", + "Using deprecated liveliness mode. Converting as ManualByTopic.", + ); + QoSLivelinessPolicy::ManualByTopic + } + rmw_qos_liveliness_policy_e::RMW_QOS_POLICY_LIVELINESS_UNKNOWN => { + log_error!( + "QoSLivelinessPolicy.conversion", + "Liveliness policy is unknown. Converting as SystemDefault.", + ); + QoSLivelinessPolicy::SystemDefault + } } } } @@ -385,6 +550,33 @@ impl From for rmw_time_t { } } +impl From<&rmw_time_t> for Duration { + fn from(value: &rmw_time_t) -> Self { + Duration::from_secs(value.sec) + Duration::from_nanos(value.nsec) + } +} + +impl From<&rmw_time_t> for QoSDuration { + fn from(value: &rmw_time_t) -> Self { + let value: Duration = value.into(); + let time_eq = |a: Duration, b: QoSDuration| { + let b: rmw_time_t = b.into(); + let b: Duration = (&b).into(); + a == b + }; + + if time_eq(value, QoSDuration::SystemDefault) { + return QoSDuration::SystemDefault; + } + + if time_eq(value, QoSDuration::Infinite) { + return QoSDuration::Infinite; + } + + QoSDuration::Custom(value) + } +} + /// Equivalent to `rmw_qos_profile_sensor_data` from the [`rmw` package][1]. /// /// [1]: https://github.com/ros2/rmw/blob/master/rmw/include/rmw/qos_profiles.h diff --git a/rclrs/src/subscription.rs b/rclrs/src/subscription.rs index 06d331eb..dd32ee09 100644 --- a/rclrs/src/subscription.rs +++ b/rclrs/src/subscription.rs @@ -7,9 +7,9 @@ use std::{ use rosidl_runtime_rs::{Message, RmwMessage}; use crate::{ - error::ToResult, qos::QoSProfile, rcl_bindings::*, IntoPrimitiveOptions, Node, NodeHandle, - RclPrimitive, RclPrimitiveHandle, RclPrimitiveKind, RclrsError, ReadyKind, Waitable, - WaitableLifecycle, WorkScope, Worker, WorkerCommands, ENTITY_LIFECYCLE_MUTEX, + error::ToResult, log_error, qos::QoSProfile, rcl_bindings::*, IntoPrimitiveOptions, Node, + NodeHandle, RclPrimitive, RclPrimitiveHandle, RclPrimitiveKind, RclrsError, ReadyKind, + Waitable, WaitableLifecycle, WorkScope, Worker, WorkerCommands, ENTITY_LIFECYCLE_MUTEX, }; mod any_subscription_callback; @@ -110,6 +110,27 @@ where .into_owned() } + /// Returns the QoS settings of the subscription. + pub fn qos(&self) -> QoSProfile { + let options = unsafe { + // SAFETY: The handle is protected by a mutex. No other + // preconditions need to be met. + let handle = self.handle.lock(); + let options = rcl_subscription_get_options(&*handle); + if options.is_null() { + None + } else { + Some((&(*options).qos).into()) + } + }; + + if options.is_none() { + log_error!("Subscroption.qos", "Options returned null"); + } + + options.unwrap_or_default() + } + /// Used by [`Node`][crate::Node] to create a new subscription. pub(crate) fn create<'a>( options: impl Into>, @@ -592,4 +613,102 @@ mod tests { let message_was_received = success.load(Ordering::Acquire); assert!(message_was_received); } + + #[test] + fn test_subscription_qos_settings() { + use crate::vendor::example_interfaces::msg::Empty; + use crate::*; + + let executor = Context::default().create_basic_executor(); + + let node = executor + .create_node(&format!("test_subscription_qos_settings_{}", line!())) + .unwrap(); + + let subscription = node + .create_subscription("test_subscription_qos_topic".best_effort(), |_: Empty| { + // Do nothing + }) + .unwrap(); + + let qos = subscription.qos(); + assert_eq!(qos.reliability, QoSReliabilityPolicy::BestEffort); + + let expected_qos = QoSProfile::topics_default().best_effort(); + assert_eq!(expected_qos.reliability, QoSReliabilityPolicy::BestEffort); + let subscription = node + .create_subscription( + "test_subscription_qos_topic_2".qos(expected_qos), + |_: Empty| { + // Do nothing + }, + ) + .unwrap(); + + let qos = subscription.qos(); + assert_eq!(expected_qos.reliability, qos.reliability); + assert_eq!(qos.reliability, QoSReliabilityPolicy::BestEffort); + + let subscription = node + .create_subscription( + SubscriptionOptions { + topic: "test_subscription_qos_topic_3", + qos: expected_qos, + }, + |_: Empty| { + // Do nothing + }, + ) + .unwrap(); + + let qos = subscription.qos(); + assert_eq!(expected_qos.reliability, qos.reliability); + assert_eq!(qos.reliability, QoSReliabilityPolicy::BestEffort); + } + + #[test] + fn test_setting_qos_from_parameters() { + use crate::vendor::example_interfaces::msg::Empty; + use crate::*; + + let args = ["--ros-args", "-p", "qos_reliability:=best_effort"].map(ToString::to_string); + + let context = Context::new(args, InitOptions::default()).unwrap(); + + let executor = context.create_basic_executor(); + + let node = executor + .create_node(&format!("test_setting_qos_from_parameters_{}", line!())) + .unwrap(); + + let qos_reliability_str = node + .declare_parameter::>("qos_reliability") + .default("best_effort".into()) + .mandatory() + .unwrap() + .get(); + + let mut expected_qos = QOS_PROFILE_DEFAULT; + expected_qos.reliability = match &*qos_reliability_str { + "reliable" => QoSReliabilityPolicy::Reliable, + "best_effort" => QoSReliabilityPolicy::BestEffort, + #[cfg(not(ros_distro = "humble"))] + "best_available" => QoSReliabilityPolicy::BestAvailable, + x => panic!("unknown reliability string: {x}"), + }; + + assert_eq!(expected_qos.reliability, QoSReliabilityPolicy::BestEffort); + let subscription = node + .create_subscription( + "test_setting_qos_from_parameters_topic".qos(expected_qos), + |_: Empty| { + // Do nothing + }, + ) + .unwrap(); + + let qos = subscription.qos(); + assert_eq!(expected_qos.reliability, qos.reliability); + assert_eq!(qos.reliability, QoSReliabilityPolicy::BestEffort); + } }