Skip to content
97 changes: 82 additions & 15 deletions api/src/main/java/marquez/cli/MetadataCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.openlineage.client.OpenLineage.RunEvent.EventType.COMPLETE;
import static io.openlineage.client.OpenLineage.RunEvent.EventType.START;
import static java.time.ZoneOffset.UTC;

import com.google.common.collect.ImmutableList;
import io.dropwizard.cli.Command;
Expand All @@ -21,9 +22,11 @@
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.UUID;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import marquez.common.Utils;
Expand Down Expand Up @@ -63,6 +66,8 @@ public final class MetadataCommand extends Command {

/* Default runs. */
// Default for the 'runs' argument: upper limit on the number of OL runs to generate.
private static final int DEFAULT_RUNS = 25;
// Default for '--run-executions': upper limit on OL run executions per job.
private static final int DEFAULT_RUN_EXECUTIONS = 10;
// Default for '--run-duration': total OL run duration per job, expressed in seconds.
private static final int DEFAULT_RUN_DURATION = 300; // 300 seconds = 5 minutes

/* Default bytes. */
private static final int DEFAULT_BYTES_PER_EVENT =
Expand All @@ -76,6 +81,10 @@ public final class MetadataCommand extends Command {

/* Args for metadata command. */
// Keys under which run(...) reads the parsed argument values from the Namespace.
// configure(...) registers the dest(...) names as string literals, so the two must be kept
// in sync by hand.
private static final String CMD_ARG_METADATA_RUNS = "runs";
// Integer arguments with defaults (DEFAULT_RUN_EXECUTIONS / DEFAULT_RUN_DURATION).
private static final String CMD_ARG_METADATA_RUN_EXECUTIONS = "run-executions";
private static final String CMD_ARG_METADATA_RUN_DURATION = "run-duration";
// Optional string arguments without a default; documented format is a UTC ISO timestamp
// ('YYYY-MM-DDTHH:MM:SSZ').
private static final String CMD_ARG_METADATA_EVENT_START_TIME = "event-start-time";
private static final String CMD_ARG_METADATA_EVENT_END_TIME = "event-end-time";
// Size (in bytes) per OL run event.
private static final String CMD_ARG_METADATA_BYTES_PER_EVENT = "bytes-per-event";
// Value of the '-o' / '--output' argument.
private static final String CMD_ARG_METADATA_OUTPUT = "output";

Expand Down Expand Up @@ -105,13 +114,39 @@ public void configure(@NonNull Subparser subparser) {
.required(false)
.setDefault(DEFAULT_RUNS)
.help("limits OL runs up to N");
subparser
.addArgument("--run-executions")
.dest("run-executions")
.type(Integer.class)
.required(false)
.setDefault(DEFAULT_RUN_EXECUTIONS)
.help("limits OL run executions per job up to N");
subparser
.addArgument("--run-duration")
.dest("run-duration")
.type(Integer.class)
.required(false)
.setDefault(DEFAULT_RUN_DURATION)
.help("the total OL run duration (in seconds) per job");
subparser
.addArgument("--event-start-time")
.dest("event-start-time")
.type(String.class)
.required(false)
.help("the OL run event start time (as UTC ISO); timestamp format: 'YYYY-MM-DDTHH:MM:SSZ'");
subparser
.addArgument("--event-end-time")
.dest("event-end-time")
.type(String.class)
.required(false)
.help("the OL run event end time (as UTC ISO); timestamp format: 'YYYY-MM-DDTHH:MM:SSZ'");
subparser
.addArgument("--bytes-per-event")
.dest("bytes-per-event")
.type(Integer.class)
.required(false)
.setDefault(DEFAULT_BYTES_PER_EVENT)
.help("size (in bytes) per OL event");
.help("size (in bytes) per OL run event");
subparser
.addArgument("-o", "--output")
.dest("output")
Expand All @@ -124,20 +159,40 @@ public void configure(@NonNull Subparser subparser) {
/**
 * Reads the parsed command-line arguments, generates the OL run events from them, and hands the
 * events to {@code writeOlEvents} together with the {@code output} argument.
 *
 * @param bootstrap the application bootstrap; not referenced by this method
 * @param namespace the parsed command-line arguments
 */
@Override
public void run(@NonNull Bootstrap<?> bootstrap, @NonNull Namespace namespace) {
  // Integer arguments are registered with defaults in configure(...), so unboxing is safe.
  final int runs = namespace.getInt(CMD_ARG_METADATA_RUNS);
  final int runExecutions = namespace.getInt(CMD_ARG_METADATA_RUN_EXECUTIONS);
  final int runDurationPerExecution = namespace.getInt(CMD_ARG_METADATA_RUN_DURATION);
  // The event start / end times have no default and are passed on as @Nullable values.
  final String eventStartTime = namespace.getString(CMD_ARG_METADATA_EVENT_START_TIME);
  final String eventEndTime = namespace.getString(CMD_ARG_METADATA_EVENT_END_TIME);
  final int bytesPerEvent = namespace.getInt(CMD_ARG_METADATA_BYTES_PER_EVENT);
  final String output = namespace.getString(CMD_ARG_METADATA_OUTPUT);

  // Generate, then write events to metadata file. Only the six-argument newOlEvents(...) call
  // is kept; the earlier two-argument call no longer matches the method's signature.
  writeOlEvents(
      newOlEvents(
          runs,
          runExecutions,
          runDurationPerExecution,
          eventStartTime,
          eventEndTime,
          bytesPerEvent),
      output);
}

/** Returns new {@link OpenLineage.RunEvent} objects with random values. */
private static List<OpenLineage.RunEvent> newOlEvents(
final int numOfRuns, final int bytesPerEvent) {
final int numOfRuns,
final int runExecutions,
final int runDurationPerExecution,
@Nullable final String eventStartTime,
@Nullable final String eventEndTime,
final int bytesPerEvent) {
System.out.format(
"Generating '%d' runs, each COMPLETE event will have a size of '~%d' (bytes)...\n",
numOfRuns, bytesPerEvent);
return Stream.generate(() -> newOlRunEvents(bytesPerEvent))
return Stream.generate(
() ->
newOlRunEvents(
runDurationPerExecution, eventStartTime, eventEndTime, bytesPerEvent))
.limit(numOfRuns)
.flatMap(runEvents -> Stream.of(runEvents.start(), runEvents.complete()))
.collect(toImmutableList());
Expand All @@ -147,22 +202,34 @@ private static List<OpenLineage.RunEvent> newOlEvents(
* Returns new {@link RunEvents} objects. A {@link RunEvents} object contains the {@code START}
* and {@code COMPLETE} event for a given run.
*/
private static RunEvents newOlRunEvents(final int bytesPerEvent) {
// (1) Generate run with an optional parent run, then the job.
private static RunEvents newOlRunEvents(
final int runDuration,
@Nullable final String eventStartTime,
@Nullable final String eventEndTime,
final int bytesPerEvent) {
// (1) Generate start and end time for run.
final ZonedDateTime eventStartTimeWithTz =
Optional.ofNullable(eventStartTime).map(ZonedDateTime::parse).orElse(newEventTimeAsUtc());
final ZonedDateTime eventEndTimeWithTz =
Optional.ofNullable(eventEndTime)
.map(ZonedDateTime::parse)
.orElse(eventStartTimeWithTz.plusSeconds(runDuration));

// (2) Generate run with an optional parent run, then the job.
final OpenLineage.Run olRun = newRun(hasParentRunOrNot());
final OpenLineage.Job olJob = newJob();

// (2) Generate number of I/O for run.
// (3) Generate number of I/O for run.
int numOfInputs = RANDOM.nextInt(DEFAULT_NUM_OF_IO_PER_EVENT);
int numOfOutputs = DEFAULT_NUM_OF_IO_PER_EVENT - numOfInputs;

// (3) Generate number of schema fields per I/O for run.
// (4) Generate number of schema fields per I/O for run.
final int numOfFieldsInSchemaForInputs =
RANDOM.nextInt(DEFAULT_NUM_OF_FIELDS_IN_SCHEMA_PER_EVENT);
final int numOfFieldsInSchemaForOutputs =
DEFAULT_NUM_OF_FIELDS_IN_SCHEMA_PER_EVENT - numOfFieldsInSchemaForInputs;

// (4) Generate an event of N bytes if provided; otherwise use default.
// (5) Generate an event of N bytes if provided; otherwise use default.
if (bytesPerEvent > DEFAULT_BYTES_PER_EVENT) {
// Bytes per event:
// +------------+-----------+-------------------+
Expand All @@ -171,27 +238,27 @@ private static RunEvents newOlRunEvents(final int bytesPerEvent) {
// |-> 578B <-|-> 78B <-|->(256B x N) x P <-|
// where, N is number of fields per schema, and P is number of I/O per event.
//
// (5) Calculate the total I/O per event to equal the bytes per event.
// (6) Calculate the total I/O per event to equal the bytes per event.
final int numOfInputsAndOutputsForEvent =
(bytesPerEvent - BYTES_PER_RUN - BYTES_PER_JOB)
/ (DEFAULT_NUM_OF_FIELDS_IN_SCHEMA_PER_EVENT * BYTES_PER_FIELD_IN_SCHEMA);

// (6) Update the number of I/O to generate for run based on calculation.
// (7) Update the number of I/O to generate for run based on calculation.
numOfInputs = RANDOM.nextInt(numOfInputsAndOutputsForEvent);
numOfOutputs = numOfInputsAndOutputsForEvent - numOfInputs;
}
return new RunEvents(
OL.newRunEventBuilder()
.eventType(START)
.eventTime(newEventTime())
.eventTime(eventStartTimeWithTz)
.run(olRun)
.job(olJob)
.inputs(newInputs(numOfInputs, numOfFieldsInSchemaForInputs))
.outputs(newOutputs(numOfOutputs, numOfFieldsInSchemaForOutputs))
.build(),
OL.newRunEventBuilder()
.eventType(COMPLETE)
.eventTime(newEventTime())
.eventTime(eventEndTimeWithTz)
.run(olRun)
.job(olJob)
.build());
Expand Down Expand Up @@ -343,8 +410,8 @@ private static ZonedDateTime newNominalTime() {
}

/** Returns a new {@code event} time. */
private static ZonedDateTime newEventTime() {
return Instant.now().atZone(AMERICA_LOS_ANGELES);
private static ZonedDateTime newEventTimeAsUtc() {
return ZonedDateTime.now(UTC);
Comment thread
wslulciuc marked this conversation as resolved.
}

/** Returns {@code true} if parent run should be generated; {@code false} otherwise. */
Expand Down