Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,14 @@ serde = { version = "1.0.210", default-features = false, features = ["std", "der
serde_json = { version = "1.0.128", default-features = false, features = ["std"] }
log = { version = "0.4.22", default-features = false, features = ["std"]}

tracing ={ version = "0.1.43" , default-features = false, features = ["log", "attributes"]}
tracing-subscriber = {version = "0.3.22", default-features = false, features = ["std", "fmt", "json"]}
tracing-opentelemetry ={ version = "0.32.0", default-features = false}
opentelemetry ={ version = "0.31.0", default-features = false}
opentelemetry_sdk ={ version = "0.31.0", default-features = false}
opentelemetry-otlp ={ version = "0.31.0", default-features = false, features = ["trace", "grpc-tonic"]}
opentelemetry-stdout = {version = "0.31.0"}

vss-client = { package = "vss-client-ng", version = "0.4" }
prost = { version = "0.11.6", default-features = false}

Expand Down
10 changes: 10 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,16 @@ services:
networks:
- bitcoin-electrs

jaeger:
image: jaegertracing/all-in-one:1.54
container_name: jaeger
ports:
- "6831:6831/udp"
- "16686:16686"
- "14268:14268"
- "4317:4317"
- "4318:4318"

networks:
bitcoin-electrs:
driver: bridge
50 changes: 50 additions & 0 deletions src/io/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,56 @@ pub(crate) fn check_namespace_key_validity(
Ok(())
}

pub(crate) fn classify_entity(
primary_namespace: &str, secondary_namespace: &str, key: &str,
) -> &'static str {
if !primary_namespace.is_empty() {
return match primary_namespace {
// LDK-Node
"payments" => "payment_info",
"bdk_wallet" => match key {
"descriptor" => "bdk_wallet.descriptor",
"change_descriptor" => "bdk_wallet.change_descriptor",
"network" => "bdk_wallet.network",
"local_chain" => "bdk_wallet.local_chain",
"tx_graph" => "bdk_wallet.tx_graph",
"indexer" => "bdk_wallet.indexer",
_ => "bdk_wallet.unknown",
},
"static_invoices" => "static_invoice",

// LDK
"monitors" => "channel_monitor",
"monitor_updates" => "channel_monitor_update",
"archived_monitors" => "archived_channel_monitor",

_ => "unknown",
};
}

debug_assert!(
secondary_namespace.is_empty(),
"Unexpected: primary empty but secondary populated: '{}'",
secondary_namespace
);

match key {
// LDK-Node
"events" => "event_queue",
"peers" => "peer_info",
"node_metrics" => "node_metrics",

// LDK
"manager" => "channel_manager",
"network_graph" => "network_graph",
"scorer" => "scorer",
"external_pathfinding_scores_cache" => "external_pathfinding_scores",
"output_sweeper" => "output_sweeper",

_ => "unknown",
}
}

macro_rules! impl_read_write_change_set_type {
(
$read_name:ident,
Expand Down
42 changes: 41 additions & 1 deletion src/io/vss_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use lightning::util::persist::{KVStore, KVStoreSync};
use lightning::util::ser::{Readable, Writeable};
use prost::Message;
use rand::RngCore;
use tracing::{instrument, Instrument};
use vss_client::client::VssClient;
use vss_client::error::VssError;
use vss_client::headers::{FixedHeaders, LnurlAuthToJwtProvider, VssHeaderProvider};
Expand All @@ -42,7 +43,8 @@ use vss_client::util::retry::{
use vss_client::util::storable_builder::{EntropySource, StorableBuilder};

use crate::entropy::NodeEntropy;
use crate::io::utils::check_namespace_key_validity;
use crate::io::utils::{check_namespace_key_validity, classify_entity};
use crate::tracing::TracingHeaderProvider;

type CustomRetryPolicy = FilteredRetryPolicy<
JitteredRetryPolicy<
Expand Down Expand Up @@ -135,6 +137,7 @@ impl VssStore {
})?;

let async_retry_policy = retry_policy();
let header_provider = Arc::new(TracingHeaderProvider::new(header_provider));
let async_client =
VssClient::new_with_headers(base_url, async_retry_policy, header_provider);

Expand Down Expand Up @@ -184,9 +187,12 @@ impl VssStore {
}

impl KVStoreSync for VssStore {
#[instrument(name = "vss.sync.read", skip_all)]
fn read(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
) -> io::Result<Vec<u8>> {
let parent_span = tracing::Span::current();

let internal_runtime = self.internal_runtime.as_ref().ok_or_else(|| {
debug_assert!(false, "Failed to access internal runtime");
let msg = format!("Failed to access internal runtime");
Expand All @@ -199,14 +205,18 @@ impl KVStoreSync for VssStore {
let fut = async move {
inner
.read_internal(&inner.blocking_client, primary_namespace, secondary_namespace, key)
.instrument(parent_span)
.await
};
tokio::task::block_in_place(move || internal_runtime.block_on(fut))
}

#[instrument(name = "vss.sync.write", skip_all)]
fn write(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
) -> io::Result<()> {
let parent_span = tracing::Span::current();

let internal_runtime = self.internal_runtime.as_ref().ok_or_else(|| {
debug_assert!(false, "Failed to access internal runtime");
let msg = format!("Failed to access internal runtime");
Expand All @@ -230,14 +240,18 @@ impl KVStoreSync for VssStore {
key,
buf,
)
.instrument(parent_span)
.await
};
tokio::task::block_in_place(move || internal_runtime.block_on(fut))
}

#[instrument(name = "vss.sync.remove", skip_all)]
fn remove(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
) -> io::Result<()> {
let parent_span = tracing::Span::current();

let internal_runtime = self.internal_runtime.as_ref().ok_or_else(|| {
debug_assert!(false, "Failed to access internal runtime");
let msg = format!("Failed to access internal runtime");
Expand All @@ -260,6 +274,7 @@ impl KVStoreSync for VssStore {
secondary_namespace,
key,
)
.instrument(parent_span)
.await
};
if lazy {
Expand All @@ -270,7 +285,10 @@ impl KVStoreSync for VssStore {
}
}

#[instrument(name = "vss.sync.list", skip_all)]
fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> {
let parent_span = tracing::Span::current();

let internal_runtime = self.internal_runtime.as_ref().ok_or_else(|| {
debug_assert!(false, "Failed to access internal runtime");
let msg = format!("Failed to access internal runtime");
Expand All @@ -282,13 +300,15 @@ impl KVStoreSync for VssStore {
let fut = async move {
inner
.list_internal(&inner.blocking_client, primary_namespace, secondary_namespace)
.instrument(parent_span)
.await
};
tokio::task::block_in_place(move || internal_runtime.block_on(fut))
}
}

impl KVStore for VssStore {
#[instrument(name = "vss.async.read", skip_all)]
fn read(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
) -> impl Future<Output = Result<Vec<u8>, io::Error>> + 'static + Send {
Expand All @@ -302,6 +322,8 @@ impl KVStore for VssStore {
.await
}
}

#[instrument(name = "vss.async.write", skip_all)]
fn write(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
) -> impl Future<Output = Result<(), io::Error>> + 'static + Send {
Expand All @@ -326,6 +348,8 @@ impl KVStore for VssStore {
.await
}
}

#[instrument(name = "vss.async.remove", skip_all)]
fn remove(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
) -> impl Future<Output = Result<(), io::Error>> + 'static + Send {
Expand Down Expand Up @@ -357,6 +381,8 @@ impl KVStore for VssStore {
}
}
}

#[instrument(name = "vss.async.list", skip_all)]
fn list(
&self, primary_namespace: &str, secondary_namespace: &str,
) -> impl Future<Output = Result<Vec<String>, io::Error>> + 'static + Send {
Expand Down Expand Up @@ -464,6 +490,7 @@ impl VssStoreInner {
}
}

#[instrument(name = "vss.list_all_keys", skip(self, client), err)]
async fn list_all_keys(
&self, client: &VssClient<CustomRetryPolicy>, primary_namespace: &str,
secondary_namespace: &str,
Expand Down Expand Up @@ -495,6 +522,7 @@ impl VssStoreInner {
Ok(keys)
}

#[instrument(name = "vss.read_internal", skip(self, client), err)]
async fn read_internal(
&self, client: &VssClient<CustomRetryPolicy>, primary_namespace: String,
secondary_namespace: String, key: String,
Expand Down Expand Up @@ -531,6 +559,15 @@ impl VssStoreInner {
Ok(decrypted)
}

#[instrument(
name = "vss.write_internal",
skip(self, client, buf, inner_lock_ref),
fields(
vss.payload_size_bytes = buf.len(),
vss.entity = classify_entity(&primary_namespace, &secondary_namespace, &key)
),
err
)]
async fn write_internal(
&self, client: &VssClient<CustomRetryPolicy>, inner_lock_ref: Arc<tokio::sync::Mutex<u64>>,
locking_key: String, version: u64, primary_namespace: String, secondary_namespace: String,
Expand Down Expand Up @@ -575,6 +612,7 @@ impl VssStoreInner {
.await
}

#[instrument(name = "vss.remove_internal", skip(self, client, inner_lock_ref), err)]
async fn remove_internal(
&self, client: &VssClient<CustomRetryPolicy>, inner_lock_ref: Arc<tokio::sync::Mutex<u64>>,
locking_key: String, version: u64, primary_namespace: String, secondary_namespace: String,
Expand Down Expand Up @@ -608,6 +646,7 @@ impl VssStoreInner {
.await
}

#[instrument(name = "vss.list_internal", skip(self, client), err)]
async fn list_internal(
&self, client: &VssClient<CustomRetryPolicy>, primary_namespace: String,
secondary_namespace: String,
Expand Down Expand Up @@ -701,6 +740,7 @@ fn retry_policy() -> CustomRetryPolicy {
}) as _)
}

#[instrument(name = "vss.determine_and_write_schema_version", skip(client, key_obfuscator), err)]
async fn determine_and_write_schema_version(
client: &VssClient<CustomRetryPolicy>, store_id: &String, data_encryption_key: [u8; 32],
key_obfuscator: &KeyObfuscator,
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ pub mod payment;
mod peer_store;
mod runtime;
mod scoring;
mod tracing;
mod tx_broadcaster;
mod types;
mod wallet;
Expand All @@ -111,6 +112,7 @@ use std::net::ToSocketAddrs;
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

pub use crate::tracing::{configure_tracer, TracingLogWriter};
pub use balance::{BalanceDetails, LightningBalance, PendingSweepBalance};
use bitcoin::secp256k1::PublicKey;
use bitcoin::{Address, Amount};
Expand Down
3 changes: 3 additions & 0 deletions src/payment/bolt11.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use lightning_invoice::{
Bolt11Invoice as LdkBolt11Invoice, Bolt11InvoiceDescription as LdkBolt11InvoiceDescription,
};
use lightning_types::payment::{PaymentHash, PaymentPreimage};
use tracing::instrument;

use crate::config::{Config, LDK_PAYMENT_RETRY_TIMEOUT};
use crate::connection::ConnectionManager;
Expand Down Expand Up @@ -90,10 +91,12 @@ impl Bolt11Payment {
///
/// If `route_parameters` are provided they will override the default as well as the
/// node-wide parameters configured via [`Config::route_parameters`] on a per-field basis.
#[instrument(skip(self), ret)]
pub fn send(
&self, invoice: &Bolt11Invoice, route_parameters: Option<RouteParametersConfig>,
) -> Result<PaymentId, Error> {
if !*self.is_running.read().unwrap() {
log_error!(self.logger, "Payment send failed. Handler not running.");
return Err(Error::NotRunning);
}

Expand Down
Loading
Loading