diff --git a/Cargo.toml b/Cargo.toml index 59ad2b767..6288b8a26 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -75,6 +75,14 @@ serde = { version = "1.0.210", default-features = false, features = ["std", "der serde_json = { version = "1.0.128", default-features = false, features = ["std"] } log = { version = "0.4.22", default-features = false, features = ["std"]} +tracing ={ version = "0.1.43" , default-features = false, features = ["log", "attributes"]} +tracing-subscriber = {version = "0.3.22", default-features = false, features = ["std", "fmt", "json"]} +tracing-opentelemetry ={ version = "0.32.0", default-features = false} +opentelemetry ={ version = "0.31.0", default-features = false} +opentelemetry_sdk ={ version = "0.31.0", default-features = false} +opentelemetry-otlp ={ version = "0.31.0", default-features = false, features = ["trace", "grpc-tonic"]} +opentelemetry-stdout = {version = "0.31.0"} + vss-client = { package = "vss-client-ng", version = "0.4" } prost = { version = "0.11.6", default-features = false} diff --git a/docker-compose.yml b/docker-compose.yml index e71fd70fb..bef091b7f 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -48,6 +48,16 @@ services: networks: - bitcoin-electrs + jaeger: + image: jaegertracing/all-in-one:1.54 + container_name: jaeger + ports: + - "6831:6831/udp" + - "16686:16686" + - "14268:14268" + - "4317:4317" + - "4318:4318" + networks: bitcoin-electrs: driver: bridge diff --git a/src/io/utils.rs b/src/io/utils.rs index 928d4031b..def9b82fe 100644 --- a/src/io/utils.rs +++ b/src/io/utils.rs @@ -408,6 +408,56 @@ pub(crate) fn check_namespace_key_validity( Ok(()) } +pub(crate) fn classify_entity( + primary_namespace: &str, secondary_namespace: &str, key: &str, +) -> &'static str { + if !primary_namespace.is_empty() { + return match primary_namespace { + // LDK-Node + "payments" => "payment_info", + "bdk_wallet" => match key { + "descriptor" => "bdk_wallet.descriptor", + "change_descriptor" => "bdk_wallet.change_descriptor", + "network" => "bdk_wallet.network", + "local_chain" => "bdk_wallet.local_chain", + "tx_graph" => "bdk_wallet.tx_graph", + "indexer" => "bdk_wallet.indexer", + _ => "bdk_wallet.unknown", + }, + "static_invoices" => "static_invoice", + + // LDK + "monitors" => "channel_monitor", + "monitor_updates" => "channel_monitor_update", + "archived_monitors" => "archived_channel_monitor", + + _ => "unknown", + }; + } + + debug_assert!( + secondary_namespace.is_empty(), + "Unexpected: primary empty but secondary populated: '{}'", + secondary_namespace + ); + + match key { + // LDK-Node + "events" => "event_queue", + "peers" => "peer_info", + "node_metrics" => "node_metrics", + + // LDK + "manager" => "channel_manager", + "network_graph" => "network_graph", + "scorer" => "scorer", + "external_pathfinding_scores_cache" => "external_pathfinding_scores", + "output_sweeper" => "output_sweeper", + + _ => "unknown", + } +} + macro_rules! impl_read_write_change_set_type { ( $read_name:ident, diff --git a/src/io/vss_store.rs b/src/io/vss_store.rs index eb439ed10..a06910f26 100644 --- a/src/io/vss_store.rs +++ b/src/io/vss_store.rs @@ -27,6 +27,7 @@ use lightning::util::persist::{KVStore, KVStoreSync}; use lightning::util::ser::{Readable, Writeable}; use prost::Message; use rand::RngCore; +use tracing::{instrument, Instrument}; use vss_client::client::VssClient; use vss_client::error::VssError; use vss_client::headers::{FixedHeaders, LnurlAuthToJwtProvider, VssHeaderProvider}; @@ -42,7 +43,8 @@ use vss_client::util::retry::{ use vss_client::util::storable_builder::{EntropySource, StorableBuilder}; use crate::entropy::NodeEntropy; -use crate::io::utils::check_namespace_key_validity; +use crate::io::utils::{check_namespace_key_validity, classify_entity}; +use crate::tracing::TracingHeaderProvider; type CustomRetryPolicy = FilteredRetryPolicy< JitteredRetryPolicy< @@ -135,6 +137,7 @@ impl VssStore { })?; let async_retry_policy = retry_policy(); + let header_provider = Arc::new(TracingHeaderProvider::new(header_provider)); let async_client = VssClient::new_with_headers(base_url, async_retry_policy, header_provider); @@ -184,9 +187,12 @@ impl VssStore { } impl KVStoreSync for VssStore { + #[instrument(name = "vss.sync.read", skip_all)] fn read( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, ) -> io::Result> { + let parent_span = tracing::Span::current(); + let internal_runtime = self.internal_runtime.as_ref().ok_or_else(|| { debug_assert!(false, "Failed to access internal runtime"); let msg = format!("Failed to access internal runtime"); @@ -199,14 +205,18 @@ impl KVStoreSync for VssStore { let fut = async move { inner .read_internal(&inner.blocking_client, primary_namespace, secondary_namespace, key) + .instrument(parent_span) .await }; tokio::task::block_in_place(move || internal_runtime.block_on(fut)) } + #[instrument(name = "vss.sync.write", skip_all)] fn write( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, ) -> io::Result<()> { + let parent_span = tracing::Span::current(); + let internal_runtime = self.internal_runtime.as_ref().ok_or_else(|| { debug_assert!(false, "Failed to access internal runtime"); let msg = format!("Failed to access internal runtime"); @@ -230,14 +240,18 @@ impl KVStoreSync for VssStore { key, buf, ) + .instrument(parent_span) .await }; tokio::task::block_in_place(move || internal_runtime.block_on(fut)) } + #[instrument(name = "vss.sync.remove", skip_all)] fn remove( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, ) -> io::Result<()> { + let parent_span = tracing::Span::current(); + let internal_runtime = self.internal_runtime.as_ref().ok_or_else(|| { debug_assert!(false, "Failed to access internal runtime"); let msg = format!("Failed to access internal runtime"); @@ -260,6 +274,7 @@ impl KVStoreSync for VssStore { secondary_namespace, key, ) + .instrument(parent_span) .await }; if lazy { @@ -270,7 +285,10 @@ impl KVStoreSync for VssStore { } } + #[instrument(name = "vss.sync.list", skip_all)] fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result> { + let parent_span = tracing::Span::current(); + let internal_runtime = self.internal_runtime.as_ref().ok_or_else(|| { debug_assert!(false, "Failed to access internal runtime"); let msg = format!("Failed to access internal runtime"); @@ -282,6 +300,7 @@ impl KVStoreSync for VssStore { let fut = async move { inner .list_internal(&inner.blocking_client, primary_namespace, secondary_namespace) + .instrument(parent_span) .await }; tokio::task::block_in_place(move || internal_runtime.block_on(fut)) @@ -289,6 +308,7 @@ impl KVStoreSync for VssStore { } impl KVStore for VssStore { + #[instrument(name = "vss.async.read", skip_all)] fn read( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, ) -> impl Future, io::Error>> + 'static + Send { @@ -302,6 +322,8 @@ impl KVStore for VssStore { .await } } + + #[instrument(name = "vss.async.write", skip_all)] fn write( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, ) -> impl Future> + 'static + Send { @@ -326,6 +348,8 @@ impl KVStore for VssStore { .await } } + + #[instrument(name = "vss.async.remove", skip_all)] fn remove( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, ) -> impl Future> + 'static + Send { @@ -357,6 +381,8 @@ impl KVStore for VssStore { } } } + + #[instrument(name = "vss.async.list", skip_all)] fn list( &self, primary_namespace: &str, secondary_namespace: &str, ) -> impl Future, io::Error>> + 'static + Send { @@ -464,6 +490,7 @@ impl VssStoreInner { } } + #[instrument(name = "vss.list_all_keys", skip(self, client), err)] async fn list_all_keys( &self, client: &VssClient, primary_namespace: &str, secondary_namespace: &str, @@ -495,6 +522,7 @@ impl VssStoreInner { Ok(keys) } + #[instrument(name = "vss.read_internal", skip(self, client), err)] async fn read_internal( &self, client: &VssClient, primary_namespace: String, secondary_namespace: String, key: String, @@ -531,6 +559,15 @@ impl VssStoreInner { Ok(decrypted) } + #[instrument( + name = "vss.write_internal", + skip(self, client, buf, inner_lock_ref), + fields( + vss.payload_size_bytes = buf.len(), + vss.entity = classify_entity(&primary_namespace, &secondary_namespace, &key) + ), + err + )] async fn write_internal( &self, client: &VssClient, inner_lock_ref: Arc>, locking_key: String, version: u64, primary_namespace: String, secondary_namespace: String, @@ -575,6 +612,7 @@ impl VssStoreInner { .await } + #[instrument(name = "vss.remove_internal", skip(self, client, inner_lock_ref), err)] async fn remove_internal( &self, client: &VssClient, inner_lock_ref: Arc>, locking_key: String, version: u64, primary_namespace: String, secondary_namespace: String, @@ -608,6 +646,7 @@ impl VssStoreInner { .await } + #[instrument(name = "vss.list_internal", skip(self, client), err)] async fn list_internal( &self, client: &VssClient, primary_namespace: String, secondary_namespace: String, @@ -701,6 +740,7 @@ fn retry_policy() -> CustomRetryPolicy { }) as _) } +#[instrument(name = "vss.determine_and_write_schema_version", skip(client, key_obfuscator), err)] async fn determine_and_write_schema_version( client: &VssClient, store_id: &String, data_encryption_key: [u8; 32], key_obfuscator: &KeyObfuscator, diff --git a/src/lib.rs b/src/lib.rs index fdaa0f4f1..72f6dbc8d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -102,6 +102,7 @@ pub mod payment; mod peer_store; mod runtime; mod scoring; +mod tracing; mod tx_broadcaster; mod types; mod wallet; @@ -111,6 +112,7 @@ use std::net::ToSocketAddrs; use std::sync::{Arc, Mutex, RwLock}; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; +pub use crate::tracing::{configure_tracer, TracingLogWriter}; pub use balance::{BalanceDetails, LightningBalance, PendingSweepBalance}; use bitcoin::secp256k1::PublicKey; use bitcoin::{Address, Amount}; diff --git a/src/payment/bolt11.rs b/src/payment/bolt11.rs index 60c313381..06db88753 100644 --- a/src/payment/bolt11.rs +++ b/src/payment/bolt11.rs @@ -21,6 +21,7 @@ use lightning_invoice::{ Bolt11Invoice as LdkBolt11Invoice, Bolt11InvoiceDescription as LdkBolt11InvoiceDescription, }; use lightning_types::payment::{PaymentHash, PaymentPreimage}; +use tracing::instrument; use crate::config::{Config, LDK_PAYMENT_RETRY_TIMEOUT}; use crate::connection::ConnectionManager; @@ -90,10 +91,12 @@ impl Bolt11Payment { /// /// If `route_parameters` are provided they will override the default as well as the /// node-wide parameters configured via [`Config::route_parameters`] on a per-field basis. + #[instrument(skip(self), ret)] pub fn send( &self, invoice: &Bolt11Invoice, route_parameters: Option, ) -> Result { if !*self.is_running.read().unwrap() { + log_error!(self.logger, "Payment send failed. Handler not running."); return Err(Error::NotRunning); } diff --git a/src/tracing.rs b/src/tracing.rs new file mode 100644 index 000000000..c99326e47 --- /dev/null +++ b/src/tracing.rs @@ -0,0 +1,125 @@ +use std::pin::Pin; +use std::sync::Arc; +use std::{collections::HashMap, future::Future}; + +use opentelemetry::propagation::Injector; +use opentelemetry::propagation::TextMapPropagator; +use opentelemetry::trace::TracerProvider; +use opentelemetry_otlp::{SpanExporter as OtlpExporter, WithExportConfig}; +use opentelemetry_sdk::{propagation::TraceContextPropagator, trace::SdkTracerProvider, Resource}; +use opentelemetry_stdout::SpanExporter as StdoutExporter; + +use tracing::{debug, error, info, level_filters::LevelFilter, trace, warn}; +use tracing_opentelemetry::{OpenTelemetryLayer, OpenTelemetrySpanExt}; +use tracing_subscriber::{filter::Targets, fmt, layer::SubscriberExt, util::SubscriberInitExt}; +use vss_client::headers::{VssHeaderProvider, VssHeaderProviderError}; + +use crate::logger::{LogRecord, LogWriter}; + +/// Adapter that bridges our `Logger` to the `tracing` ecosystem. +/// +/// This allows existing `log_*!` macros to emit events as tracing spans/events +/// without requiring a migration to tracing-specific macros. +pub struct TracingLogWriter {} + +impl LogWriter for TracingLogWriter { + fn log(&self, record: LogRecord) { + match record.level { + lightning::util::logger::Level::Gossip => { + trace!(target: "ldk_node", module = record.module_path, line = record.line, "{}", record.args) + }, + lightning::util::logger::Level::Trace => { + trace!(target: "ldk_node", module = record.module_path, line = record.line, "{}", record.args) + }, + lightning::util::logger::Level::Debug => { + debug!(target: "ldk_node", module = record.module_path, line = record.line, "{}", record.args) + }, + lightning::util::logger::Level::Info => { + info!(target: "ldk_node", module = record.module_path, line = record.line, "{}", record.args) + }, + lightning::util::logger::Level::Warn => { + warn!(target: "ldk_node", module = record.module_path, line = record.line, "{}", record.args) + }, + lightning::util::logger::Level::Error => { + error!(target: "ldk_node", module = record.module_path, line = record.line, "{}", record.args) + }, + } + } +} + +pub(crate) struct TracingHeaderProvider { + inner: Arc, + propagator: TraceContextPropagator, +} + +impl TracingHeaderProvider { + pub fn new(inner: Arc) -> Self { + Self { inner, propagator: TraceContextPropagator::new() } + } +} + +impl VssHeaderProvider for TracingHeaderProvider { + fn get_headers<'life0, 'life1, 'async_trait>( + &'life0 self, request: &'life1 [u8], + ) -> Pin< + Box< + dyn Future, VssHeaderProviderError>> + + Send + + 'async_trait, + >, + > + where + 'life0: 'async_trait, + 'life1: 'async_trait, + Self: 'async_trait, + { + let inner = Arc::clone(&self.inner); + let request = request.to_vec(); + let propagator = self.propagator.clone(); + + Box::pin(async move { + let mut headers = inner.get_headers(&request).await?; + + let cx = tracing::Span::current().context(); + propagator.inject_context(&cx, &mut HeaderInjector(&mut headers)); + + Ok(headers) + }) + } +} + +struct HeaderInjector<'a>(&'a mut HashMap); + +impl Injector for HeaderInjector<'_> { + fn set(&mut self, key: &str, value: String) { + self.0.insert(key.to_string(), value); + } +} + +/// Initialize tracing subscriber for Jaeger and `stdout` backends. +pub fn configure_tracer() { + let otlp_jaeger_exporter = OtlpExporter::builder() + .with_tonic() + .with_endpoint("http://localhost:4317") + .build() + .expect("Failed to create OTLP exporter"); + let stdout_exporter = StdoutExporter::default(); + + let tracer_provider = SdkTracerProvider::builder() + .with_batch_exporter(otlp_jaeger_exporter) + .with_batch_exporter(stdout_exporter) + .with_resource(Resource::builder().with_service_name("ldk-node").build()) + .build(); + + let tracer = tracer_provider.tracer("ldk_node"); + + tracing_subscriber::registry() + .with( + Targets::new() + .with_default(LevelFilter::WARN) + .with_target("ldk_node", LevelFilter::INFO), + ) + .with(fmt::layer().json()) + .with(OpenTelemetryLayer::new(tracer)) + .init(); +} diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index 892afedcc..4cc9e97df 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -31,7 +31,7 @@ use ldk_node::payment::{ ConfirmationStatus, PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus, QrPaymentResult, }; -use ldk_node::{Builder, Event, NodeError}; +use ldk_node::{Builder, Event, NodeError, TracingLogWriter}; use lightning::ln::channelmanager::PaymentId; use lightning::routing::gossip::{NodeAlias, NodeId}; use lightning::routing::router::RouteParametersConfig; @@ -151,6 +151,8 @@ async fn channel_open_fails_when_funds_insufficient() { async fn multi_hop_sending() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let esplora_url = format!("http://{}", electrsd.esplora_url.as_ref().unwrap()); + configure_tracer(); + let tracing_writer = Arc::new(TracingLogWriter {}); // Setup and fund 5 nodes let mut nodes = Vec::new(); @@ -159,6 +161,7 @@ async fn multi_hop_sending() { let sync_config = EsploraSyncConfig { background_sync_config: None }; setup_builder!(builder, config.node_config); builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config)); + builder.set_custom_logger(tracing_writer); let node = builder.build(config.node_entropy.into()).unwrap(); node.start().unwrap(); nodes.push(node); diff --git a/tests/integration_tests_vss.rs b/tests/integration_tests_vss.rs index 54912b358..30df41c70 100644 --- a/tests/integration_tests_vss.rs +++ b/tests/integration_tests_vss.rs @@ -10,9 +10,11 @@ mod common; use std::collections::HashMap; +use std::sync::Arc; use ldk_node::entropy::NodeEntropy; use ldk_node::Builder; +use ldk_node::{configure_tracer, TracingLogWriter}; use rand::{rng, Rng}; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] @@ -20,9 +22,13 @@ async fn channel_full_cycle_with_vss_store() { let (bitcoind, electrsd) = common::setup_bitcoind_and_electrsd(); println!("== Node A =="); let esplora_url = format!("http://{}", electrsd.esplora_url.as_ref().unwrap()); + + configure_tracer(); + let tracing_writer = Arc::new(TracingLogWriter {}); let config_a = common::random_config(true); let mut builder_a = Builder::from_config(config_a.node_config); builder_a.set_chain_source_esplora(esplora_url.clone(), None); + builder_a.set_custom_logger(tracing_writer); let vss_base_url = std::env::var("TEST_VSS_BASE_URL").unwrap(); let node_a = builder_a .build_with_vss_store_and_fixed_headers(