Skip to content

Commit 581b5ca

Browse files
committed
refactor: improve tracing span usage
1 parent 44b90d9 commit 581b5ca

File tree

8 files changed

+84
-30
lines changed

8 files changed

+84
-30
lines changed

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ edition = "2021"
1010
rust-version = "1.81"
1111
authors = ["init4", "James Prestwich"]
1212
license = "MIT OR Apache-2.0"
13-
homepage = "https://github.com/init4tech/rpc"
14-
repository = "https://github.com/init4tech/rpc"
13+
homepage = "https://github.com/init4tech/ajj"
14+
repository = "https://github.com/init4tech/ajj"
1515

1616
[dependencies]
1717
bytes = "1.9.0"

src/pubsub/shared.rs

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use serde_json::value::RawValue;
88
use tokio::{pin, select, sync::mpsc, task::JoinHandle};
99
use tokio_stream::StreamExt;
1010
use tokio_util::sync::WaitForCancellationFutureOwned;
11-
use tracing::{debug, debug_span, error, instrument, trace, Instrument};
11+
use tracing::{debug, debug_span, error, trace, Instrument};
1212

1313
/// Default notification buffer size per task.
1414
pub const DEFAULT_NOTIFICATION_BUFFER_PER_CLIENT: usize = 16;
@@ -163,8 +163,7 @@ where
163163
/// and routes them to the router. For each request, a new task is spawned
164164
/// to handle the request, and given a sender to the [`WriteTask`]. This
165165
/// ensures that requests can be handled concurrently.
166-
#[instrument(name = "RouteTask", skip(self), fields(conn_id = self.conn_id))]
167-
pub async fn task_future(self, cancel: WaitForCancellationFutureOwned) {
166+
pub(crate) async fn task_future(self, cancel: WaitForCancellationFutureOwned) {
168167
let RouteTask {
169168
router,
170169
mut requests,
@@ -200,27 +199,24 @@ where
200199
// enforces the specification.
201200
let reqs = InboundData::try_from(item).unwrap_or_default();
202201

203-
let span = debug_span!("pubsub request handling", reqs = reqs.len());
204-
205-
let ctx =
206-
HandlerCtx::new(
207-
Some(write_task.clone()),
208-
children.clone(),
209-
);
210-
211-
let fut = router.handle_request_batch(ctx, reqs);
212-
let write_task = write_task.clone();
213-
214202
// Acquiring the permit before spawning the task means that
215203
// the write task can backpressure the route task. I.e.
216204
// if the client stops accepting responses, we do not keep
217205
// handling inbound requests.
218-
let Ok(permit) = write_task.reserve_owned().await else {
206+
let Ok(permit) = write_task.clone().reserve_owned().await else {
219207
tracing::error!("write task dropped while waiting for permit");
220208
break;
221209
};
222210

211+
let ctx =
212+
HandlerCtx::new(
213+
Some(write_task.clone()),
214+
children.clone(),
215+
);
216+
223217
// Run the future in a new task.
218+
let fut = router.handle_request_batch(ctx, reqs);
219+
224220
children.spawn_cancellable(
225221
async move {
226222
// Send the response to the write task.
@@ -232,7 +228,6 @@ where
232228
);
233229
}
234230
}
235-
.instrument(span)
236231
);
237232
}
238233
}
@@ -277,7 +272,6 @@ impl<T: Listener> WriteTask<T> {
277272
/// [`ServerShutdown`] struct.
278273
///
279274
/// [`ServerShutdown`]: crate::pubsub::ServerShutdown
280-
#[instrument(skip(self), fields(conn_id = self.conn_id))]
281275
pub(crate) async fn task_future(self) {
282276
let WriteTask {
283277
tasks,
@@ -299,8 +293,9 @@ impl<T: Listener> WriteTask<T> {
299293
tracing::error!("Json stream has closed");
300294
break;
301295
};
302-
if let Err(err) = connection.send_json(json).await {
303-
debug!(%err, "Failed to send json");
296+
let span = debug_span!("WriteTask", conn_id = self.conn_id);
297+
if let Err(err) = connection.send_json(json).instrument(span).await {
298+
debug!(%err, conn_id = self.conn_id, "Failed to send json");
304299
break;
305300
}
306301
}

src/router.rs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use core::fmt;
1010
use serde_json::value::RawValue;
1111
use std::{borrow::Cow, collections::BTreeMap, convert::Infallible, sync::Arc, task::Poll};
1212
use tower::Service;
13-
use tracing::{debug_span, trace};
13+
use tracing::debug_span;
1414

1515
/// A JSON-RPC router. This is the top-level type for handling JSON-RPC
1616
/// requests. It is heavily inspired by the [`axum::Router`] type.
@@ -301,9 +301,7 @@ where
301301
pub fn call_with_state(&self, args: HandlerArgs, state: S) -> RouteFuture {
302302
let id = args.req().id_owned();
303303
let method = args.req().method();
304-
305-
let span = debug_span!("Router::call_with_state", %method, ?id);
306-
trace!(params = args.req().params());
304+
let span = debug_span!(parent: None, "Router::call_with_state", %method, ?id);
307305

308306
self.inner.call_with_state(args, state).with_span(span)
309307
}
@@ -318,14 +316,19 @@ where
318316
let mut fut = BatchFuture::new_with_capacity(inbound.single(), inbound.len());
319317
// According to spec, non-parsable requests should still receive a
320318
// response.
321-
for req in inbound.iter() {
319+
let span = debug_span!(parent: None, "BatchFuture::poll", futs = fut.len());
320+
321+
for (batch_idx, req) in inbound.iter().enumerate() {
322322
let req = req.map(|req| {
323+
let span = debug_span!("RouteFuture::poll", batch_idx, method = req.method(), id = ?req.id());
323324
let args = HandlerArgs::new(ctx.clone(), req);
324-
self.call_with_state(args, state.clone())
325+
self.inner
326+
.call_with_state(args, state.clone())
327+
.with_span(span)
325328
});
326329
fut.push_parse_result(req);
327330
}
328-
fut
331+
fut.with_span(span)
329332
}
330333

331334
/// Nest this router into a new Axum router, with the specified path and the currently-running

src/routes/future.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,9 @@ pub struct BatchFuture {
118118
resps: Vec<Box<RawValue>>,
119119
/// Whether the batch was a single request.
120120
single: bool,
121+
122+
/// The span (if any).
123+
span: Option<tracing::Span>,
121124
}
122125

123126
impl fmt::Debug for BatchFuture {
@@ -136,6 +139,15 @@ impl BatchFuture {
136139
futs: BatchFutureInner::Prepping(Vec::with_capacity(capacity)),
137140
resps: Vec::with_capacity(capacity),
138141
single,
142+
span: None,
143+
}
144+
}
145+
146+
/// Set the span for the future.
147+
pub(crate) fn with_span(self, span: tracing::Span) -> Self {
148+
Self {
149+
span: Some(span),
150+
..self
139151
}
140152
}
141153

@@ -167,6 +179,11 @@ impl BatchFuture {
167179
Err(_) => self.push_parse_error(),
168180
}
169181
}
182+
183+
/// Get the number of futures in the batch.
184+
pub(crate) fn len(&self) -> usize {
185+
self.futs.len()
186+
}
170187
}
171188

172189
impl std::future::Future for BatchFuture {
@@ -185,6 +202,8 @@ impl std::future::Future for BatchFuture {
185202
}
186203

187204
let this = self.project();
205+
let _enter = this.span.as_ref().map(tracing::Span::enter);
206+
188207
let BatchFutureInnerProj::Running(mut futs) = this.futs.project() else {
189208
unreachable!()
190209
};

src/routes/handler.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use crate::{
33
};
44
use serde_json::value::RawValue;
55
use std::{convert::Infallible, future::Future, marker::PhantomData, pin::Pin, task};
6+
use tracing::{debug_span, enabled, trace, Instrument, Level};
67

78
macro_rules! convert_result {
89
($res:expr) => {{
@@ -410,7 +411,27 @@ where
410411

411412
fn call(&mut self, args: HandlerArgs) -> Self::Future {
412413
let this = self.clone();
413-
Box::pin(async move { Ok(this.handler.call_with_state(args, this.state.clone()).await) })
414+
Box::pin(async move {
415+
let notifications_enabled = args.ctx.notifications_enabled();
416+
417+
let span = debug_span!(
418+
"HandlerService::call",
419+
notifications_enabled,
420+
params = tracing::field::Empty
421+
);
422+
if enabled!(Level::TRACE) {
423+
span.record("params", args.req.params());
424+
}
425+
426+
Ok(this
427+
.handler
428+
.call_with_state(args, this.state.clone())
429+
.instrument(span)
430+
.await
431+
.inspect(|res| {
432+
trace!(?res, "handler response");
433+
}))
434+
})
414435
}
415436
}
416437

src/routes/mod.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use std::{
2020
task::{Context, Poll},
2121
};
2222
use tower::{util::BoxCloneSyncService, Service, ServiceExt};
23+
use tracing::{debug_span, enabled, Level};
2324

2425
use crate::types::Response;
2526

@@ -95,6 +96,14 @@ impl Service<HandlerArgs> for Route {
9596
}
9697

9798
fn call(&mut self, args: HandlerArgs) -> Self::Future {
99+
let span = debug_span!(
100+
"Route::call",
101+
notifications_enabled = args.ctx().notifications_enabled(),
102+
params = tracing::field::Empty,
103+
);
104+
if enabled!(Level::TRACE) {
105+
span.record("params", args.req.params());
106+
}
98107
self.oneshot_inner(args)
99108
}
100109
}

src/types/batch.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use bytes::Bytes;
33
use serde::Deserialize;
44
use serde_json::value::RawValue;
55
use std::ops::Range;
6+
use tracing::{debug, enabled, instrument, Level};
67

78
/// UTF-8, partially deserialized JSON-RPC request batch.
89
#[derive(Default)]
@@ -52,7 +53,13 @@ impl core::fmt::Debug for InboundData {
5253
impl TryFrom<Bytes> for InboundData {
5354
type Error = RequestError;
5455

56+
#[instrument(level = "debug", skip(bytes), fields(buf_len = bytes.len(), bytes = tracing::field::Empty))]
5557
fn try_from(bytes: Bytes) -> Result<Self, Self::Error> {
58+
if enabled!(Level::TRACE) {
59+
tracing::span::Span::current().record("bytes", &format!("0x{:x}", bytes));
60+
}
61+
debug!("Parsing inbound data");
62+
5663
// Special-case a single request, rejecting invalid JSON.
5764
if bytes.starts_with(b"{") {
5865
let rv: &RawValue = serde_json::from_slice(bytes.as_ref())?;

src/types/req.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ impl Request {
183183
pub fn deser_params<'a: 'de, 'de, T: serde::Deserialize<'de>>(
184184
&'a self,
185185
) -> serde_json::Result<T> {
186-
serde_json::from_str(self.params())
186+
serde_json::from_str(self.params()).inspect_err(|err| tracing::debug!(%err, expected = std::any::type_name::<T>(), "failed to parse params"))
187187
}
188188
}
189189

0 commit comments

Comments
 (0)