Skip to content

Commit f4e1667

Browse files
committed
extract span buffer to class, add serverSpanStreamingIntegration, use it in cloudflare, vercel edge and deno
1 parent af979ea commit f4e1667

File tree

13 files changed

+700
-120
lines changed

13 files changed

+700
-120
lines changed

packages/cloudflare/src/sdk.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import {
88
initAndBind,
99
linkedErrorsIntegration,
1010
requestDataIntegration,
11+
serverSpanStreamingIntegration,
1112
stackParserFromStackParserOptions,
1213
} from '@sentry/core';
1314
import type { CloudflareClientOptions, CloudflareOptions } from './client';
@@ -36,6 +37,7 @@ export function getDefaultIntegrations(options: CloudflareOptions): Integration[
3637
// TODO(v11): the `include` object should be defined directly in the integration based on `sendDefaultPii`
3738
requestDataIntegration(sendDefaultPii ? undefined : { include: { cookies: false } }),
3839
consoleIntegration(),
40+
...(options.traceLifecycle === 'stream' ? [serverSpanStreamingIntegration()] : []),
3941
];
4042
}
4143

packages/cloudflare/test/sdk.test.ts

Lines changed: 63 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import * as SentryCore from '@sentry/core';
2-
import { beforeEach, describe, expect, test, vi } from 'vitest';
2+
import { beforeEach, describe, expect, it, test, vi } from 'vitest';
33
import { CloudflareClient } from '../src/client';
4-
import { init } from '../src/sdk';
4+
import { getDefaultIntegrations, init } from '../src/sdk';
55
import { resetSdk } from './testUtils';
66

77
describe('init', () => {
@@ -18,4 +18,65 @@ describe('init', () => {
1818
expect(client).toBeDefined();
1919
expect(client).toBeInstanceOf(CloudflareClient);
2020
});
21+
22+
describe('getDefaultIntegrations', () => {
23+
it('returns list of integrations with default options', () => {
24+
const integrations = getDefaultIntegrations({}).map(integration => integration.name);
25+
expect(integrations).toEqual([
26+
'Dedupe',
27+
'InboundFilters',
28+
'FunctionToString',
29+
'LinkedErrors',
30+
'Fetch',
31+
'Hono',
32+
'RequestData',
33+
'Console',
34+
]);
35+
});
36+
37+
it('adds dedupeIntegration if enableDedupe is true', () => {
38+
const integrations = getDefaultIntegrations({ enableDedupe: true }).map(integration => integration.name);
39+
expect(integrations).toEqual([
40+
'Dedupe',
41+
'InboundFilters',
42+
'FunctionToString',
43+
'LinkedErrors',
44+
'Fetch',
45+
'Hono',
46+
'RequestData',
47+
'Console',
48+
]);
49+
});
50+
51+
it('adds serverSpanStreamingIntegration if traceLifecycle is stream', () => {
52+
const integrations = getDefaultIntegrations({ traceLifecycle: 'stream' }).map(integration => integration.name);
53+
expect(integrations).toEqual([
54+
'Dedupe',
55+
'InboundFilters',
56+
'FunctionToString',
57+
'LinkedErrors',
58+
'Fetch',
59+
'Hono',
60+
'RequestData',
61+
'Console',
62+
'ServerSpanStreaming',
63+
]);
64+
});
65+
66+
it('intializes requestDataIntegration to not include cookies if sendDefaultPii is false', () => {
67+
const reqDataIntegrationSpy = vi.spyOn(SentryCore, 'requestDataIntegration');
68+
69+
getDefaultIntegrations({ sendDefaultPii: false }).map(integration => integration.name);
70+
71+
expect(reqDataIntegrationSpy).toHaveBeenCalledWith({ include: { cookies: false } });
72+
});
73+
74+
it('intializes requestDataIntegration to include cookies if sendDefaultPii is true', () => {
75+
const reqDataIntegrationSpy = vi.spyOn(SentryCore, 'requestDataIntegration');
76+
77+
getDefaultIntegrations({ sendDefaultPii: true }).map(integration => integration.name);
78+
79+
expect(reqDataIntegrationSpy).toHaveBeenCalledWith(undefined);
80+
});
81+
});
2182
});

packages/core/src/client.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1536,7 +1536,7 @@ function _validateBeforeSendResult(
15361536
/**
15371537
* Process the matching `beforeSendXXX` callback.
15381538
*/
1539-
// eslint-disable-next-line complexity
1539+
15401540
function processBeforeSend(
15411541
client: Client,
15421542
options: ClientOptions,

packages/core/src/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ export {
9292
} from './utils/spanUtils';
9393
export { captureSpan } from './spans/captureSpan';
9494
export { safeSetSpanJSONAttributes } from './spans/spanFirstUtils';
95+
export { SpanBuffer, type SpanBufferOptions } from './spans/spanBuffer';
9596
export { _setSpanForScope as _INTERNAL_setSpanForScope } from './utils/spanOnScope';
9697
export { parseSampleRate } from './utils/parseSampleRate';
9798
export { applySdkMetadata } from './utils/sdkMetadata';
@@ -125,6 +126,7 @@ export { thirdPartyErrorFilterIntegration } from './integrations/third-party-err
125126
export { consoleIntegration } from './integrations/console';
126127
export { featureFlagsIntegration, type FeatureFlagsIntegration } from './integrations/featureFlags';
127128
export { growthbookIntegration } from './integrations/featureFlags';
129+
export { serverSpanStreamingIntegration } from './integrations/serverSpanStreaming';
128130

129131
export { profiler } from './profiling';
130132
// eslint thinks the entire function is deprecated (while only one overload is actually deprecated)
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
import { DEBUG_BUILD } from '../debug-build';
2+
import { defineIntegration } from '../integration';
3+
import { captureSpan } from '../spans/captureSpan';
4+
import { SpanBuffer } from '../spans/spanBuffer';
5+
import type { IntegrationFn } from '../types-hoist/integration';
6+
import { isV2BeforeSendSpanCallback } from '../utils/beforeSendSpan';
7+
import { debug } from '../utils/debug-logger';
8+
9+
export interface ServerSpanStreamingOptions {
10+
/** Max spans per envelope batch (default: 1000) */
11+
maxSpanLimit?: number;
12+
/** Flush interval in ms (default: 5000) */
13+
flushInterval?: number;
14+
}
15+
16+
const INTEGRATION_NAME = 'ServerSpanStreaming';
17+
18+
const _serverSpanStreamingIntegration = ((options?: ServerSpanStreamingOptions) => {
19+
return {
20+
name: INTEGRATION_NAME,
21+
setup(client) {
22+
const clientOptions = client.getOptions();
23+
const beforeSendSpan = clientOptions.beforeSendSpan;
24+
25+
const initialMessage = 'serverSpanStreamingIntegration requires';
26+
const fallbackMsg = 'Falling back to static trace lifecycle.';
27+
28+
if (clientOptions.traceLifecycle !== 'stream') {
29+
client.getOptions().traceLifecycle = 'static';
30+
DEBUG_BUILD && debug.warn(`${initialMessage} \`traceLifecycle\` to be set to "stream"! ${fallbackMsg}`);
31+
return;
32+
}
33+
34+
if (beforeSendSpan && !isV2BeforeSendSpanCallback(beforeSendSpan)) {
35+
client.getOptions().traceLifecycle = 'static';
36+
DEBUG_BUILD &&
37+
debug.warn(`${initialMessage} a beforeSendSpan callback using \`withStreamSpan\`! ${fallbackMsg}`);
38+
return;
39+
}
40+
41+
const buffer = new SpanBuffer(client, options);
42+
43+
client.on('enqueueSpan', spanJSON => {
44+
buffer.addSpan(spanJSON);
45+
});
46+
47+
client.on('afterSpanEnd', span => {
48+
captureSpan(span, client);
49+
});
50+
},
51+
};
52+
}) satisfies IntegrationFn;
53+
54+
export const serverSpanStreamingIntegration = defineIntegration(_serverSpanStreamingIntegration);
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
import type { Client } from '../client';
2+
import { DEBUG_BUILD } from '../debug-build';
3+
import { createSpanV2Envelope } from '../envelope';
4+
import { getDynamicSamplingContextFromSpan } from '../tracing/dynamicSamplingContext';
5+
import type { SpanV2JSON, SpanV2JSONWithSegmentRef } from '../types-hoist/span';
6+
import { debug } from '../utils/debug-logger';
7+
8+
export interface SpanBufferOptions {
9+
/** Max spans per trace before auto-flush (default: 1000) */
10+
maxSpanLimit?: number;
11+
/** Flush interval in ms (default: 5000) */
12+
flushInterval?: number;
13+
}
14+
15+
/**
16+
* A buffer for span JSON objects that flushes them to Sentry in Span v2 envelopes.
17+
* Handles interval-based flushing, size thresholds, and graceful shutdown.
18+
*/
19+
export class SpanBuffer {
20+
private _spanTreeMap: Map<string, Set<SpanV2JSONWithSegmentRef>>;
21+
private _flushIntervalId: ReturnType<typeof setInterval> | null;
22+
private _client: Client;
23+
private _maxSpanLimit: number;
24+
private _flushInterval: number;
25+
26+
public constructor(client: Client, options?: SpanBufferOptions) {
27+
this._spanTreeMap = new Map();
28+
this._client = client;
29+
30+
const { maxSpanLimit, flushInterval } = options ?? {};
31+
32+
this._maxSpanLimit = maxSpanLimit && maxSpanLimit > 0 && maxSpanLimit <= 1000 ? maxSpanLimit : 1000;
33+
this._flushInterval = flushInterval && flushInterval > 0 ? flushInterval : 5_000;
34+
35+
this._flushIntervalId = setInterval(() => {
36+
this.flush();
37+
}, this._flushInterval);
38+
39+
this._client.on('flush', () => {
40+
this.flush();
41+
});
42+
}
43+
44+
/**
45+
* Add a span to the buffer.
46+
*/
47+
public addSpan(spanJSON: SpanV2JSONWithSegmentRef): void {
48+
const traceId = spanJSON.trace_id;
49+
let traceBucket = this._spanTreeMap.get(traceId);
50+
if (traceBucket) {
51+
traceBucket.add(spanJSON);
52+
} else {
53+
traceBucket = new Set([spanJSON]);
54+
this._spanTreeMap.set(traceId, traceBucket);
55+
}
56+
57+
if (traceBucket.size >= this._maxSpanLimit) {
58+
this._flushTrace(traceId);
59+
this._debounceFlushInterval();
60+
}
61+
}
62+
63+
/**
64+
* Flush all buffered traces.
65+
*/
66+
public flush(): void {
67+
if (!this._spanTreeMap.size) {
68+
return;
69+
}
70+
71+
DEBUG_BUILD && debug.log(`Flushing span tree map with ${this._spanTreeMap.size} traces`);
72+
73+
this._spanTreeMap.forEach((_, traceId) => {
74+
this._flushTrace(traceId);
75+
});
76+
this._debounceFlushInterval();
77+
}
78+
79+
private _flushTrace(traceId: string): void {
80+
const traceBucket = this._spanTreeMap.get(traceId);
81+
if (!traceBucket) {
82+
return;
83+
}
84+
85+
if (!traceBucket.size) {
86+
this._spanTreeMap.delete(traceId);
87+
return;
88+
}
89+
90+
const firstSpanJSON = traceBucket.values().next().value;
91+
92+
const segmentSpan = firstSpanJSON?._segmentSpan;
93+
if (!segmentSpan) {
94+
DEBUG_BUILD && debug.warn('No segment span reference found on span JSON, cannot compute DSC');
95+
this._spanTreeMap.delete(traceId);
96+
return;
97+
}
98+
99+
const dsc = getDynamicSamplingContextFromSpan(segmentSpan);
100+
101+
const cleanedSpans: SpanV2JSON[] = Array.from(traceBucket).map(spanJSON => {
102+
// eslint-disable-next-line @typescript-eslint/no-unused-vars
103+
const { _segmentSpan, ...cleanSpanJSON } = spanJSON;
104+
return cleanSpanJSON;
105+
});
106+
107+
const envelope = createSpanV2Envelope(cleanedSpans, dsc, this._client);
108+
109+
DEBUG_BUILD && debug.log(`Sending span envelope for trace ${traceId} with ${cleanedSpans.length} spans`);
110+
111+
this._client.sendEnvelope(envelope).then(null, reason => {
112+
DEBUG_BUILD && debug.error('Error while sending span stream envelope:', reason);
113+
});
114+
115+
this._spanTreeMap.delete(traceId);
116+
}
117+
118+
private _debounceFlushInterval(): void {
119+
if (this._flushIntervalId) {
120+
clearInterval(this._flushIntervalId);
121+
}
122+
this._flushIntervalId = setInterval(() => {
123+
this.flush();
124+
}, this._flushInterval);
125+
}
126+
}

0 commit comments

Comments
 (0)