Skip to content

Commit ecc4549

Browse files
authored
Add metadata cmd (#2091)
* Add metadata.json to .gitignore Signed-off-by: wslulciuc <willy@datakin.com> * Add psql conf for pghero Signed-off-by: wslulciuc <willy@datakin.com> * Add pghero Signed-off-by: wslulciuc <willy@datakin.com> * Add metadata cmd Signed-off-by: wslulciuc <willy@datakin.com> * Update javadocs Signed-off-by: wslulciuc <willy@datakin.com> * Add steps to enable query stats with pghero Signed-off-by: wslulciuc <willy@datakin.com> * Give pghero superuser access Signed-off-by: wslulciuc <willy@datakin.com> * Update cmd arg constant for --bytes-per-event Signed-off-by: wslulciuc <willy@datakin.com> * Simplify newOlEvents() Signed-off-by: wslulciuc <willy@datakin.com> Signed-off-by: wslulciuc <willy@datakin.com>
1 parent afb1075 commit ecc4549

6 files changed

Lines changed: 382 additions & 0 deletions

File tree

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ venv
3232
# Marquez configuration
3333
marquez.yml
3434

35+
# Metadata
36+
metadata.json
37+
3538
# jenv
3639
.java-version
3740

api/src/main/java/marquez/MarquezApp.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import lombok.NonNull;
2828
import lombok.extern.slf4j.Slf4j;
2929
import marquez.api.filter.JobRedirectFilter;
30+
import marquez.cli.MetadataCommand;
3031
import marquez.cli.SeedCommand;
3132
import marquez.common.Utils;
3233
import marquez.db.DbMigration;
@@ -75,6 +76,7 @@ public void initialize(@NonNull Bootstrap<MarquezConfig> bootstrap) {
7576
new EnvironmentVariableSubstitutor(ERROR_ON_UNDEFINED)));
7677

7778
// Add CLI commands
79+
bootstrap.addCommand(new MetadataCommand());
7880
bootstrap.addCommand(new SeedCommand());
7981

8082
bootstrap.getObjectMapper().disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
Lines changed: 361 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,361 @@
1+
/*
2+
* Copyright 2018-2022 contributors to the Marquez project
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package marquez.cli;
7+
8+
import static com.google.common.collect.ImmutableList.toImmutableList;
9+
import static io.openlineage.client.OpenLineage.RunEvent.EventType.COMPLETE;
10+
import static io.openlineage.client.OpenLineage.RunEvent.EventType.START;
11+
12+
import com.google.common.collect.ImmutableList;
13+
import io.dropwizard.cli.Command;
14+
import io.dropwizard.setup.Bootstrap;
15+
import io.openlineage.client.OpenLineage;
16+
import java.io.FileWriter;
17+
import java.io.IOException;
18+
import java.io.PrintWriter;
19+
import java.net.URI;
20+
import java.time.Instant;
21+
import java.time.ZoneId;
22+
import java.time.ZonedDateTime;
23+
import java.util.List;
24+
import java.util.Random;
25+
import java.util.UUID;
26+
import java.util.stream.Stream;
27+
import lombok.NonNull;
28+
import lombok.extern.slf4j.Slf4j;
29+
import marquez.common.Utils;
30+
import marquez.common.models.DatasetName;
31+
import marquez.common.models.FieldName;
32+
import marquez.common.models.JobName;
33+
import marquez.common.models.NamespaceName;
34+
import marquez.common.models.RunId;
35+
import net.sourceforge.argparse4j.inf.Namespace;
36+
import net.sourceforge.argparse4j.inf.Subparser;
37+
38+
/**
39+
* A command to generate random source, dataset, and job metadata using <a
40+
* href="https://openlineage.io">OpenLineage</a>.
41+
*
42+
* <h2>Usage</h2>
43+
*
44+
* For example, the following command will generate {@code metadata.json} with {@code 10} runs
45+
* ({@code 20} events in total), where each START event will have a size of {@code ~16384} bytes;
46+
* events will be written to {@code metadata.json} in the {@code current} directory. You may specify
47+
* the location of {@code metadata.json} by using the command-line argument {@code --output}.
48+
*
49+
* <pre>{@code
50+
* java -jar marquez-api.jar metadata --runs 10 --bytes-per-event 16384
51+
* }</pre>
52+
*/
53+
@Slf4j
54+
public final class MetadataCommand extends Command {
55+
/* Used to calculate (approximate) total bytes per event. */
56+
private static final int BYTES_PER_RUN = 578;
57+
private static final int BYTES_PER_JOB = 58;
58+
private static final int BYTES_PER_FIELD_IN_SCHEMA = 256;
59+
60+
/* Default I/O and schema fields per event. */
61+
private static final int DEFAULT_NUM_OF_IO_PER_EVENT = 8;
62+
private static final int DEFAULT_NUM_OF_FIELDS_IN_SCHEMA_PER_EVENT = 16;
63+
64+
/* Default runs. */
65+
private static final int DEFAULT_RUNS = 25;
66+
67+
/* Default bytes. */
68+
private static final int DEFAULT_BYTES_PER_EVENT =
69+
BYTES_PER_RUN
70+
+ BYTES_PER_JOB
71+
+ ((BYTES_PER_FIELD_IN_SCHEMA * DEFAULT_NUM_OF_FIELDS_IN_SCHEMA_PER_EVENT)
72+
* DEFAULT_NUM_OF_IO_PER_EVENT);
73+
74+
/* Default output. */
75+
private static final String DEFAULT_OUTPUT = "metadata.json";
76+
77+
/* Args for metadata command. */
78+
private static final String CMD_ARG_METADATA_RUNS = "runs";
79+
private static final String CMD_ARG_METADATA_BYTES_PER_EVENT = "bytes-per-event";
80+
private static final String CMD_ARG_METADATA_OUTPUT = "output";
81+
82+
/* Used for event randomization. */
83+
private static final Random RANDOM = new Random();
84+
private static final ZoneId AMERICA_LOS_ANGELES = ZoneId.of("America/Los_Angeles");
85+
private static final List<String> FIELD_TYPES = ImmutableList.of("VARCHAR", "TEXT", "INTEGER");
86+
87+
private static final String OL_NAMESPACE = newNamespaceName().getValue();
88+
private static final OpenLineage OL =
89+
new OpenLineage(
90+
URI.create(
91+
"https://github.com/MarquezProject/marquez/blob/main/api/src/main/java/marquez/cli/MetadataCommand.java"));
92+
93+
/* Define metadata command. */
94+
public MetadataCommand() {
95+
super("metadata", "generate random metadata using the OpenLineage standard");
96+
}
97+
98+
/* Configure metadata command. */
99+
@Override
100+
public void configure(@NonNull Subparser subparser) {
101+
subparser
102+
.addArgument("--runs")
103+
.dest("runs")
104+
.type(Integer.class)
105+
.required(false)
106+
.setDefault(DEFAULT_RUNS)
107+
.help("limits OL runs up to N");
108+
subparser
109+
.addArgument("--bytes-per-event")
110+
.dest("bytes-per-event")
111+
.type(Integer.class)
112+
.required(false)
113+
.setDefault(DEFAULT_BYTES_PER_EVENT)
114+
.help("size (in bytes) per OL event");
115+
subparser
116+
.addArgument("-o", "--output")
117+
.dest("output")
118+
.type(String.class)
119+
.required(false)
120+
.help("the output metadata file")
121+
.setDefault(DEFAULT_OUTPUT);
122+
}
123+
124+
@Override
125+
public void run(@NonNull Bootstrap<?> bootstrap, @NonNull Namespace namespace) {
126+
final int runs = namespace.getInt(CMD_ARG_METADATA_RUNS);
127+
final int bytesPerEvent = namespace.getInt(CMD_ARG_METADATA_BYTES_PER_EVENT);
128+
final String output = namespace.getString(CMD_ARG_METADATA_OUTPUT);
129+
130+
// Generate, then write events to metadata file.
131+
writeOlEvents(newOlEvents(runs, bytesPerEvent), output);
132+
}
133+
134+
/** Returns new {@link OpenLineage.RunEvent} objects with random values. */
135+
private static List<OpenLineage.RunEvent> newOlEvents(
136+
final int numOfRuns, final int bytesPerEvent) {
137+
System.out.format(
138+
"Generating '%d' runs, each COMPLETE event will have a size of '~%d' (bytes)...\n",
139+
numOfRuns, bytesPerEvent);
140+
return Stream.generate(() -> newOlRunEvents(bytesPerEvent))
141+
.limit(numOfRuns)
142+
.flatMap(runEvents -> Stream.of(runEvents.start(), runEvents.complete()))
143+
.collect(toImmutableList());
144+
}
145+
146+
/**
147+
* Returns new {@link RunEvents} objects. A {@link RunEvents} object contains the {@code START}
148+
* and {@code COMPLETE} event for a given run.
149+
*/
150+
private static RunEvents newOlRunEvents(final int bytesPerEvent) {
151+
// (1) Generate run with an optional parent run, then the job.
152+
final OpenLineage.Run olRun = newRun(hasParentRunOrNot());
153+
final OpenLineage.Job olJob = newJob();
154+
155+
// (2) Generate number of I/O for run.
156+
int numOfInputs = RANDOM.nextInt(DEFAULT_NUM_OF_IO_PER_EVENT);
157+
int numOfOutputs = DEFAULT_NUM_OF_IO_PER_EVENT - numOfInputs;
158+
159+
// (3) Generate number of schema fields per I/O for run.
160+
final int numOfFieldsInSchemaForInputs =
161+
RANDOM.nextInt(DEFAULT_NUM_OF_FIELDS_IN_SCHEMA_PER_EVENT);
162+
final int numOfFieldsInSchemaForOutputs =
163+
DEFAULT_NUM_OF_FIELDS_IN_SCHEMA_PER_EVENT - numOfFieldsInSchemaForInputs;
164+
165+
// (4) Generate an event of N bytes if provided; otherwise use default.
166+
if (bytesPerEvent > DEFAULT_BYTES_PER_EVENT) {
167+
// Bytes per event:
168+
// +------------+-----------+-------------------+
169+
// | run meta | job meta | I/O meta |
170+
// +------------+-----------+-------------------+
171+
// |-> 578B <-|-> 78B <-|->(256B x N) x P <-|
172+
// where, N is number of fields per schema, and P is number of I/O per event.
173+
//
174+
// (5) Calculate the total I/O per event to equal the bytes per event.
175+
final int numOfInputsAndOutputsForEvent =
176+
(bytesPerEvent - BYTES_PER_RUN - BYTES_PER_JOB)
177+
/ (DEFAULT_NUM_OF_FIELDS_IN_SCHEMA_PER_EVENT * BYTES_PER_FIELD_IN_SCHEMA);
178+
179+
// (6) Update the number of I/O to generate for run based on calculation.
180+
numOfInputs = RANDOM.nextInt(numOfInputsAndOutputsForEvent);
181+
numOfOutputs = numOfInputsAndOutputsForEvent - numOfInputs;
182+
}
183+
return new RunEvents(
184+
OL.newRunEventBuilder()
185+
.eventType(START)
186+
.eventTime(newEventTime())
187+
.run(olRun)
188+
.job(olJob)
189+
.inputs(newInputs(numOfInputs, numOfFieldsInSchemaForInputs))
190+
.outputs(newOutputs(numOfOutputs, numOfFieldsInSchemaForOutputs))
191+
.build(),
192+
OL.newRunEventBuilder()
193+
.eventType(COMPLETE)
194+
.eventTime(newEventTime())
195+
.run(olRun)
196+
.job(olJob)
197+
.build());
198+
}
199+
200+
/** Write {@link OpenLineage.RunEvent}s to the specified {@code output}. */
201+
private static void writeOlEvents(
202+
@NonNull final List<OpenLineage.RunEvent> olEvents, @NonNull final String output) {
203+
System.out.format("Writing '%d' events to: '%s'\n", olEvents.size(), output);
204+
FileWriter fileWriter;
205+
PrintWriter printWriter = null;
206+
try {
207+
fileWriter = new FileWriter(output);
208+
printWriter = new PrintWriter(fileWriter);
209+
printWriter.write(Utils.toJson(olEvents));
210+
} catch (IOException e) {
211+
e.printStackTrace();
212+
} finally {
213+
if (printWriter != null) {
214+
printWriter.close();
215+
}
216+
}
217+
}
218+
219+
/**
220+
* Returns a new {@link OpenLineage.Run} object. A {@code parent} run will be associated with
221+
* {@code child} run if {@code hasParentRun} is {@code true}; otherwise, the {@code child} run
222+
* will not have a {@code parent} run.
223+
*/
224+
private static OpenLineage.Run newRun(final boolean hasParentRun) {
225+
return OL.newRun(
226+
newRunId().getValue(),
227+
OL.newRunFacetsBuilder()
228+
.parent(
229+
hasParentRun
230+
? OL.newParentRunFacetBuilder().run(newParentRun()).job(newParentJob()).build()
231+
: null)
232+
.nominalTime(
233+
OL.newNominalTimeRunFacetBuilder()
234+
.nominalStartTime(newNominalTime())
235+
.nominalEndTime(newNominalTime().plusHours(1))
236+
.build())
237+
.build());
238+
}
239+
240+
/** Returns a new {@link OpenLineage.ParentRunFacetRun} object. */
241+
private static OpenLineage.ParentRunFacetRun newParentRun() {
242+
return OL.newParentRunFacetRunBuilder().runId(newRunId().getValue()).build();
243+
}
244+
245+
/** Returns a new {@link OpenLineage.ParentRunFacetJob} object. */
246+
private static OpenLineage.ParentRunFacetJob newParentJob() {
247+
return OL.newParentRunFacetJobBuilder()
248+
.namespace(OL_NAMESPACE)
249+
.name(newJobName().getValue())
250+
.build();
251+
}
252+
253+
/** Returns a new {@link OpenLineage.Job} object. */
254+
static OpenLineage.Job newJob() {
255+
return OL.newJobBuilder().namespace(OL_NAMESPACE).name(newJobName().getValue()).build();
256+
}
257+
258+
/** Returns new {@link OpenLineage.InputDataset} objects. */
259+
private static List<OpenLineage.InputDataset> newInputs(
260+
final int numOfInputs, final int numOfFields) {
261+
return Stream.generate(
262+
() ->
263+
OL.newInputDatasetBuilder()
264+
.namespace(OL_NAMESPACE)
265+
.name(newDatasetName().getValue())
266+
.facets(
267+
OL.newDatasetFacetsBuilder().schema(newDatasetSchema(numOfFields)).build())
268+
.build())
269+
.limit(numOfInputs)
270+
.collect(toImmutableList());
271+
}
272+
273+
/** Returns new {@link OpenLineage.OutputDataset} objects. */
274+
static List<OpenLineage.OutputDataset> newOutputs(final int numOfOutputs, final int numOfFields) {
275+
return Stream.generate(
276+
() ->
277+
OL.newOutputDatasetBuilder()
278+
.namespace(OL_NAMESPACE)
279+
.name(newDatasetName().getValue())
280+
.facets(
281+
OL.newDatasetFacetsBuilder().schema(newDatasetSchema(numOfFields)).build())
282+
.build())
283+
.limit(numOfOutputs)
284+
.collect(toImmutableList());
285+
}
286+
287+
/** Returns a new {@link OpenLineage.SchemaDatasetFacet} object. */
288+
private static OpenLineage.SchemaDatasetFacet newDatasetSchema(final int numOfFields) {
289+
return OL.newSchemaDatasetFacetBuilder().fields(newFields(numOfFields)).build();
290+
}
291+
292+
/** Returns new {@link OpenLineage.SchemaDatasetFacetFields} objects. */
293+
private static List<OpenLineage.SchemaDatasetFacetFields> newFields(final int numOfFields) {
294+
return Stream.generate(
295+
() ->
296+
OL.newSchemaDatasetFacetFieldsBuilder()
297+
.name(newFieldName().getValue())
298+
.type(newFieldType())
299+
.description(newDescription())
300+
.build())
301+
.limit(numOfFields)
302+
.collect(toImmutableList());
303+
}
304+
305+
/** Returns a new {@link NamespaceName} object. */
306+
private static NamespaceName newNamespaceName() {
307+
return NamespaceName.of("namespace" + newId());
308+
}
309+
310+
/** Returns a new {@link RunId} object. */
311+
private static RunId newRunId() {
312+
return RunId.of(UUID.randomUUID());
313+
}
314+
315+
/** Returns a new {@link DatasetName} object. */
316+
private static DatasetName newDatasetName() {
317+
return DatasetName.of("dataset" + newId());
318+
}
319+
320+
/** Returns a new {@link FieldName} object. */
321+
private static FieldName newFieldName() {
322+
return FieldName.of("field" + newId());
323+
}
324+
325+
/** Returns a new field {@code type}. */
326+
private static String newFieldType() {
327+
return FIELD_TYPES.get(RANDOM.nextInt(FIELD_TYPES.size()));
328+
}
329+
330+
/** Returns a new {@link JobName} object. */
331+
private static JobName newJobName() {
332+
return JobName.of("job" + newId());
333+
}
334+
335+
/** Returns a new {@code description}. */
336+
private static String newDescription() {
337+
return "description" + newId();
338+
}
339+
340+
/** Returns a new {@code nominal} time. */
341+
private static ZonedDateTime newNominalTime() {
342+
return Instant.now().atZone(AMERICA_LOS_ANGELES);
343+
}
344+
345+
/** Returns a new {@code event} time. */
346+
private static ZonedDateTime newEventTime() {
347+
return Instant.now().atZone(AMERICA_LOS_ANGELES);
348+
}
349+
350+
/** Returns {@code true} if parent run should be generated; {@code false} otherwise. */
351+
private static boolean hasParentRunOrNot() {
352+
return RANDOM.nextBoolean();
353+
}
354+
355+
private static int newId() {
356+
return RANDOM.nextInt(Integer.MAX_VALUE - 1);
357+
}
358+
359+
/** A container class for run info. */
360+
record RunEvents(@NonNull OpenLineage.RunEvent start, @NonNull OpenLineage.RunEvent complete) {}
361+
}

docker-compose.dev.yml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,11 @@ services:
88
build:
99
context: ./web
1010
dockerfile: Dockerfile
11+
12+
pghero:
13+
image: ankane/pghero
14+
container_name: pghero
15+
ports:
16+
- "8080:8080"
17+
environment:
18+
DATABASE_URL: postgres://postgres:password@db:5432

0 commit comments

Comments
 (0)