diff --git a/rust/docker-compose.yml b/rust/docker-compose.yml new file mode 100644 index 0000000..938bd1f --- /dev/null +++ b/rust/docker-compose.yml @@ -0,0 +1,32 @@ +version: '3.8' +services: + postgres: + image: postgres:15 + environment: + POSTGRES_DB: postgres + POSTGRES_USER: postgres + POSTGRES_PASSWORD: postgres + volumes: + - postgres-data:/var/lib/postgresql/data + - ./impls/src/postgres/sql/v0_create_vss_db.sql:/docker-entrypoint-initdb.d/init.sql + ports: + - "5432:5432" + networks: + - app-network + + jaeger: + image: jaegertracing/all-in-one:1.54 + container_name: jaeger + ports: + - "6831:6831/udp" + - "16686:16686" + - "14268:14268" + - "4317:4317" + - "4318:4318" + +volumes: + postgres-data: + +networks: + app-network: + driver: bridge diff --git a/rust/server/Cargo.toml b/rust/server/Cargo.toml index 2a0e6f1..f3b00da 100644 --- a/rust/server/Cargo.toml +++ b/rust/server/Cargo.toml @@ -15,3 +15,11 @@ prost = { version = "0.11.6", default-features = false, features = ["std"] } bytes = "1.4.0" serde = { version = "1.0.203", default-features = false, features = ["derive"] } toml = { version = "0.8.9", default-features = false, features = ["parse"] } + +tracing ={ version = "0.1.43" , default-features = false, features = ["log", "attributes"]} +tracing-subscriber = {version = "0.3.22", default-features = false, features = ["std", "fmt", "json"]} +tracing-opentelemetry ={ version = "0.32.0", default-features = false} +opentelemetry ={ version = "0.31.0", default-features = false} +opentelemetry_sdk ={ version = "0.31.0", default-features = false} +opentelemetry-otlp ={ version = "0.31.0", default-features = false, features = ["trace", "grpc-tonic"]} +opentelemetry-stdout = {version = "0.31.0"} \ No newline at end of file diff --git a/rust/server/src/main.rs b/rust/server/src/main.rs index 38fdccd..668c492 100644 --- a/rust/server/src/main.rs +++ b/rust/server/src/main.rs @@ -17,12 +17,14 @@ use tokio::signal::unix::SignalKind; use hyper::server::conn::http1; use hyper_util::rt::TokioIo; +use crate::tracing::configure_tracer; use crate::vss_service::VssService; use api::auth::{Authorizer, NoopAuthorizer}; use api::kv_store::KvStore; use impls::postgres_store::{Certificate, PostgresPlaintextBackend, PostgresTlsBackend}; use std::sync::Arc; +pub(crate) mod tracing; pub(crate) mod util; pub(crate) mod vss_service; @@ -59,6 +61,7 @@ fn main() { }; runtime.block_on(async { + configure_tracer(); let mut sigterm_stream = match tokio::signal::unix::signal(SignalKind::terminate()) { Ok(stream) => stream, Err(e) => { diff --git a/rust/server/src/tracing.rs b/rust/server/src/tracing.rs new file mode 100644 index 0000000..68032f9 --- /dev/null +++ b/rust/server/src/tracing.rs @@ -0,0 +1,59 @@ +use std::collections::HashMap; + +use opentelemetry::propagation::Extractor; +use opentelemetry::propagation::TextMapPropagator; +use opentelemetry::trace::TracerProvider; +use opentelemetry_otlp::{SpanExporter as OtlpExporter, WithExportConfig}; +use opentelemetry_sdk::propagation::TraceContextPropagator; +use opentelemetry_sdk::{trace::SdkTracerProvider, Resource}; +use opentelemetry_stdout::SpanExporter as StdoutExporter; + +use tracing::level_filters::LevelFilter; +use tracing_opentelemetry::OpenTelemetryLayer; +use tracing_subscriber::util::SubscriberInitExt; +use tracing_subscriber::{filter::Targets, fmt, layer::SubscriberExt}; + +pub struct HeaderExtractor<'a>(&'a HashMap); + +impl Extractor for HeaderExtractor<'_> { + fn get(&self, key: &str) -> Option<&str> { + self.0.get(key).map(|s| s.as_str()) + } + + fn keys(&self) -> Vec<&str> { + self.0.keys().map(|k| k.as_str()).collect() + } +} + +pub fn extract_context(headers: &HashMap) -> opentelemetry::Context { + let propagator = TraceContextPropagator::new(); + propagator.extract(&HeaderExtractor(headers)) +} + +/// Initialize tracing subscriber for Jaeger and `stdout` backends. +pub fn configure_tracer() { + let otlp_jaeger_exporter = OtlpExporter::builder() + .with_tonic() + .with_endpoint("http://localhost:4317") + .build() + .expect("Failed to create OTLP exporter"); + let stdout_exporter = StdoutExporter::default(); + + let tracer_provider = SdkTracerProvider::builder() + .with_batch_exporter(otlp_jaeger_exporter) + .with_batch_exporter(stdout_exporter) + .with_resource(Resource::builder().with_service_name("vss_server").build()) + .build(); + + let tracer = tracer_provider.tracer("vss_server"); + + tracing_subscriber::registry() + .with( + Targets::new() + .with_default(LevelFilter::WARN) + .with_target("vss_server", LevelFilter::INFO), + ) + .with(fmt::layer().json()) + .with(OpenTelemetryLayer::new(tracer)) + .init(); +} diff --git a/rust/server/src/vss_service.rs b/rust/server/src/vss_service.rs index 811bbbd..25197c4 100644 --- a/rust/server/src/vss_service.rs +++ b/rust/server/src/vss_service.rs @@ -3,6 +3,8 @@ use hyper::body::{Bytes, Incoming}; use hyper::service::Service; use hyper::{Request, Response, StatusCode}; use std::collections::HashMap; +use tracing::Instrument; +use tracing_opentelemetry::OpenTelemetrySpanExt; use prost::Message; @@ -18,6 +20,8 @@ use std::future::Future; use std::pin::Pin; use std::sync::Arc; +use crate::tracing::extract_context; + #[derive(Clone)] pub struct VssService { store: Arc, @@ -90,6 +94,7 @@ async fn handle_list_object_request( ) -> Result { store.list_key_versions(user_token, request).await } + async fn handle_request< T: Message + Default, R: Message, @@ -106,26 +111,67 @@ async fn handle_request< .map(|(k, v)| (k.as_str().to_string(), v.to_str().unwrap_or_default().to_string())) .collect::>(); + let parent_cx = extract_context(&headers_map); + let (server_address, server_port) = parts + .headers + .get("host") + .and_then(|v| v.to_str().ok()) + .map(|h| { + let mut split = h.splitn(2, ':'); + let addr = split.next().map(|s| s.to_string()); + let port = split.next().and_then(|p| p.parse::().ok()); + (addr, port) + }) + .unwrap_or((None, None)); + let span = tracing::info_span!( + "vss.server.request", + request_type = std::any::type_name::().split("::").last().unwrap_or("unknown"), + http.request.method = %parts.method, + http.route = %parts.uri.path(), + http.status_code = tracing::field::Empty, + server.address = server_address, + server.port = server_port, + ); + let _ = span.set_parent(parent_cx); + let user_token = match authorizer.verify(&headers_map).await { Ok(auth_response) => auth_response.user_token, Err(e) => return Ok(build_error_response(e)), }; - // TODO: we should bound the amount of data we read to avoid allocating too much memory. - let bytes = body.collect().await?.to_bytes(); - match T::decode(bytes) { - Ok(request) => match handler(store.clone(), user_token, request).await { - Ok(response) => Ok(Response::builder() - .body(Full::new(Bytes::from(response.encode_to_vec()))) - // unwrap safety: body only errors when previous chained calls failed. - .unwrap()), - Err(e) => Ok(build_error_response(e)), - }, - Err(_) => Ok(Response::builder() - .status(StatusCode::BAD_REQUEST) - .body(Full::new(Bytes::from(b"Error parsing request".to_vec()))) - // unwrap safety: body only errors when previous chained calls failed. - .unwrap()), + + async move { + // TODO: we should bound the amount of data we read to avoid allocating too much memory. + let bytes = body.collect().await?.to_bytes(); + tracing::info!(payload_size = bytes.len()); + match T::decode(bytes) { + Ok(request) => match handler(store.clone(), user_token, request).await { + Ok(response) => { + let status = StatusCode::OK; + tracing::Span::current().record("http.status_code", status.as_u16()); + Ok(Response::builder() + .body(Full::new(Bytes::from(response.encode_to_vec()))) + // unwrap safety: body only errors when previous chained calls failed. + .unwrap()) + }, + Err(e) => { + let response = build_error_response(e); + tracing::Span::current().record("http.status_code", response.status().as_u16()); + Ok(response) + }, + }, + Err(_) => { + let status_code = StatusCode::BAD_REQUEST; + tracing::Span::current().record("http.status_code", status_code.as_u16()); + Ok(Response::builder() + .status(status_code) + .body(Full::new(Bytes::from(b"Error parsing request".to_vec()))) + // unwrap safety: body only errors when previous chained calls failed. + .unwrap()) + }, + } } + .instrument(span) + .await } fn build_error_response(e: VssError) -> Response> {