Skip to content
Open
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ For notes on migrating to 2.x / 0.200.x see [the upgrade guide](doc/upgrade-to-2

### :rocket: Features

* feat(sdk-trace): implement span processor metrics [#6504](https://github.com/open-telemetry/opentelemetry-js/pull/6504) @anuraaga

### :bug: Bug Fixes

### :books: Documentation
Expand Down
51 changes: 35 additions & 16 deletions experimental/packages/opentelemetry-sdk-node/src/sdk.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ import {
ConsoleMetricExporter,
PeriodicExportingMetricReader,
} from '@opentelemetry/sdk-metrics';
import type { SpanProcessor } from '@opentelemetry/sdk-trace-base';
import type {
SpanExporter,
SpanProcessor,
} from '@opentelemetry/sdk-trace-base';
import { BatchSpanProcessor } from '@opentelemetry/sdk-trace-base';
import type { NodeTracerConfig } from '@opentelemetry/sdk-trace-node';
import { NodeTracerProvider } from '@opentelemetry/sdk-trace-node';
Expand All @@ -65,7 +68,8 @@ import {

type TracerProviderConfig = {
tracerConfig: NodeTracerConfig;
spanProcessors: SpanProcessor[];
spanProcessors: SpanProcessor[] | undefined;
traceExporter: SpanExporter | undefined;
};

export type MeterProviderConfig = {
Expand Down Expand Up @@ -225,16 +229,16 @@ export class NodeSDK {
);
}

const spanProcessor =
configuration.spanProcessor ??
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
new BatchSpanProcessor(configuration.traceExporter!);

const spanProcessors = configuration.spanProcessors ?? [spanProcessor];
const spanProcessors =
configuration.spanProcessors ??
(configuration.spanProcessor
? [configuration.spanProcessor]
: undefined);

this._tracerProviderConfig = {
tracerConfig: tracerProviderConfig,
spanProcessors,
traceExporter: configuration.traceExporter,
};
}

Expand Down Expand Up @@ -334,17 +338,32 @@ export class NodeSDK {
}
}

const spanProcessors = this._tracerProviderConfig
? this._tracerProviderConfig.spanProcessors
: getSpanProcessorsFromEnv();
// While SDK metrics are unstable, we require an opt-in.
// https://opentelemetry.io/docs/specs/semconv/otel/sdk-metrics/
const sdkMetricsEnabled = getBooleanFromEnv(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, there is a merge issue here now, because #6433 went in.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup fixed it - it seems the code was slightly different enough in the PRs to automerge unexpectedly 😂

'OTEL_NODE_EXPERIMENTAL_SDK_METRICS'
);

let spanProcessors: SpanProcessor[];
if (this._tracerProviderConfig) {
// If tracerProviderConfig is set, either spanProcessors or traceExporter is set.
if (this._tracerProviderConfig.spanProcessors) {
spanProcessors = this._tracerProviderConfig.spanProcessors;
} else {
spanProcessors = [
new BatchSpanProcessor(this._tracerProviderConfig.traceExporter!, {
meterProvider: sdkMetricsEnabled ? this._meterProvider : undefined,
}),
];
}
} else {
spanProcessors = getSpanProcessorsFromEnv(
sdkMetricsEnabled ? this._meterProvider : undefined
);
}

// Only register if there is a span processor
if (spanProcessors.length > 0) {
// While SDK metrics are unstable, we require an opt-in.
// https://opentelemetry.io/docs/specs/semconv/otel/sdk-metrics/
const sdkMetricsEnabled = getBooleanFromEnv(
'OTEL_NODE_EXPERIMENTAL_SDK_METRICS'
);
this._tracerProvider = new NodeTracerProvider({
...this._configuration,
resource: this._resource,
Expand Down
14 changes: 10 additions & 4 deletions experimental/packages/opentelemetry-sdk-node/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@
* SPDX-License-Identifier: Apache-2.0
*/

import type { ContextManager, TextMapPropagator } from '@opentelemetry/api';
import type {
ContextManager,
MeterProvider,
TextMapPropagator,
} from '@opentelemetry/api';
import { context, diag, propagation } from '@opentelemetry/api';
import {
CompositePropagator,
Expand Down Expand Up @@ -177,7 +181,9 @@ function getOtlpExporterFromEnv(): SpanExporter {
}
}

export function getSpanProcessorsFromEnv(): SpanProcessor[] {
export function getSpanProcessorsFromEnv(
meterProvider: MeterProvider | undefined
): SpanProcessor[] {
const exportersMap = new Map<string, () => SpanExporter>([
['otlp', () => getOtlpExporterFromEnv()],
['zipkin', () => new ZipkinExporter()],
Expand Down Expand Up @@ -220,9 +226,9 @@ export function getSpanProcessorsFromEnv(): SpanProcessor[] {

for (const exp of exporters) {
if (exp instanceof ConsoleSpanExporter) {
processors.push(new SimpleSpanProcessor(exp));
processors.push(new SimpleSpanProcessor(exp, { meterProvider }));
} else {
processors.push(new BatchSpanProcessor(exp));
processors.push(new BatchSpanProcessor(exp, { meterProvider }));
}
}

Expand Down
17 changes: 14 additions & 3 deletions experimental/packages/opentelemetry-sdk-node/test/sdk.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ import { OTLPTraceExporter as OTLPGrpcTraceExporter } from '@opentelemetry/expor
import { ZipkinExporter } from '@opentelemetry/exporter-zipkin';

import { ATTR_HOST_NAME, ATTR_PROCESS_PID } from '../src/semconv';
import { NOOP_COUNTER_METRIC } from '../../../../api/src/metrics/NoopMeter';

function assertDefaultContextManagerRegistered() {
assert.ok(
Expand Down Expand Up @@ -408,7 +409,7 @@ describe('Node SDK', () => {
await sdk.shutdown();
});

it('should register a meter provider to the tracer provider if both initialized and metrics enabled', async () => {
it('should configure components for SDK metrics if enabled', async () => {
process.env.OTEL_NODE_EXPERIMENTAL_SDK_METRICS = 'true';
const exporter = new ConsoleMetricExporter();
const metricReader = new PeriodicExportingMetricReader({
Expand All @@ -418,7 +419,7 @@ describe('Node SDK', () => {
});

const sdk = new NodeSDK({
metricReader: metricReader,
metricReaders: [metricReader],
traceExporter: new ConsoleSpanExporter(),
autoDetectResources: false,
});
Expand All @@ -434,13 +435,18 @@ describe('Node SDK', () => {
assert.ok(
(tracerProvider as any)._config.meterProvider instanceof MeterProvider
);
assert.notDeepEqual(
(tracerProvider as any)._activeSpanProcessor._spanProcessors[0]._metrics
.processedSpans,
NOOP_COUNTER_METRIC
);

assert.ok(metrics.getMeterProvider() instanceof MeterProvider);

await sdk.shutdown();
});

it('should not register a meter provider to the tracer provider if both initialized but metrics disabled', async () => {
it('should not configure components for SDK metrics if disabled', async () => {
const exporter = new ConsoleMetricExporter();
const metricReader = new PeriodicExportingMetricReader({
exporter: exporter,
Expand All @@ -463,6 +469,11 @@ describe('Node SDK', () => {
const tracerProvider = setGlobalTracerProviderSpy.lastCall.args[0];
assert.ok(tracerProvider instanceof NodeTracerProvider);
assert.equal((tracerProvider as any)._config.meterProvider, undefined);
assert.deepEqual(
(tracerProvider as any)._activeSpanProcessor._spanProcessors[0]._metrics
.processedSpans,
NOOP_COUNTER_METRIC
);

assert.ok(metrics.getMeterProvider() instanceof MeterProvider);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
*/

import type { Context } from '@opentelemetry/api';
import { context, diag, TraceFlags } from '@opentelemetry/api';
import { context, createNoopMeter, diag, TraceFlags } from '@opentelemetry/api';
import {
BindOnceFuture,
ExportResultCode,
Expand All @@ -14,9 +14,12 @@ import {
} from '@opentelemetry/core';
import type { Span } from '../Span';
import type { SpanProcessor } from '../SpanProcessor';
import type { SpanProcessorConfig } from './SpanProcessorConfig';
import type { BufferConfig } from '../types';
import type { ReadableSpan } from './ReadableSpan';
import type { SpanExporter } from './SpanExporter';
import { SpanProcessorMetrics } from './SpanProcessorMetrics';
import { OTEL_COMPONENT_TYPE_VALUE_BATCHING_SPAN_PROCESSOR } from '../semconv';

/**
* Implementation of the {@link SpanProcessor} that batches spans exported by
Expand All @@ -30,14 +33,15 @@ export abstract class BatchSpanProcessorBase<T extends BufferConfig>
private readonly _scheduledDelayMillis: number;
private readonly _exportTimeoutMillis: number;
private readonly _exporter: SpanExporter;
private readonly _metrics: SpanProcessorMetrics;

private _isExporting = false;
private _finishedSpans: ReadableSpan[] = [];
private _timer: NodeJS.Timeout | number | undefined;
private _shutdownOnce: BindOnceFuture<void>;
private _droppedSpansCount: number = 0;

constructor(exporter: SpanExporter, config?: T) {
constructor(exporter: SpanExporter, config?: T & SpanProcessorConfig) {
this._exporter = exporter;
this._maxExportBatchSize =
typeof config?.maxExportBatchSize === 'number'
Expand All @@ -64,6 +68,18 @@ export abstract class BatchSpanProcessorBase<T extends BufferConfig>
);
this._maxExportBatchSize = this._maxQueueSize;
}

const meter = config?.meterProvider
? config.meterProvider.getMeter('@opentelemetry/sdk-trace')
: createNoopMeter();
Comment thread
trentm marked this conversation as resolved.
this._metrics = new SpanProcessorMetrics(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the SpanProcessorMetrics constructor, a callback is added to queueSize using meter.createObservableUpDownCounter().addCallback(). Looks like this callback is never removed. Since the callback captures this (to call getQueueSize), it prevents the SpanProcessor and its metrics from being garbage collected even after shutdown.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh thanks - I can add explicit cleanup. But just curious, shouldn't the GC still handle cycles like that, or is there something else at play?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not a cycle issue. The problem is that the Meter lives longer than the span processor and holds onto the callback. Since the callback references the processor, the processor can never be garbage collected, even after shutdown(). Removing the callback in shutdown() should fix it.

OTEL_COMPONENT_TYPE_VALUE_BATCHING_SPAN_PROCESSOR,
meter,
{
capacity: this._maxQueueSize,
getQueueSize: () => this._finishedSpans.length,
}
);
}

forceFlush(): Promise<void> {
Expand Down Expand Up @@ -101,6 +117,7 @@ export abstract class BatchSpanProcessorBase<T extends BufferConfig>
return this._flushAll();
})
.then(() => {
this._metrics.shutdown();
return this._exporter.shutdown();
});
}
Expand All @@ -114,6 +131,7 @@ export abstract class BatchSpanProcessorBase<T extends BufferConfig>
diag.debug('maxQueueSize reached, dropping spans');
}
this._droppedSpansCount++;
this._metrics.dropSpans(1);

return;
}
Expand Down Expand Up @@ -179,6 +197,7 @@ export abstract class BatchSpanProcessorBase<T extends BufferConfig>
const doExport = () =>
this._exporter.export(spans, result => {
clearTimeout(timer);
this._metrics.finishSpans(spans.length, result.error);
if (result.code === ExportResultCode.SUCCESS) {
resolve();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
*/

import type { Context } from '@opentelemetry/api';
import { TraceFlags } from '@opentelemetry/api';
import { createNoopMeter, TraceFlags } from '@opentelemetry/api';
import {
internal,
ExportResultCode,
Expand All @@ -13,8 +13,11 @@ import {
} from '@opentelemetry/core';
import type { Span } from '../Span';
import type { SpanProcessor } from '../SpanProcessor';
import type { SpanProcessorConfig } from './SpanProcessorConfig';
import type { ReadableSpan } from './ReadableSpan';
import type { SpanExporter } from './SpanExporter';
import { SpanProcessorMetrics } from './SpanProcessorMetrics';
import { OTEL_COMPONENT_TYPE_VALUE_SIMPLE_SPAN_PROCESSOR } from '../semconv';

/**
* An implementation of the {@link SpanProcessor} that converts the {@link Span}
Expand All @@ -26,13 +29,22 @@ import type { SpanExporter } from './SpanExporter';
*/
export class SimpleSpanProcessor implements SpanProcessor {
private readonly _exporter: SpanExporter;
private readonly _metrics: SpanProcessorMetrics;
private _shutdownOnce: BindOnceFuture<void>;
private _pendingExports: Set<Promise<void>>;

constructor(exporter: SpanExporter) {
constructor(exporter: SpanExporter, config?: SpanProcessorConfig) {
this._exporter = exporter;
this._shutdownOnce = new BindOnceFuture(this._shutdown, this);
this._pendingExports = new Set<Promise<void>>();

const meter = config?.meterProvider
? config.meterProvider.getMeter('@opentelemetry/sdk-trace')
: createNoopMeter();
this._metrics = new SpanProcessorMetrics(
OTEL_COMPONENT_TYPE_VALUE_SIMPLE_SPAN_PROCESSOR,
meter
);
}

async forceFlush(): Promise<void> {
Expand Down Expand Up @@ -70,6 +82,7 @@ export class SimpleSpanProcessor implements SpanProcessor {
}

const result = await internal._export(this._exporter, [span]);
this._metrics.finishSpans(1, result.error);
if (result.code !== ExportResultCode.SUCCESS) {
throw (
result.error ??
Expand All @@ -83,6 +96,7 @@ export class SimpleSpanProcessor implements SpanProcessor {
}

private _shutdown(): Promise<void> {
this._metrics.shutdown();
return this._exporter.shutdown();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
 * Copyright The OpenTelemetry Authors
 * SPDX-License-Identifier: Apache-2.0
 */

import type { MeterProvider } from '@opentelemetry/api';

/**
 * Common options for SDK span processors.
 */
export interface SpanProcessorConfig {
  /**
   * MeterProvider used to create the processor's self-observability
   * metrics (e.g. processed/dropped span counters). When omitted,
   * processors fall back to a no-op meter, so metric recording has
   * no effect and no overhead.
   */
  meterProvider?: MeterProvider;
}
Loading
Loading