Skip to content

Commit 700f2d8

Browse files
committed
Introduce labels to metrics and add them to v2 metrics endpoint.
Signed-off-by: Jakub Dardzinski <kuba0221@gmail.com>
1 parent 89ef52f commit 700f2d8

8 files changed

Lines changed: 154 additions & 117 deletions

File tree

api/src/main/java/marquez/MarquezApp.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,10 @@
3535
import marquez.common.Utils;
3636
import marquez.db.DbMigration;
3737
import marquez.jobs.DbRetentionJob;
38+
import marquez.logging.DelegatingSqlLogger;
39+
import marquez.logging.LabelledSqlLogger;
3840
import marquez.logging.LoggingMdcFilter;
39-
import marquez.logging.MarquezMetricNameStrategy;
41+
import marquez.service.SqlMetrics;
4042
import marquez.tracing.SentryConfig;
4143
import marquez.tracing.TracingContainerResponseFilter;
4244
import marquez.tracing.TracingSQLLogger;
@@ -58,7 +60,9 @@ public final class MarquezApp extends Application<MarquezConfig> {
5860

5961
// Monitoring
6062
private static final String PROMETHEUS = "prometheus";
63+
private static final String PROMETHEUS_V2 = "prometheus_v2";
6164
private static final String PROMETHEUS_ENDPOINT = "/metrics";
65+
private static final String PROMETHEUS_ENDPOINT_V2 = "/metrics/v2";
6266

6367
public static void main(final String[] args) throws Exception {
6468
new MarquezApp().run(args);
@@ -74,7 +78,9 @@ public void initialize(@NonNull Bootstrap<MarquezConfig> bootstrap) {
7478
// Enable metric collection for prometheus.
7579
CollectorRegistry.defaultRegistry.register(
7680
new DropwizardExports(bootstrap.getMetricRegistry()));
81+
SqlMetrics.registry.register(new DropwizardExports(bootstrap.getMetricRegistry()));
7782
DefaultExports.initialize(); // Add metrics for CPU, JVM memory, etc.
83+
DefaultExports.register(SqlMetrics.registry);
7884

7985
// Enable variable substitution with environment variables.
8086
bootstrap.setConfigurationSourceProvider(
@@ -163,7 +169,8 @@ private Jdbi newJdbi(
163169
.installPlugin(new SqlObjectPlugin())
164170
.installPlugin(new PostgresPlugin())
165171
.installPlugin(new Jackson2Plugin());
166-
SqlLogger sqlLogger = new InstrumentedSqlLogger(env.metrics(), new MarquezMetricNameStrategy());
172+
SqlLogger sqlLogger =
173+
new DelegatingSqlLogger(new LabelledSqlLogger(), new InstrumentedSqlLogger(env.metrics()));
167174
if (isSentryEnabled(config)) {
168175
sqlLogger = new TracingSQLLogger(sqlLogger);
169176
}
@@ -198,6 +205,9 @@ private void registerServlets(@NonNull Environment env) {
198205

199206
// Expose metrics for monitoring.
200207
env.servlets().addServlet(PROMETHEUS, new MetricsServlet()).addMapping(PROMETHEUS_ENDPOINT);
208+
env.servlets()
209+
.addServlet(PROMETHEUS_V2, new MetricsServlet(SqlMetrics.registry))
210+
.addMapping(PROMETHEUS_ENDPOINT_V2);
201211
}
202212

203213
private void registerFilters(@NonNull Environment env, MarquezContext marquezContext) {
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright 2018-2023 contributors to the Marquez project
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package marquez.logging;
7+
8+
import java.sql.SQLException;
9+
import java.util.Arrays;
10+
import java.util.List;
11+
import org.jdbi.v3.core.statement.SqlLogger;
12+
import org.jdbi.v3.core.statement.StatementContext;
13+
14+
/** A {@link SqlLogger} implementation that delegates to multiple {@link SqlLogger}s. */
15+
public class DelegatingSqlLogger implements SqlLogger {
16+
private final List<SqlLogger> sqlLoggers;
17+
18+
public DelegatingSqlLogger(SqlLogger... sqlLoggers) {
19+
this.sqlLoggers = Arrays.asList(sqlLoggers);
20+
}
21+
22+
@Override
23+
public void logAfterExecution(StatementContext statementContext) {
24+
for (SqlLogger sqlLogger : sqlLoggers) {
25+
sqlLogger.logAfterExecution(statementContext);
26+
}
27+
}
28+
29+
@Override
30+
public void logException(StatementContext statementContext, SQLException ex) {
31+
for (SqlLogger sqlLogger : sqlLoggers) {
32+
sqlLogger.logException(statementContext, ex);
33+
}
34+
}
35+
}

api/src/main/java/marquez/logging/EndpointNameStrategy.java

Lines changed: 0 additions & 33 deletions
This file was deleted.
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright 2018-2023 contributors to the Marquez project
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package marquez.logging;
7+
8+
import java.sql.SQLException;
9+
import java.time.temporal.ChronoUnit;
10+
import marquez.service.SqlMetrics;
11+
import org.jdbi.v3.core.extension.ExtensionMethod;
12+
import org.jdbi.v3.core.statement.SqlLogger;
13+
import org.jdbi.v3.core.statement.StatementContext;
14+
import org.slf4j.MDC;
15+
16+
/**
17+
* A {@link SqlLogger} implementation for JDBI which uses the SQL objects' class names and method
18+
* names for nanosecond-precision timers.
19+
*/
20+
public class LabelledSqlLogger implements SqlLogger {
21+
22+
@Override
23+
public void logAfterExecution(StatementContext context) {
24+
log(context);
25+
}
26+
27+
@Override
28+
public void logException(StatementContext context, SQLException ex) {
29+
log(context);
30+
}
31+
32+
private void log(StatementContext context) {
33+
ExtensionMethod extensionMethod = context.getExtensionMethod();
34+
if (extensionMethod != null) {
35+
final long elapsed = context.getElapsedTime(ChronoUnit.NANOS);
36+
if (MDC.get("method") != null && MDC.get("pathWithParams") != null) {
37+
SqlMetrics.duration
38+
.labels(
39+
extensionMethod.getType().getName(),
40+
extensionMethod.getMethod().getName(),
41+
MDC.get("method"),
42+
MDC.get("pathWithParams"))
43+
.observe(elapsed / 1e9);
44+
}
45+
}
46+
}
47+
}

api/src/main/java/marquez/logging/MarquezMetricNameStrategy.java

Lines changed: 0 additions & 20 deletions
This file was deleted.
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright 2018-2023 contributors to the Marquez project
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package marquez.service;
7+
8+
import io.prometheus.client.CollectorRegistry;
9+
import io.prometheus.client.Histogram;
10+
11+
public class SqlMetrics {
12+
public static final CollectorRegistry registry = new io.prometheus.client.CollectorRegistry();
13+
14+
public static final Histogram duration =
15+
Histogram.build()
16+
.namespace("marquez")
17+
.labelNames("object_name", "method_name", "endpoint_method", "endpoint_path")
18+
.name("sql_duration_seconds")
19+
.help("SQL execution duration in seconds")
20+
.register(registry);
21+
22+
public static void emitSqlDurationMetrics(
23+
String objectName,
24+
String methodName,
25+
String endpointMethod,
26+
String endpointPath,
27+
double duration) {
28+
SqlMetrics.duration
29+
.labels(objectName, methodName, endpointMethod, endpointPath)
30+
.observe(duration);
31+
}
32+
}

api/src/test/java/marquez/BaseIntegrationTest.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -215,10 +215,14 @@ protected CompletableFuture<HttpResponse<String>> sendLineage(String body) {
215215

216216
protected CompletableFuture<HttpResponse<String>> getMetrics() {
217217
HttpRequest request =
218-
HttpRequest.newBuilder()
219-
.uri(URI.create(baseUrl + "/metrics"))
220-
.GET()
221-
.build();
218+
HttpRequest.newBuilder().uri(URI.create(baseUrl + "/metrics")).GET().build();
219+
220+
return http2.sendAsync(request, BodyHandlers.ofString());
221+
}
222+
223+
protected CompletableFuture<HttpResponse<String>> getMetricsV2() {
224+
HttpRequest request =
225+
HttpRequest.newBuilder().uri(URI.create(baseUrl + "/metrics/v2")).GET().build();
222226

223227
return http2.sendAsync(request, BodyHandlers.ofString());
224228
}

api/src/test/java/marquez/MetricsIntegrationTest.java

Lines changed: 20 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -5,76 +5,20 @@
55

66
package marquez;
77

8-
import static marquez.db.LineageTestUtils.PRODUCER_URL;
9-
import static marquez.db.LineageTestUtils.SCHEMA_URL;
10-
import static org.assertj.core.api.Assertions.as;
118
import static org.assertj.core.api.Assertions.assertThat;
12-
import static org.junit.jupiter.api.Assertions.assertEquals;
139

14-
import com.fasterxml.jackson.core.JsonProcessingException;
15-
import com.fasterxml.jackson.core.type.TypeReference;
16-
import com.fasterxml.jackson.databind.JsonNode;
17-
import com.fasterxml.jackson.databind.ObjectMapper;
18-
import com.fasterxml.jackson.databind.node.ObjectNode;
19-
import com.fasterxml.jackson.databind.node.TextNode;
20-
import com.google.common.base.Predicate;
21-
import com.google.common.collect.ImmutableMap;
22-
import com.google.common.collect.Maps;
23-
import io.dropwizard.util.Resources;
24-
import io.openlineage.client.OpenLineage;
25-
import io.openlineage.client.OpenLineage.RunEvent;
26-
import io.openlineage.client.OpenLineage.RunEvent.EventType;
27-
import io.openlineage.client.OpenLineage.RunFacet;
28-
import io.openlineage.client.OpenLineage.RunFacetsBuilder;
2910
import java.io.IOException;
30-
import java.net.URI;
3111
import java.net.http.HttpResponse;
32-
import java.nio.charset.Charset;
33-
import java.time.Instant;
34-
import java.time.ZoneId;
35-
import java.time.ZonedDateTime;
36-
import java.time.temporal.ChronoField;
37-
import java.time.temporal.ChronoUnit;
38-
import java.util.Arrays;
39-
import java.util.Collections;
40-
import java.util.List;
41-
import java.util.Map;
42-
import java.util.Optional;
43-
import java.util.UUID;
4412
import java.util.concurrent.CompletableFuture;
45-
import java.util.concurrent.ExecutionException;
46-
import java.util.concurrent.TimeUnit;
47-
import java.util.concurrent.TimeoutException;
48-
import lombok.NonNull;
49-
import lombok.SneakyThrows;
5013
import lombok.extern.slf4j.Slf4j;
51-
import marquez.api.JdbiUtils;
52-
import marquez.client.MarquezClient;
53-
import marquez.client.models.Dataset;
54-
import marquez.client.models.DatasetVersion;
55-
import marquez.client.models.Job;
56-
import marquez.client.models.JobId;
57-
import marquez.client.models.JobVersion;
58-
import marquez.client.models.LineageEvent;
59-
import marquez.client.models.Run;
60-
import marquez.common.Utils;
61-
import marquez.db.LineageTestUtils;
62-
import org.assertj.core.api.InstanceOfAssertFactories;
63-
import org.jdbi.v3.core.Jdbi;
64-
import org.jetbrains.annotations.NotNull;
65-
import org.junit.jupiter.api.AfterEach;
6614
import org.junit.jupiter.api.Assertions;
6715
import org.junit.jupiter.api.Test;
68-
import org.junit.jupiter.params.ParameterizedTest;
69-
import org.junit.jupiter.params.provider.MethodSource;
70-
import org.junit.jupiter.params.provider.ValueSource;
71-
import org.slf4j.LoggerFactory;
7216

7317
@org.junit.jupiter.api.Tag("IntegrationTests")
7418
@Slf4j
7519
public class MetricsIntegrationTest extends BaseIntegrationTest {
7620
@Test
77-
public void testCheckMetricName() throws IOException {
21+
public void testCheckMetricV1Name() throws IOException {
7822
client.listNamespaces();
7923
CompletableFuture<String> response =
8024
this.getMetrics()
@@ -85,8 +29,26 @@ public void testCheckMetricName() throws IOException {
8529
Assertions.fail("Could not complete request");
8630
}
8731
});
32+
assertThat(response.join()).contains("marquez_db_NamespaceDao_findAll_count");
33+
}
34+
35+
@Test
36+
public void testCheckMetricV2Name() throws IOException {
37+
client.listNamespaces();
38+
CompletableFuture<String> response =
39+
this.getMetricsV2()
40+
.thenApply(HttpResponse::body)
41+
.whenComplete(
42+
(val, error) -> {
43+
if (error != null) {
44+
Assertions.fail("Could not complete request");
45+
}
46+
});
8847
assertThat(response.join())
8948
.contains(
90-
"marquez_db_NamespaceDao_findAll_GET__api_v1_namespaces");
49+
"marquez_sql_duration_seconds_sum{object_name=\"marquez.db.NamespaceDao\","
50+
+ "method_name=\"findAll\","
51+
+ "endpoint_method=\"GET\","
52+
+ "endpoint_path=\"/api/v1/namespaces\",}");
9153
}
9254
}

0 commit comments

Comments
 (0)