Skip to content

Commit 801831c

Browse files
Add endpoint method and path to metrics name. (#2850)
* Add endpoint method and path to metrics name. Signed-off-by: Jakub Dardzinski <kuba0221@gmail.com> Add MetricsIntegrationTest. Signed-off-by: Jakub Dardzinski <kuba0221@gmail.com> * Introduce labels to metrics and add them to v2 metrics endpoint. Signed-off-by: Jakub Dardzinski <kuba0221@gmail.com> * Rename metric name and labels. Rename endpoint name for v2 metrics. Signed-off-by: Jakub Dardzinski <kuba0221@gmail.com> --------- Signed-off-by: Jakub Dardzinski <kuba0221@gmail.com> Co-authored-by: Willy Lulciuc <willy@datakin.com>
1 parent 879031a commit 801831c

10 files changed

Lines changed: 427 additions & 1 deletion

api/src/main/java/marquez/MarquezApp.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,10 @@
3535
import marquez.common.Utils;
3636
import marquez.db.DbMigration;
3737
import marquez.jobs.DbRetentionJob;
38+
import marquez.logging.DelegatingSqlLogger;
39+
import marquez.logging.LabelledSqlLogger;
3840
import marquez.logging.LoggingMdcFilter;
41+
import marquez.service.DatabaseMetrics;
3942
import marquez.tracing.SentryConfig;
4043
import marquez.tracing.TracingContainerResponseFilter;
4144
import marquez.tracing.TracingSQLLogger;
@@ -57,7 +60,9 @@ public final class MarquezApp extends Application<MarquezConfig> {
5760

5861
// Monitoring
5962
private static final String PROMETHEUS = "prometheus";
63+
private static final String PROMETHEUS_V2 = "prometheus_v2";
6064
private static final String PROMETHEUS_ENDPOINT = "/metrics";
65+
private static final String PROMETHEUS_ENDPOINT_V2 = "/v2beta/metrics";
6166

6267
public static void main(final String[] args) throws Exception {
6368
new MarquezApp().run(args);
@@ -73,7 +78,9 @@ public void initialize(@NonNull Bootstrap<MarquezConfig> bootstrap) {
7378
// Enable metric collection for prometheus.
7479
CollectorRegistry.defaultRegistry.register(
7580
new DropwizardExports(bootstrap.getMetricRegistry()));
81+
DatabaseMetrics.registry.register(new DropwizardExports(bootstrap.getMetricRegistry()));
7682
DefaultExports.initialize(); // Add metrics for CPU, JVM memory, etc.
83+
DefaultExports.register(DatabaseMetrics.registry);
7784

7885
// Enable variable substitution with environment variables.
7986
bootstrap.setConfigurationSourceProvider(
@@ -162,7 +169,8 @@ private Jdbi newJdbi(
162169
.installPlugin(new SqlObjectPlugin())
163170
.installPlugin(new PostgresPlugin())
164171
.installPlugin(new Jackson2Plugin());
165-
SqlLogger sqlLogger = new InstrumentedSqlLogger(env.metrics());
172+
SqlLogger sqlLogger =
173+
new DelegatingSqlLogger(new LabelledSqlLogger(), new InstrumentedSqlLogger(env.metrics()));
166174
if (isSentryEnabled(config)) {
167175
sqlLogger = new TracingSQLLogger(sqlLogger);
168176
}
@@ -197,6 +205,9 @@ private void registerServlets(@NonNull Environment env) {
197205

198206
// Expose metrics for monitoring.
199207
env.servlets().addServlet(PROMETHEUS, new MetricsServlet()).addMapping(PROMETHEUS_ENDPOINT);
208+
env.servlets()
209+
.addServlet(PROMETHEUS_V2, new MetricsServlet(DatabaseMetrics.registry))
210+
.addMapping(PROMETHEUS_ENDPOINT_V2);
200211
}
201212

202213
private void registerFilters(@NonNull Environment env, MarquezContext marquezContext) {
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright 2018-2023 contributors to the Marquez project
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package marquez.logging;
7+
8+
import java.sql.SQLException;
9+
import java.util.Arrays;
10+
import java.util.List;
11+
import org.jdbi.v3.core.statement.SqlLogger;
12+
import org.jdbi.v3.core.statement.StatementContext;
13+
14+
/** A {@link SqlLogger} implementation that delegates to multiple {@link SqlLogger}s. */
15+
public class DelegatingSqlLogger implements SqlLogger {
16+
private final List<SqlLogger> sqlLoggers;
17+
18+
public DelegatingSqlLogger(SqlLogger... sqlLoggers) {
19+
this.sqlLoggers = Arrays.asList(sqlLoggers);
20+
}
21+
22+
@Override
23+
public void logAfterExecution(StatementContext statementContext) {
24+
for (SqlLogger sqlLogger : sqlLoggers) {
25+
sqlLogger.logAfterExecution(statementContext);
26+
}
27+
}
28+
29+
@Override
30+
public void logException(StatementContext statementContext, SQLException ex) {
31+
for (SqlLogger sqlLogger : sqlLoggers) {
32+
sqlLogger.logException(statementContext, ex);
33+
}
34+
}
35+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright 2018-2023 contributors to the Marquez project
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package marquez.logging;
7+
8+
import java.sql.SQLException;
9+
import java.time.temporal.ChronoUnit;
10+
import marquez.service.DatabaseMetrics;
11+
import org.jdbi.v3.core.extension.ExtensionMethod;
12+
import org.jdbi.v3.core.statement.SqlLogger;
13+
import org.jdbi.v3.core.statement.StatementContext;
14+
import org.slf4j.MDC;
15+
16+
/**
17+
* A {@link SqlLogger} implementation for JDBI which uses the SQL objects' class names and method
18+
* names for nanosecond-precision timers.
19+
*/
20+
public class LabelledSqlLogger implements SqlLogger {
21+
22+
@Override
23+
public void logAfterExecution(StatementContext context) {
24+
log(context);
25+
}
26+
27+
@Override
28+
public void logException(StatementContext context, SQLException ex) {
29+
log(context);
30+
}
31+
32+
private void log(StatementContext context) {
33+
ExtensionMethod extensionMethod = context.getExtensionMethod();
34+
if (extensionMethod != null) {
35+
final long elapsed = context.getElapsedTime(ChronoUnit.NANOS);
36+
if (MDC.get("method") != null && MDC.get("pathWithParams") != null) {
37+
DatabaseMetrics.recordDbDuration(
38+
extensionMethod.getType().getName(),
39+
extensionMethod.getMethod().getName(),
40+
MDC.get("method"),
41+
MDC.get("pathWithParams"),
42+
elapsed / 1e9);
43+
}
44+
}
45+
}
46+
}

api/src/main/java/marquez/logging/LoggingMdcFilter.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,18 @@
66
package marquez.logging;
77

88
import java.io.IOException;
9+
import java.util.List;
910
import java.util.UUID;
1011
import javax.ws.rs.container.CompletionCallback;
1112
import javax.ws.rs.container.ContainerRequestContext;
1213
import javax.ws.rs.container.ContainerRequestFilter;
1314
import javax.ws.rs.container.ContainerResponseContext;
1415
import javax.ws.rs.container.ContainerResponseFilter;
16+
import javax.ws.rs.core.Context;
1517
import javax.ws.rs.core.Response;
1618
import lombok.extern.slf4j.Slf4j;
19+
import org.glassfish.jersey.server.ExtendedUriInfo;
20+
import org.glassfish.jersey.uri.UriTemplate;
1721
import org.slf4j.MDC;
1822

1923
/**
@@ -28,19 +32,30 @@ public class LoggingMdcFilter
2832
private static final String REQUEST_ID = "requestID";
2933
private static final String METHOD = "method";
3034
private static final String PATH = "path";
35+
private static final String PATH_WITH_PARAMETERS = "pathWithParams";
36+
37+
@Context private ExtendedUriInfo uriInfo;
3138

3239
@Override
3340
public void onComplete(Throwable throwable) {
3441
MDC.remove(REQUEST_ID);
3542
MDC.remove(METHOD);
3643
MDC.remove(PATH);
44+
MDC.remove(PATH_WITH_PARAMETERS);
3745
}
3846

3947
@Override
4048
public void filter(ContainerRequestContext requestContext) throws IOException {
4149
MDC.put(REQUEST_ID, UUID.randomUUID().toString());
4250
MDC.put(METHOD, requestContext.getMethod());
4351
MDC.put(PATH, requestContext.getUriInfo().getPath());
52+
StringBuilder pathWithPlaceholders = new StringBuilder();
53+
54+
List<UriTemplate> matchedTemplates = uriInfo.getMatchedTemplates();
55+
for (int i = matchedTemplates.size() - 1; i >= 0; i--) {
56+
pathWithPlaceholders.append(matchedTemplates.get(i).getTemplate());
57+
}
58+
MDC.put(PATH_WITH_PARAMETERS, pathWithPlaceholders.toString());
4459
}
4560

4661
@Override
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Copyright 2018-2023 contributors to the Marquez project
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package marquez.service;
7+
8+
import io.prometheus.client.CollectorRegistry;
9+
import io.prometheus.client.Histogram;
10+
11+
public class DatabaseMetrics {
12+
public static final CollectorRegistry registry = new io.prometheus.client.CollectorRegistry();
13+
14+
public static final Histogram dbDurationSeconds =
15+
Histogram.build()
16+
.namespace("marquez")
17+
.labelNames("sql_class", "sql_method", "http_method", "http_path")
18+
.name("db_duration_seconds_by_http_call")
19+
.help("The time to make the DB call for a given HTTP endpoint.")
20+
.register(registry);
21+
22+
public static void recordDbDuration(
23+
String sqlClass, String sqlMethod, String httpMethod, String httpPath, double duration) {
24+
dbDurationSeconds.labels(sqlClass, sqlMethod, httpMethod, httpPath).observe(duration);
25+
}
26+
}

api/src/test/java/marquez/BaseIntegrationTest.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,20 @@ protected CompletableFuture<HttpResponse<String>> sendLineage(String body) {
213213
return http2.sendAsync(request, BodyHandlers.ofString());
214214
}
215215

216+
protected CompletableFuture<HttpResponse<String>> getMetrics() {
217+
HttpRequest request =
218+
HttpRequest.newBuilder().uri(URI.create(baseUrl + "/metrics")).GET().build();
219+
220+
return http2.sendAsync(request, BodyHandlers.ofString());
221+
}
222+
223+
protected CompletableFuture<HttpResponse<String>> getMetricsV2() {
224+
HttpRequest request =
225+
HttpRequest.newBuilder().uri(URI.create(baseUrl + "/v2beta/metrics")).GET().build();
226+
227+
return http2.sendAsync(request, BodyHandlers.ofString());
228+
}
229+
216230
protected CompletableFuture<HttpResponse<String>> fetchLineage(String nodeId) {
217231
HttpRequest request =
218232
HttpRequest.newBuilder()
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright 2018-2023 contributors to the Marquez project
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package marquez;
7+
8+
import static org.assertj.core.api.Assertions.assertThat;
9+
10+
import java.io.IOException;
11+
import java.net.http.HttpResponse;
12+
import java.util.concurrent.CompletableFuture;
13+
import lombok.extern.slf4j.Slf4j;
14+
import org.junit.jupiter.api.Assertions;
15+
import org.junit.jupiter.api.Test;
16+
17+
@org.junit.jupiter.api.Tag("IntegrationTests")
18+
@Slf4j
19+
public class MetricsIntegrationTest extends BaseIntegrationTest {
20+
@Test
21+
public void testCheckMetricV1Name() throws IOException {
22+
client.listNamespaces();
23+
CompletableFuture<String> response =
24+
this.getMetrics()
25+
.thenApply(HttpResponse::body)
26+
.whenComplete(
27+
(val, error) -> {
28+
if (error != null) {
29+
Assertions.fail("Could not complete request");
30+
}
31+
});
32+
assertThat(response.join()).contains("marquez_db_NamespaceDao_findAll_count");
33+
}
34+
35+
@Test
36+
public void testCheckMetricV2Name() throws IOException {
37+
client.listNamespaces();
38+
CompletableFuture<String> response =
39+
this.getMetricsV2()
40+
.thenApply(HttpResponse::body)
41+
.whenComplete(
42+
(val, error) -> {
43+
if (error != null) {
44+
Assertions.fail("Could not complete request");
45+
}
46+
});
47+
assertThat(response.join())
48+
.contains(
49+
"marquez_db_duration_seconds_by_http_call_sum{sql_class=\"marquez.db.NamespaceDao\","
50+
+ "sql_method=\"findAll\","
51+
+ "http_method=\"GET\","
52+
+ "http_path=\"/api/v1/namespaces\",}");
53+
}
54+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Copyright 2018-2023 contributors to the Marquez project
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package marquez.logging;
7+
8+
import static org.mockito.Mockito.mock;
9+
import static org.mockito.Mockito.times;
10+
import static org.mockito.Mockito.verify;
11+
12+
import java.sql.SQLException;
13+
import org.jdbi.v3.core.statement.SqlLogger;
14+
import org.jdbi.v3.core.statement.StatementContext;
15+
import org.junit.jupiter.api.BeforeEach;
16+
import org.junit.jupiter.api.Test;
17+
18+
public class DelegatingSqlLoggerTest {
19+
private SqlLogger logger1;
20+
private SqlLogger logger2;
21+
private DelegatingSqlLogger delegatingSqlLogger;
22+
private StatementContext statementContext;
23+
private SQLException sqlException;
24+
25+
@BeforeEach
26+
public void setUp() {
27+
logger1 = mock(SqlLogger.class);
28+
logger2 = mock(SqlLogger.class);
29+
delegatingSqlLogger = new DelegatingSqlLogger(logger1, logger2);
30+
statementContext = mock(StatementContext.class);
31+
sqlException = new SQLException("Test SQL Exception");
32+
}
33+
34+
@Test
35+
public void testLogAfterExecution() {
36+
// Act
37+
delegatingSqlLogger.logAfterExecution(statementContext);
38+
39+
// Assert
40+
verify(logger1, times(1)).logAfterExecution(statementContext);
41+
verify(logger2, times(1)).logAfterExecution(statementContext);
42+
}
43+
44+
@Test
45+
public void testLogException() {
46+
// Act
47+
delegatingSqlLogger.logException(statementContext, sqlException);
48+
49+
// Assert
50+
verify(logger1, times(1)).logException(statementContext, sqlException);
51+
verify(logger2, times(1)).logException(statementContext, sqlException);
52+
}
53+
}

0 commit comments

Comments
 (0)