Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions api/src/main/java/marquez/db/Columns.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ private Columns() {}

/* COMMON ROW COLUMNS */
public static final String ROW_UUID = "uuid";
public static final String PARENT_RUN_UUID = "parent_run_uuid";
public static final String TYPE = "type";
public static final String CREATED_AT = "created_at";
public static final String UPDATED_AT = "updated_at";
Expand Down Expand Up @@ -80,6 +81,8 @@ private Columns() {}
public static final String SCHEMA_LOCATION = "schema_location";

/* JOB ROW COLUMNS */
public static final String PARENT_JOB_NAME = "parent_job_name";
public static final String SIMPLE_NAME = "simple_name";
public static final String SYMLINK_TARGET_UUID = "symlink_target_uuid";

/* JOB VERSION I/O ROW COLUMNS */
Expand Down
88 changes: 82 additions & 6 deletions api/src/main/java/marquez/db/JobDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ SELECT run_uuid, JSON_AGG(e.facets) AS facets
SELECT run_uuid, event->'job'->'facets' AS facets
FROM lineage_events AS le
INNER JOIN job_versions jv2 ON jv2.latest_run_uuid=le.run_uuid
INNER JOIN jobs j2 ON j2.current_version_uuid=jv2.uuid
INNER JOIN jobs_view j2 ON j2.current_version_uuid=jv2.uuid
WHERE j2.name=:jobName AND j2.namespace_name=:namespaceName
ORDER BY event_time ASC
) e
Expand All @@ -88,6 +88,24 @@ default Optional<Job> findWithRun(String namespaceName, String jobName) {
return job;
}

@SqlQuery(
"""
WITH RECURSIVE job_ids AS (
SELECT uuid, uuid AS link_target_uuid, symlink_target_uuid
FROM jobs_view j
WHERE j.uuid=:jobUuid
UNION
SELECT jn.uuid, j.uuid AS link_target_uuid, j.symlink_target_uuid
FROM jobs_view j
INNER JOIN job_ids jn ON j.uuid=jn.symlink_target_uuid
)
SELECT j.*, n.name AS namespace_name
FROM jobs_view AS j
INNER JOIN job_ids jn ON jn.link_target_uuid=j.uuid AND jn.symlink_target_uuid IS NULL
INNER JOIN namespaces AS n ON j.namespace_uuid = n.uuid
""")
Optional<JobRow> findJobByUuidAsRow(UUID jobUuid);

@SqlQuery(
"""
WITH RECURSIVE job_ids AS (
Expand All @@ -100,7 +118,7 @@ WITH RECURSIVE job_ids AS (
INNER JOIN job_ids jn ON j.uuid=jn.symlink_target_uuid
)
SELECT j.*, n.name AS namespace_name
FROM jobs AS j
FROM jobs_view AS j
INNER JOIN job_ids jn ON jn.link_target_uuid=j.uuid AND jn.symlink_target_uuid IS NULL
INNER JOIN namespaces AS n ON j.namespace_uuid = n.uuid
""")
Expand Down Expand Up @@ -263,9 +281,9 @@ INSERT INTO jobs AS j (
current_inputs = EXCLUDED.current_inputs,
-- update the symlink target if not null. otherwise, keep the old value
symlink_target_uuid = COALESCE(EXCLUDED.symlink_target_uuid, j.symlink_target_uuid)
RETURNING *
RETURNING uuid
""")
JobRow upsertJob(
UUID upsertJobNoParent(
UUID uuid,
JobType type,
Instant now,
Expand All @@ -278,6 +296,34 @@ JobRow upsertJob(
UUID symlinkTargetId,
PGobject inputs);

default JobRow upsertJob(
UUID uuid,
JobType type,
Instant now,
UUID namespaceUuid,
String namespaceName,
String name,
String description,
UUID jobContextUuid,
String location,
UUID symlinkTargetId,
PGobject inputs) {
UUID jobUuid =
upsertJobNoParent(

@wslulciuc wslulciuc May 18, 2022

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't we want the upsertJobNoParent() call to the JobRow object similar to other upsert calls? This would keep contracts the same across DAOs but also avoid the subsequent findJobByUuidAsRow() call.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, do you mean that the upsertJobNoParent query would go back to RETURNING * instead of RETURNING uuid? If that's what you mean, I made this change so that the subsequent findJobByUuidAsRow call queries the jobs_view - returning the FQN rather than the simple name.

uuid,
type,
now,
namespaceUuid,
namespaceName,
name,
description,
jobContextUuid,
location,
symlinkTargetId,
inputs);
return findJobByUuidAsRow(jobUuid).get();
}

@SqlQuery(
"""
INSERT INTO jobs AS j (
Expand Down Expand Up @@ -318,9 +364,9 @@ INSERT INTO jobs AS j (
current_inputs = EXCLUDED.current_inputs,
-- update the symlink target if not null. otherwise, keep the old value
symlink_target_uuid = COALESCE(EXCLUDED.symlink_target_uuid, j.symlink_target_uuid)
RETURNING *
RETURNING uuid
""")
JobRow upsertJob(
UUID upsertJobWithParent(
UUID uuid,
UUID parentJobUuid,
JobType type,
Expand All @@ -333,4 +379,34 @@ JobRow upsertJob(
String location,
UUID symlinkTargetId,
PGobject inputs);

default JobRow upsertJob(
UUID uuid,
UUID parentJobUuid,
JobType type,
Instant now,
UUID namespaceUuid,
String namespaceName,
String name,
String description,
UUID jobContextUuid,
String location,
UUID symlinkTargetId,
PGobject inputs) {
UUID jobUuid =
upsertJobWithParent(
uuid,
parentJobUuid,
type,
now,
namespaceUuid,
namespaceName,
name,
description,
jobContextUuid,
location,
symlinkTargetId,
inputs);
return findJobByUuidAsRow(jobUuid).get();
}
}
Loading