Skip to content

Commit 7bb7545

Browse files
committed
Introduce labels to metrics and add them to v2 metrics endpoint.
Signed-off-by: Jakub Dardzinski <kuba0221@gmail.com>
1 parent 89ef52f commit 7bb7545

9 files changed

Lines changed: 136 additions & 119 deletions

File tree

api/src/main/java/marquez/MarquezApp.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,9 @@
3535
import marquez.common.Utils;
3636
import marquez.db.DbMigration;
3737
import marquez.jobs.DbRetentionJob;
38+
import marquez.logging.DelegatingSqlLogger;
39+
import marquez.logging.LabelledSqlLogger;
3840
import marquez.logging.LoggingMdcFilter;
39-
import marquez.logging.MarquezMetricNameStrategy;
4041
import marquez.tracing.SentryConfig;
4142
import marquez.tracing.TracingContainerResponseFilter;
4243
import marquez.tracing.TracingSQLLogger;
@@ -58,7 +59,11 @@ public final class MarquezApp extends Application<MarquezConfig> {
5859

5960
// Monitoring
6061
private static final String PROMETHEUS = "prometheus";
62+
private static final String PROMETHEUS_V2 = "prometheus_v2";
6163
private static final String PROMETHEUS_ENDPOINT = "/metrics";
64+
private static final String PROMETHEUS_ENDPOINT_V2 = "/metrics/v2";
65+
66+
private static final CollectorRegistry registry = new CollectorRegistry();
6267

6368
public static void main(final String[] args) throws Exception {
6469
new MarquezApp().run(args);
@@ -74,7 +79,9 @@ public void initialize(@NonNull Bootstrap<MarquezConfig> bootstrap) {
7479
// Enable metric collection for prometheus.
7580
CollectorRegistry.defaultRegistry.register(
7681
new DropwizardExports(bootstrap.getMetricRegistry()));
82+
registry.register(new DropwizardExports(bootstrap.getMetricRegistry()));
7783
DefaultExports.initialize(); // Add metrics for CPU, JVM memory, etc.
84+
DefaultExports.register(registry);
7885

7986
// Enable variable substitution with environment variables.
8087
bootstrap.setConfigurationSourceProvider(
@@ -163,7 +170,9 @@ private Jdbi newJdbi(
163170
.installPlugin(new SqlObjectPlugin())
164171
.installPlugin(new PostgresPlugin())
165172
.installPlugin(new Jackson2Plugin());
166-
SqlLogger sqlLogger = new InstrumentedSqlLogger(env.metrics(), new MarquezMetricNameStrategy());
173+
SqlLogger sqlLogger =
174+
new DelegatingSqlLogger(
175+
new LabelledSqlLogger(registry), new InstrumentedSqlLogger(env.metrics()));
167176
if (isSentryEnabled(config)) {
168177
sqlLogger = new TracingSQLLogger(sqlLogger);
169178
}
@@ -198,6 +207,9 @@ private void registerServlets(@NonNull Environment env) {
198207

199208
// Expose metrics for monitoring.
200209
env.servlets().addServlet(PROMETHEUS, new MetricsServlet()).addMapping(PROMETHEUS_ENDPOINT);
210+
env.servlets()
211+
.addServlet(PROMETHEUS_V2, new MetricsServlet(registry))
212+
.addMapping(PROMETHEUS_ENDPOINT_V2);
201213
}
202214

203215
private void registerFilters(@NonNull Environment env, MarquezContext marquezContext) {

api/src/main/java/marquez/api/DatasetResource.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import marquez.common.models.NamespaceName;
4040
import marquez.common.models.TagName;
4141
import marquez.common.models.Version;
42+
import marquez.service.JobMetrics;
4243
import marquez.service.ServiceFactory;
4344
import marquez.service.models.Dataset;
4445
import marquez.service.models.DatasetMeta;
@@ -151,6 +152,7 @@ public Response list(
151152
datasetService.findAllWithTags(namespaceName.getValue(), limit, offset);
152153
columnLineageService.enrichWithColumnLineage(datasets);
153154
final int totalCount = datasetService.countFor(namespaceName.getValue());
155+
JobMetrics.emitJobCreationMetric("test", "mymeta");
154156
return Response.ok(new ResultsPage<>("datasets", datasets, totalCount)).build();
155157
}
156158

api/src/main/java/marquez/db/JobDao.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import marquez.db.mappers.JobRowMapper;
2828
import marquez.db.models.JobRow;
2929
import marquez.db.models.NamespaceRow;
30+
import marquez.service.JobMetrics;
3031
import marquez.service.models.Job;
3132
import marquez.service.models.JobMeta;
3233
import marquez.service.models.Run;
@@ -313,6 +314,7 @@ default void setJobData(Run run, Job j) {
313314

314315
default JobRow upsertJobMeta(
315316
NamespaceName namespaceName, JobName jobName, JobMeta jobMeta, ObjectMapper mapper) {
317+
JobMetrics.emitJobCreationMetric(namespaceName.getValue(), jobMeta.getType().toString());
316318
return upsertJobMeta(namespaceName, jobName, null, jobMeta, mapper);
317319
}
318320

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package marquez.logging;
2+
3+
import java.sql.SQLException;
4+
import java.util.Arrays;
5+
import java.util.List;
6+
import org.jdbi.v3.core.statement.SqlLogger;
7+
import org.jdbi.v3.core.statement.StatementContext;
8+
9+
/** A {@link SqlLogger} implementation that delegates to multiple {@link SqlLogger}s. */
10+
public class DelegatingSqlLogger implements SqlLogger {
11+
private final List<SqlLogger> sqlLoggers;
12+
13+
public DelegatingSqlLogger(SqlLogger... sqlLoggers) {
14+
this.sqlLoggers = Arrays.asList(sqlLoggers);
15+
}
16+
17+
@Override
18+
public void logAfterExecution(StatementContext statementContext) {
19+
for (SqlLogger sqlLogger : sqlLoggers) {
20+
sqlLogger.logAfterExecution(statementContext);
21+
}
22+
}
23+
24+
@Override
25+
public void logException(StatementContext statementContext, SQLException ex) {
26+
for (SqlLogger sqlLogger : sqlLoggers) {
27+
sqlLogger.logException(statementContext, ex);
28+
}
29+
}
30+
}

api/src/main/java/marquez/logging/EndpointNameStrategy.java

Lines changed: 0 additions & 33 deletions
This file was deleted.
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package marquez.logging;
2+
3+
import io.prometheus.client.CollectorRegistry;
4+
import io.prometheus.client.Histogram;
5+
import java.sql.SQLException;
6+
import java.time.temporal.ChronoUnit;
7+
import org.jdbi.v3.core.extension.ExtensionMethod;
8+
import org.jdbi.v3.core.statement.SqlLogger;
9+
import org.jdbi.v3.core.statement.StatementContext;
10+
import org.slf4j.MDC;
11+
12+
/**
13+
* A {@link SqlLogger} implementation for JDBI which uses the SQL objects' class names and method
14+
* names for nanosecond-precision timers.
15+
*/
16+
public class LabelledSqlLogger implements SqlLogger {
17+
private final Histogram metric;
18+
19+
public LabelledSqlLogger() {
20+
this(CollectorRegistry.defaultRegistry);
21+
}
22+
23+
public LabelledSqlLogger(CollectorRegistry registry) {
24+
this.metric =
25+
Histogram.build()
26+
.namespace("marquez")
27+
.labelNames("object_name", "method_name", "endpoint_method", "endpoint_path")
28+
.name("sql_duration_seconds")
29+
.help("SQL execution duration in seconds")
30+
.register(registry);
31+
}
32+
33+
@Override
34+
public void logAfterExecution(StatementContext context) {
35+
log(context);
36+
}
37+
38+
@Override
39+
public void logException(StatementContext context, SQLException ex) {
40+
log(context);
41+
}
42+
43+
private void log(StatementContext context) {
44+
ExtensionMethod extensionMethod = context.getExtensionMethod();
45+
if (extensionMethod != null) {
46+
final long elapsed = context.getElapsedTime(ChronoUnit.NANOS);
47+
if (MDC.get("method") != null && MDC.get("pathWithParams") != null) {
48+
metric
49+
.labels(
50+
extensionMethod.getType().getName(),
51+
extensionMethod.getMethod().getName(),
52+
MDC.get("method"),
53+
MDC.get("pathWithParams"))
54+
.observe(elapsed / 1e9);
55+
}
56+
}
57+
}
58+
}

api/src/main/java/marquez/logging/MarquezMetricNameStrategy.java

Lines changed: 0 additions & 20 deletions
This file was deleted.

api/src/test/java/marquez/BaseIntegrationTest.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -215,10 +215,14 @@ protected CompletableFuture<HttpResponse<String>> sendLineage(String body) {
215215

216216
protected CompletableFuture<HttpResponse<String>> getMetrics() {
217217
HttpRequest request =
218-
HttpRequest.newBuilder()
219-
.uri(URI.create(baseUrl + "/metrics"))
220-
.GET()
221-
.build();
218+
HttpRequest.newBuilder().uri(URI.create(baseUrl + "/metrics")).GET().build();
219+
220+
return http2.sendAsync(request, BodyHandlers.ofString());
221+
}
222+
223+
protected CompletableFuture<HttpResponse<String>> getMetricsV2() {
224+
HttpRequest request =
225+
HttpRequest.newBuilder().uri(URI.create(baseUrl + "/metrics/v2")).GET().build();
222226

223227
return http2.sendAsync(request, BodyHandlers.ofString());
224228
}

api/src/test/java/marquez/MetricsIntegrationTest.java

Lines changed: 22 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -5,76 +5,20 @@
55

66
package marquez;
77

8-
import static marquez.db.LineageTestUtils.PRODUCER_URL;
9-
import static marquez.db.LineageTestUtils.SCHEMA_URL;
10-
import static org.assertj.core.api.Assertions.as;
118
import static org.assertj.core.api.Assertions.assertThat;
12-
import static org.junit.jupiter.api.Assertions.assertEquals;
139

14-
import com.fasterxml.jackson.core.JsonProcessingException;
15-
import com.fasterxml.jackson.core.type.TypeReference;
16-
import com.fasterxml.jackson.databind.JsonNode;
17-
import com.fasterxml.jackson.databind.ObjectMapper;
18-
import com.fasterxml.jackson.databind.node.ObjectNode;
19-
import com.fasterxml.jackson.databind.node.TextNode;
20-
import com.google.common.base.Predicate;
21-
import com.google.common.collect.ImmutableMap;
22-
import com.google.common.collect.Maps;
23-
import io.dropwizard.util.Resources;
24-
import io.openlineage.client.OpenLineage;
25-
import io.openlineage.client.OpenLineage.RunEvent;
26-
import io.openlineage.client.OpenLineage.RunEvent.EventType;
27-
import io.openlineage.client.OpenLineage.RunFacet;
28-
import io.openlineage.client.OpenLineage.RunFacetsBuilder;
2910
import java.io.IOException;
30-
import java.net.URI;
3111
import java.net.http.HttpResponse;
32-
import java.nio.charset.Charset;
33-
import java.time.Instant;
34-
import java.time.ZoneId;
35-
import java.time.ZonedDateTime;
36-
import java.time.temporal.ChronoField;
37-
import java.time.temporal.ChronoUnit;
38-
import java.util.Arrays;
39-
import java.util.Collections;
40-
import java.util.List;
41-
import java.util.Map;
42-
import java.util.Optional;
43-
import java.util.UUID;
4412
import java.util.concurrent.CompletableFuture;
45-
import java.util.concurrent.ExecutionException;
46-
import java.util.concurrent.TimeUnit;
47-
import java.util.concurrent.TimeoutException;
48-
import lombok.NonNull;
49-
import lombok.SneakyThrows;
5013
import lombok.extern.slf4j.Slf4j;
51-
import marquez.api.JdbiUtils;
52-
import marquez.client.MarquezClient;
53-
import marquez.client.models.Dataset;
54-
import marquez.client.models.DatasetVersion;
55-
import marquez.client.models.Job;
56-
import marquez.client.models.JobId;
57-
import marquez.client.models.JobVersion;
58-
import marquez.client.models.LineageEvent;
59-
import marquez.client.models.Run;
60-
import marquez.common.Utils;
61-
import marquez.db.LineageTestUtils;
62-
import org.assertj.core.api.InstanceOfAssertFactories;
63-
import org.jdbi.v3.core.Jdbi;
64-
import org.jetbrains.annotations.NotNull;
65-
import org.junit.jupiter.api.AfterEach;
6614
import org.junit.jupiter.api.Assertions;
6715
import org.junit.jupiter.api.Test;
68-
import org.junit.jupiter.params.ParameterizedTest;
69-
import org.junit.jupiter.params.provider.MethodSource;
70-
import org.junit.jupiter.params.provider.ValueSource;
71-
import org.slf4j.LoggerFactory;
7216

7317
@org.junit.jupiter.api.Tag("IntegrationTests")
7418
@Slf4j
7519
public class MetricsIntegrationTest extends BaseIntegrationTest {
7620
@Test
77-
public void testCheckMetricName() throws IOException {
21+
public void testCheckMetricV1Name() throws IOException {
7822
client.listNamespaces();
7923
CompletableFuture<String> response =
8024
this.getMetrics()
@@ -85,8 +29,26 @@ public void testCheckMetricName() throws IOException {
8529
Assertions.fail("Could not complete request");
8630
}
8731
});
88-
assertThat(response.join())
89-
.contains(
90-
"marquez_db_NamespaceDao_findAll_GET__api_v1_namespaces");
32+
assertThat(response.join()).contains("marquez_db_NamespaceDao_findAll_count");
9133
}
34+
35+
@Test
36+
public void testCheckMetricV2Name() throws IOException {
37+
client.listNamespaces();
38+
CompletableFuture<String> response =
39+
this.getMetricsV2()
40+
.thenApply(HttpResponse::body)
41+
.whenComplete(
42+
(val, error) -> {
43+
if (error != null) {
44+
Assertions.fail("Could not complete request");
45+
}
46+
});
47+
assertThat(response.join())
48+
.contains(
49+
"marquez_sql_duration_seconds_sum{object_name=\"marquez.db.NamespaceDao\","
50+
+ "method_name=\"findAll\","
51+
+ "endpoint_method=\"GET\","
52+
+ "endpoint_path=\"/api/v1/namespaces\",}");
53+
}
9254
}

0 commit comments

Comments
 (0)