Skip to content
Open
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ For notes on migrating to 2.x / 0.200.x see [the upgrade guide](doc/upgrade-to-2

### :rocket: Features

* feat(sdk-logs): implement log creation metrics [#6433](https://github.com/open-telemetry/opentelemetry-js/pull/6433) @anuraaga
* feat(sdk-trace): implement span processor metrics [#6504](https://github.com/open-telemetry/opentelemetry-js/pull/6504) @anuraaga
* feat(sdk-logs): implement log creation metrics [#6433](https://github.com/open-telemetry/opentelemetry-js/pull/6433) @anuraaga

### :bug: Bug Fixes

Expand Down
40 changes: 29 additions & 11 deletions experimental/packages/opentelemetry-sdk-node/src/sdk.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ import {
ConsoleMetricExporter,
PeriodicExportingMetricReader,
} from '@opentelemetry/sdk-metrics';
import type { SpanProcessor } from '@opentelemetry/sdk-trace-base';
import type {
SpanExporter,
SpanProcessor,
} from '@opentelemetry/sdk-trace-base';
import { BatchSpanProcessor } from '@opentelemetry/sdk-trace-base';
import type { NodeTracerConfig } from '@opentelemetry/sdk-trace-node';
import { NodeTracerProvider } from '@opentelemetry/sdk-trace-node';
Expand All @@ -65,7 +68,8 @@ import {

type TracerProviderConfig = {
tracerConfig: NodeTracerConfig;
spanProcessors: SpanProcessor[];
spanProcessors: SpanProcessor[] | undefined;
traceExporter: SpanExporter | undefined;
};

export type MeterProviderConfig = {
Expand Down Expand Up @@ -225,16 +229,16 @@ export class NodeSDK {
);
}

const spanProcessor =
configuration.spanProcessor ??
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
new BatchSpanProcessor(configuration.traceExporter!);

const spanProcessors = configuration.spanProcessors ?? [spanProcessor];
const spanProcessors =
configuration.spanProcessors ??
(configuration.spanProcessor
? [configuration.spanProcessor]
: undefined);

this._tracerProviderConfig = {
tracerConfig: tracerProviderConfig,
spanProcessors,
traceExporter: configuration.traceExporter,
};
}

Expand Down Expand Up @@ -340,9 +344,23 @@ export class NodeSDK {
}
}

const spanProcessors = this._tracerProviderConfig
? this._tracerProviderConfig.spanProcessors
: getSpanProcessorsFromEnv();
let spanProcessors: SpanProcessor[];
if (this._tracerProviderConfig) {
// If tracerProviderConfig is set, either spanProcessors or traceExporter is set.
if (this._tracerProviderConfig.spanProcessors) {
spanProcessors = this._tracerProviderConfig.spanProcessors;
} else {
spanProcessors = [
new BatchSpanProcessor(this._tracerProviderConfig.traceExporter!, {
meterProvider: sdkMetricsEnabled ? this._meterProvider : undefined,
}),
];
}
} else {
spanProcessors = getSpanProcessorsFromEnv(
sdkMetricsEnabled ? this._meterProvider : undefined
);
}

// Only register if there is a span processor
if (spanProcessors.length > 0) {
Expand Down
14 changes: 10 additions & 4 deletions experimental/packages/opentelemetry-sdk-node/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@
* SPDX-License-Identifier: Apache-2.0
*/

import type { ContextManager, TextMapPropagator } from '@opentelemetry/api';
import type {
ContextManager,
MeterProvider,
TextMapPropagator,
} from '@opentelemetry/api';
import { context, diag, propagation } from '@opentelemetry/api';
import {
CompositePropagator,
Expand Down Expand Up @@ -177,7 +181,9 @@ function getOtlpExporterFromEnv(): SpanExporter {
}
}

export function getSpanProcessorsFromEnv(): SpanProcessor[] {
export function getSpanProcessorsFromEnv(
meterProvider: MeterProvider | undefined
): SpanProcessor[] {
const exportersMap = new Map<string, () => SpanExporter>([
['otlp', () => getOtlpExporterFromEnv()],
['zipkin', () => new ZipkinExporter()],
Expand Down Expand Up @@ -220,9 +226,9 @@ export function getSpanProcessorsFromEnv(): SpanProcessor[] {

for (const exp of exporters) {
if (exp instanceof ConsoleSpanExporter) {
processors.push(new SimpleSpanProcessor(exp));
processors.push(new SimpleSpanProcessor(exp, { meterProvider }));
} else {
processors.push(new BatchSpanProcessor(exp));
processors.push(new BatchSpanProcessor(exp, { meterProvider }));
}
}

Expand Down
12 changes: 11 additions & 1 deletion experimental/packages/opentelemetry-sdk-node/test/sdk.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ describe('Node SDK', () => {
});

const sdk = new NodeSDK({
metricReader: metricReader,
metricReaders: [metricReader],
traceExporter: new ConsoleSpanExporter(),
logRecordProcessors: [
new SimpleLogRecordProcessor(new InMemoryLogRecordExporter()),
Expand All @@ -438,6 +438,11 @@ describe('Node SDK', () => {
assert.ok(
(tracerProvider as any)._config.meterProvider instanceof MeterProvider
);
assert.notDeepEqual(
(tracerProvider as any)._activeSpanProcessor._spanProcessors[0]._metrics
.processedSpans,
NOOP_COUNTER_METRIC
);

const loggerProvider = setGlobalLoggerProviderSpy.lastCall.args[0];
assert.notDeepEqual(
Expand Down Expand Up @@ -476,6 +481,11 @@ describe('Node SDK', () => {
const tracerProvider = setGlobalTracerProviderSpy.lastCall.args[0];
assert.ok(tracerProvider instanceof NodeTracerProvider);
assert.equal((tracerProvider as any)._config.meterProvider, undefined);
assert.deepEqual(
(tracerProvider as any)._activeSpanProcessor._spanProcessors[0]._metrics
.processedSpans,
NOOP_COUNTER_METRIC
);

const loggerProvider = setGlobalLoggerProviderSpy.lastCall.args[0];
assert.deepEqual(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
*/

import type { Context } from '@opentelemetry/api';
import { context, diag, TraceFlags } from '@opentelemetry/api';
import { context, createNoopMeter, diag, TraceFlags } from '@opentelemetry/api';
import {
BindOnceFuture,
ExportResultCode,
Expand All @@ -14,9 +14,12 @@ import {
} from '@opentelemetry/core';
import type { Span } from '../Span';
import type { SpanProcessor } from '../SpanProcessor';
import type { SpanProcessorConfig } from './SpanProcessorConfig';
import type { BufferConfig } from '../types';
import type { ReadableSpan } from './ReadableSpan';
import type { SpanExporter } from './SpanExporter';
import { SpanProcessorMetrics } from './SpanProcessorMetrics';
import { OTEL_COMPONENT_TYPE_VALUE_BATCHING_SPAN_PROCESSOR } from '../semconv';

/**
* Implementation of the {@link SpanProcessor} that batches spans exported by
Expand All @@ -30,14 +33,15 @@ export abstract class BatchSpanProcessorBase<T extends BufferConfig>
private readonly _scheduledDelayMillis: number;
private readonly _exportTimeoutMillis: number;
private readonly _exporter: SpanExporter;
private readonly _metrics: SpanProcessorMetrics;

private _isExporting = false;
private _finishedSpans: ReadableSpan[] = [];
private _timer: NodeJS.Timeout | number | undefined;
private _shutdownOnce: BindOnceFuture<void>;
private _droppedSpansCount: number = 0;

constructor(exporter: SpanExporter, config?: T) {
constructor(exporter: SpanExporter, config?: T & SpanProcessorConfig) {
this._exporter = exporter;
this._maxExportBatchSize =
typeof config?.maxExportBatchSize === 'number'
Expand All @@ -64,6 +68,18 @@ export abstract class BatchSpanProcessorBase<T extends BufferConfig>
);
this._maxExportBatchSize = this._maxQueueSize;
}

const meter = config?.meterProvider
? config.meterProvider.getMeter('@opentelemetry/sdk-trace')
: createNoopMeter();
Comment thread
trentm marked this conversation as resolved.
this._metrics = new SpanProcessorMetrics(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the SpanProcessorMetrics constructor, a callback is added to queueSize using meter.createObservableUpDownCounter().addCallback(). Looks like this callback is never removed. Since the callback captures this (to call getQueueSize), it prevents the SpanProcessor and its metrics from being garbage collected even after shutdown.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh thanks - I can add explicit cleanup. But just curious, shouldn't the GC still be handle cycles like that or is there something else at play?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not a cycle issue. The problem is that the Meter lives longer than the span processor and holds onto the callback. Since the callback references the processor, the processor can never be garbage collected, even after shutdown(). Removing the callback in shutdown() should fix it.

OTEL_COMPONENT_TYPE_VALUE_BATCHING_SPAN_PROCESSOR,
meter,
{
capacity: this._maxQueueSize,
getQueueSize: () => this._finishedSpans.length,
}
);
}

forceFlush(): Promise<void> {
Expand Down Expand Up @@ -101,6 +117,7 @@ export abstract class BatchSpanProcessorBase<T extends BufferConfig>
return this._flushAll();
})
.then(() => {
this._metrics.shutdown();
return this._exporter.shutdown();
});
}
Expand All @@ -114,6 +131,7 @@ export abstract class BatchSpanProcessorBase<T extends BufferConfig>
diag.debug('maxQueueSize reached, dropping spans');
}
this._droppedSpansCount++;
this._metrics.dropSpans(1);

return;
}
Expand Down Expand Up @@ -179,6 +197,7 @@ export abstract class BatchSpanProcessorBase<T extends BufferConfig>
const doExport = () =>
this._exporter.export(spans, result => {
clearTimeout(timer);
this._metrics.finishSpans(spans.length, result.error);
if (result.code === ExportResultCode.SUCCESS) {
resolve();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
*/

import type { Context } from '@opentelemetry/api';
import { TraceFlags } from '@opentelemetry/api';
import { createNoopMeter, TraceFlags } from '@opentelemetry/api';
import {
internal,
ExportResultCode,
Expand All @@ -13,8 +13,11 @@ import {
} from '@opentelemetry/core';
import type { Span } from '../Span';
import type { SpanProcessor } from '../SpanProcessor';
import type { SpanProcessorConfig } from './SpanProcessorConfig';
import type { ReadableSpan } from './ReadableSpan';
import type { SpanExporter } from './SpanExporter';
import { SpanProcessorMetrics } from './SpanProcessorMetrics';
import { OTEL_COMPONENT_TYPE_VALUE_SIMPLE_SPAN_PROCESSOR } from '../semconv';

/**
* An implementation of the {@link SpanProcessor} that converts the {@link Span}
Expand All @@ -26,13 +29,22 @@ import type { SpanExporter } from './SpanExporter';
*/
export class SimpleSpanProcessor implements SpanProcessor {
private readonly _exporter: SpanExporter;
private readonly _metrics: SpanProcessorMetrics;
private _shutdownOnce: BindOnceFuture<void>;
private _pendingExports: Set<Promise<void>>;

constructor(exporter: SpanExporter) {
constructor(exporter: SpanExporter, config?: SpanProcessorConfig) {
this._exporter = exporter;
this._shutdownOnce = new BindOnceFuture(this._shutdown, this);
this._pendingExports = new Set<Promise<void>>();

const meter = config?.meterProvider
? config.meterProvider.getMeter('@opentelemetry/sdk-trace')
: createNoopMeter();
this._metrics = new SpanProcessorMetrics(
OTEL_COMPONENT_TYPE_VALUE_SIMPLE_SPAN_PROCESSOR,
meter
);
}

async forceFlush(): Promise<void> {
Expand Down Expand Up @@ -70,6 +82,7 @@ export class SimpleSpanProcessor implements SpanProcessor {
}

const result = await internal._export(this._exporter, [span]);
this._metrics.finishSpans(1, result.error);
if (result.code !== ExportResultCode.SUCCESS) {
throw (
result.error ??
Expand All @@ -83,6 +96,7 @@ export class SimpleSpanProcessor implements SpanProcessor {
}

private _shutdown(): Promise<void> {
this._metrics.shutdown();
return this._exporter.shutdown();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

import type { MeterProvider } from '@opentelemetry/api';

/**
* Common options for SDK span processors.
*/
export interface SpanProcessorConfig {
meterProvider?: MeterProvider;
}
Loading
Loading