Skip to content

Commit 7a94a9e

Browse files
authored
tag atlas.eval.datapoints counters with upstream host (#1899)
The counters emitted by TimeGrouped and AggregatorSettings previously had no dimension identifying the upstream backend, so dropped and buffered counts from all hosts folded together. Thread the groupBy-host key from createStreamsFlow through createProcessorFlow into TimeGrouped so each counter now carries a host tag, and default AggregatorSettings host to "_" for non-host-scoped callers.
1 parent 037a2cb commit 7a94a9e

4 files changed

Lines changed: 40 additions & 15 deletions

File tree

atlas-eval/src/main/scala/com/netflix/atlas/eval/model/AggrDatapoint.scala

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,19 +97,30 @@ object AggrDatapoint {
9797
* Limit for the number of intermediate datapoints.
9898
* @param registry
9999
* Registry used for reporting metrics related to the aggregation behavior.
100+
* @param host
101+
* Upstream host identifier used to tag metrics so dropped counts can be
102+
* attributed to a specific backend. Defaults to `"_"` for non-host-scoped
103+
* callers.
100104
*/
101105
case class AggregatorSettings(
102106
maxInputDatapoints: Int,
103107
maxIntermediateDatapoints: Int,
104-
registry: Registry
108+
registry: Registry,
109+
host: String = "_"
105110
) {
106111

107112
/**
108113
* Counter for tracking number of datapoints that are dropped due to exceeding the
109114
* configured limits.
110115
*/
111116
val droppedCounter: Counter =
112-
registry.counter("atlas.eval.datapoints", "id", "dropped-datapoints-limit-exceeded")
117+
registry.counter(
118+
"atlas.eval.datapoints",
119+
"id",
120+
"dropped-datapoints-limit-exceeded",
121+
"host",
122+
host
123+
)
113124
}
114125

115126
/**

atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/EvaluatorImpl.scala

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,8 @@ private[stream] abstract class EvaluatorImpl(
202202
}
203203

204204
protected def createPublisherImpl(uri: Uri): Publisher[JsonSupport] = {
205-
val ds = DataSources.of(new DataSource("_", uri.toString()))
205+
val dataSource = new DataSource("_", uri.toString())
206+
val ds = DataSources.of(dataSource)
206207

207208
val source = uri.scheme match {
208209
case s if s.startsWith("http") =>
@@ -217,7 +218,7 @@ private[stream] abstract class EvaluatorImpl(
217218

218219
val (logSrc, context) = createStreamContextSource
219220
source
220-
.via(createProcessorFlow(context))
221+
.via(createProcessorFlow(context, getHost(dataSource)))
221222
.via(new OnUpstreamFinish[MessageEnvelope](context.dsLogger.close()))
222223
.merge(logSrc, eagerComplete = false)
223224
.map(_.message)
@@ -242,8 +243,13 @@ private[stream] abstract class EvaluatorImpl(
242243
.via(new FillRemovedKeysWith[String, DataSources](_ => DataSources.empty()))
243244
.flatMapMerge(Int.MaxValue, dssMap => Source(dssMap.toList))
244245
.groupBy(Int.MaxValue, _._1, true) // groupBy host
245-
.map(_._2) // keep only DataSources
246-
.via(createProcessorFlow(context))
246+
.flatMapPrefix(1) { prefix =>
247+
val host = prefix.headOption.map(_._1).getOrElse("_")
248+
Flow[(String, DataSources)]
249+
.prepend(Source(prefix.toList))
250+
.map(_._2) // keep only DataSources
251+
.via(createProcessorFlow(context, host))
252+
}
247253
.mergeSubstreams
248254
.via(new OnUpstreamFinish[MessageEnvelope](context.dsLogger.close()))
249255
.merge(logSrc, eagerComplete = false)
@@ -263,7 +269,8 @@ private[stream] abstract class EvaluatorImpl(
263269
}
264270

265271
private[stream] def createProcessorFlow(
266-
context: StreamContext
272+
context: StreamContext,
273+
host: String
267274
): Flow[DataSources, MessageEnvelope, NotUsed] = {
268275

269276
val g = GraphDSL.create() { implicit builder =>
@@ -283,7 +290,7 @@ private[stream] abstract class EvaluatorImpl(
283290
Source(t.groupByStep)
284291
}
285292
.groupBy(Int.MaxValue, _.step, allowClosedSubstreamRecreation = true)
286-
.via(new TimeGrouped(context))
293+
.via(new TimeGrouped(context, host))
287294
.mergeSubstreams
288295
.via(context.monitorFlow("11_GroupedDatapoints"))
289296

atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/TimeGrouped.scala

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,13 @@ import com.netflix.atlas.eval.model.TimeGroupsTuple
3737
*
3838
* @param context
3939
* Shared context for the evaluation stream.
40+
* @param host
41+
* Upstream host identifier used to tag metrics so activity can be attributed to a
42+
* specific backend.
4043
*/
4144
private[stream] class TimeGrouped(
42-
context: StreamContext
45+
context: StreamContext,
46+
host: String
4347
) extends GraphStage[FlowShape[DatapointsTuple, TimeGroupsTuple]] {
4448

4549
type AggrMap = java.util.HashMap[DataExpr, AggrDatapoint.Aggregator]
@@ -59,7 +63,8 @@ private[stream] class TimeGrouped(
5963
private val aggrSettings = AggrDatapoint.AggregatorSettings(
6064
maxInputDatapointsPerExpression,
6165
maxIntermediateDatapointsPerExpression,
62-
context.registry
66+
context.registry,
67+
host
6368
)
6469

6570
private val in = Inlet[DatapointsTuple]("TimeGrouped.in")
@@ -69,9 +74,9 @@ private[stream] class TimeGrouped(
6974

7075
private val metricName = "atlas.eval.datapoints"
7176
private val registry = context.registry
72-
private val droppedOld = registry.counter(metricName, "id", "dropped-old")
73-
private val droppedFuture = registry.counter(metricName, "id", "dropped-future")
74-
private val buffered = registry.counter(metricName, "id", "buffered")
77+
private val droppedOld = registry.counter(metricName, "id", "dropped-old", "host", host)
78+
private val droppedFuture = registry.counter(metricName, "id", "dropped-future", "host", host)
79+
private val buffered = registry.counter(metricName, "id", "buffered", "host", host)
7580
private val clock = registry.clock()
7681

7782
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {

atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/TimeGroupedSuite.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ class TimeGroupedSuite extends FunSuite {
4040

4141
private val context = TestContext.createContext(materializer, registry = registry)
4242

43+
private val host = "test"
44+
4345
private val step = 10
4446

4547
private def result(future: Future[List[TimeGroup]]): List[TimeGroup] = {
@@ -57,7 +59,7 @@ class TimeGroupedSuite extends FunSuite {
5759
private def run(data: List[AggrDatapoint]): List[TimeGroup] = {
5860
val future = Source
5961
.single(DatapointsTuple(data))
60-
.via(new TimeGrouped(context))
62+
.via(new TimeGrouped(context, host))
6163
.flatMapConcat(t => Source(t.groups))
6264
.runFold(List.empty[TimeGroup])((acc, g) => g :: acc)
6365
result(future)
@@ -121,7 +123,7 @@ class TimeGroupedSuite extends FunSuite {
121123
}
122124

123125
private def count(id: String): Long = {
124-
registry.counter("atlas.eval.datapoints", "id", id).count()
126+
registry.counter("atlas.eval.datapoints", "id", id, "host", host).count()
125127
}
126128

127129
private def counts: (Long, Long) = {

0 commit comments

Comments (0)