Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions api/src/main/java/marquez/api/JobResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,7 @@ public Response delete(
.findJobByName(namespaceName.getValue(), jobName.getValue())
.orElseThrow(() -> new JobNotFoundException(jobName));

// Should be simple name from `jobs_fqn`.
jobService.delete(namespaceName.getValue(), job.getSimpleName());
jobService.delete(namespaceName.getValue(), job.getName().getValue());
return Response.ok(job).build();
}

Expand Down
5 changes: 0 additions & 5 deletions api/src/main/java/marquez/db/JobDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ SELECT run_uuid, JSON_AGG(e.facet) AS facets
GROUP BY e.run_uuid
) f ON f.run_uuid=jv.latest_run_uuid
WHERE j.namespace_name=:namespaceName AND (j.name=:jobName OR :jobName = ANY(j.aliases))
AND j.symlink_target_uuid IS NULL
""")
Optional<Job> findJobByName(String namespaceName, String jobName);

Expand Down Expand Up @@ -121,7 +120,6 @@ default Optional<Job> findWithRun(String namespaceName, String jobName) {
INNER JOIN namespaces AS n ON j.namespace_uuid = n.uuid
WHERE j.namespace_name=:namespaceName AND
(j.name=:jobName OR :jobName = ANY(j.aliases))
AND j.symlink_target_uuid IS NULL
""")
Optional<JobRow> findJobByNameAsRow(String namespaceName, String jobName);

Expand All @@ -143,7 +141,6 @@ SELECT run_uuid, JSON_AGG(e.facet) AS facets
GROUP BY e.run_uuid
) f ON f.run_uuid=jv.latest_run_uuid
WHERE j.namespace_name = :namespaceName
AND j.symlink_target_uuid IS NULL
ORDER BY j.name LIMIT :limit OFFSET :offset
""")
List<Job> findAll(String namespaceName, int limit, int offset);
Expand Down Expand Up @@ -292,7 +289,6 @@ JobRow upsertJob(
INSERT INTO jobs_view AS j (
uuid,
parent_job_uuid,
parent_job_uuid_string,
type,
created_at,
updated_at,
Expand All @@ -307,7 +303,6 @@ INSERT INTO jobs_view AS j (
) VALUES (
:uuid,
:parentJobUuid,
COALESCE(:parentJobUuid::text, ''),
:type,
:now,
:now,
Expand Down
14 changes: 7 additions & 7 deletions api/src/main/java/marquez/db/LineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ public interface LineageDao {
WITH RECURSIVE
job_io AS (
SELECT COALESCE(j.symlink_target_uuid, j.uuid) AS job_uuid,
ARRAY_AGG(DISTINCT j.uuid) AS ids,
ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='INPUT') AS inputs,
ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='OUTPUT') AS outputs
FROM job_versions_io_mapping io
INNER JOIN job_versions v ON io.job_version_uuid=v.uuid
INNER JOIN jobs_view j on j.current_version_uuid = v.uuid
LEFT JOIN jobs_view s ON s.uuid=j.symlink_target_uuid
WHERE s.current_version_uuid IS NULL
FROM jobs j
LEFT JOIN jobs_view s On s.uuid=j.symlink_target_uuid
LEFT JOIN job_versions v on v.uuid=COALESCE(s.current_version_uuid, j.current_version_uuid)
LEFT JOIN job_versions_io_mapping io ON io.job_version_uuid=v.uuid
GROUP BY COALESCE(j.symlink_target_uuid, j.uuid)
),
lineage(job_uuid, inputs, outputs) AS (
Expand All @@ -56,8 +56,8 @@ SELECT COALESCE(j.symlink_target_uuid, j.uuid) AS job_uuid,
COALESCE(outputs, Array[]::uuid[]) AS outputs,
0 AS depth
FROM jobs_view j
LEFT JOIN job_io io ON io.job_uuid=j.uuid OR j.symlink_target_uuid=io.job_uuid
WHERE j.uuid IN (<jobIds>)
INNER JOIN job_io io ON j.uuid=ANY(io.ids)
WHERE io.ids && ARRAY[<jobIds>]::uuid[]
UNION
SELECT io.job_uuid, io.inputs, io.outputs, l.depth + 1
FROM job_io io,
Expand Down
17 changes: 3 additions & 14 deletions api/src/main/java/marquez/db/OpenLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -170,19 +170,8 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
Optional<UUID> parentUuid = parentRun.map(Utils::findParentRunUuid);

JobRow job =
runDao
.findJobRowByRunUuid(runToUuid(event.getRun().getRunId()))
.orElseGet(
() ->
buildJobFromEvent(
event,
mapper,
jobDao,
now,
namespace,
nominalStartTime,
nominalEndTime,
parentRun));
buildJobFromEvent(
event, mapper, jobDao, now, namespace, nominalStartTime, nominalEndTime, parentRun);

bag.setJob(job);

Expand Down Expand Up @@ -812,7 +801,7 @@ private List<ColumnLineageRow> upsertColumnLineage(
log.error(
"Cannot produce column lineage for missing output field in output dataset: {}",
columnName);
return Stream.<ColumnLineageRow>empty();
return Stream.empty();
}

// get field uuids of input columns related to this run
Expand Down
16 changes: 3 additions & 13 deletions api/src/main/java/marquez/db/RunDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -411,22 +411,12 @@ default RunRow upsertRunMeta(
void updateJobVersion(UUID runUuid, UUID jobVersionUuid);

@SqlQuery(
"""
WITH RECURSIVE job_names AS (
SELECT uuid, namespace_name, name, symlink_target_uuid
FROM jobs_view j
WHERE j.namespace_name=:namespace AND j.name=:jobName
UNION
SELECT j.uuid, j.namespace_name, j.name, j.symlink_target_uuid
FROM jobs_view j
INNER JOIN job_names jn ON j.uuid=jn.symlink_target_uuid OR j.symlink_target_uuid=jn.uuid
)
"""
+ BASE_FIND_RUN_SQL
BASE_FIND_RUN_SQL
+ """
WHERE r.uuid=(
SELECT r.uuid FROM runs_view r
INNER JOIN job_names j ON j.namespace_name=r.namespace_name AND j.name=r.job_name
INNER JOIN jobs_view j ON j.namespace_name=r.namespace_name AND j.name=r.job_name
WHERE j.namespace_name=:namespace AND (j.name=:jobName OR j.name=ANY(j.aliases))
ORDER BY transitioned_at DESC
LIMIT 1
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,28 +1,26 @@
CREATE OR REPLACE VIEW jobs_view
AS
SELECT f.uuid,
f.job_fqn AS name,
f.namespace_name,
j.name AS simple_name,
SELECT j.uuid,
j.name,
j.namespace_name,
j.simple_name AS simple_name,
j.parent_job_uuid,
f.parent_job_name::text,
p.name::text AS parent_job_name,
j.type,
j.created_at,
j.updated_at,
f.namespace_uuid,
j.namespace_uuid,
j.description,
j.current_version_uuid,
j.current_job_context_uuid,
j.current_location,
j.current_inputs,
j.symlink_target_uuid,
j.parent_job_uuid_string,
f.aliases
FROM jobs_fqn f,
jobs j
WHERE j.uuid = f.uuid
AND j.is_hidden IS FALSE;

j.parent_job_uuid::char(36) AS parent_job_uuid_string,
j.aliases
FROM jobs j
LEFT JOIN jobs p ON j.parent_job_uuid=p.uuid
WHERE j.is_hidden IS FALSE AND j.symlink_target_uuid IS NULL;

CREATE OR REPLACE FUNCTION rewrite_jobs_fqn_table() RETURNS TRIGGER AS
$$
Expand All @@ -32,16 +30,24 @@ DECLARE
new_symlink_target_uuid uuid;
old_symlink_target_uuid uuid;
inserted_job jobs_view%rowtype;
full_name varchar;
BEGIN
INSERT INTO jobs (uuid, type, created_at, updated_at, namespace_uuid, name, description,
full_name = NEW.name;
IF NEW.parent_job_uuid IS NOT NULL THEN
SELECT p.name || '.' || NEW.name INTO full_name
FROM jobs p
WHERE p.uuid=NEW.parent_job_uuid;
END IF;
INSERT INTO jobs (uuid, type, created_at, updated_at, namespace_uuid, name, simple_name, description,
current_version_uuid, namespace_name, current_job_context_uuid,
current_location, current_inputs, symlink_target_uuid, parent_job_uuid,
parent_job_uuid_string, is_hidden)
is_hidden)
SELECT NEW.uuid,
NEW.type,
NEW.created_at,
NEW.updated_at,
NEW.namespace_uuid,
full_name,
NEW.name,
NEW.description,
NEW.current_version_uuid,
Expand All @@ -51,10 +57,14 @@ BEGIN
NEW.current_inputs,
NEW.symlink_target_uuid,
NEW.parent_job_uuid,
COALESCE(NEW.parent_job_uuid::char(36), ''),
false
ON CONFLICT (name, namespace_uuid, parent_job_uuid_string)
ON CONFLICT (namespace_uuid, name)
DO UPDATE SET updated_at = now(),
parent_job_uuid = COALESCE(jobs.parent_job_uuid, EXCLUDED.parent_job_uuid),
simple_name = CASE
WHEN EXCLUDED.parent_job_uuid IS NOT NULL THEN EXCLUDED.name
ELSE jobs.name
END,
type = EXCLUDED.type,
description = EXCLUDED.description,
current_job_context_uuid = EXCLUDED.current_job_context_uuid,
Expand All @@ -73,94 +83,38 @@ BEGIN
INTO job_uuid, job_updated_at, new_symlink_target_uuid, old_symlink_target_uuid;


-- update the jobs_fqn table when inserting a new record
-- (NEW.uuid will equal the job_uuid when inserting a new record)
-- AND if the symlink target is null
-- Avoid constructing the symlinks and aliases, as that is expensive
IF TG_OP='INSERT'
AND NEW.uuid = job_uuid
AND NEW.symlink_target_uuid IS NULL
AND NEW.updated_at=job_updated_at THEN
RAISE DEBUG 'Inserting into jobs_fqn for new job % (%)', NEW.name, job_uuid;
WITH fqn AS (SELECT j.uuid,
CASE
WHEN j.parent_job_uuid IS NULL THEN j.name
ELSE jf.job_fqn || '.' || j.name
END AS name,
j.namespace_uuid,
j.namespace_name,
jf.job_fqn AS parent_job_name,
j.parent_job_uuid
FROM jobs j
LEFT JOIN jobs_fqn jf ON jf.uuid=j.parent_job_uuid
WHERE j.uuid=job_uuid)
INSERT
INTO jobs_fqn
SELECT j.uuid,
jf.namespace_uuid,
jf.namespace_name,
jf.parent_job_name,
ARRAY[jf.name]::text[],
jf.name AS job_fqn
FROM jobs j
INNER JOIN fqn jf ON jf.uuid = j.uuid;
-- or when the symlink_target_uuid is being updated.
ELSIF (new_symlink_target_uuid IS NOT NULL AND new_symlink_target_uuid IS DISTINCT FROM old_symlink_target_uuid) THEN
RAISE DEBUG 'Updating jobs_fqn due to % to job % (%)', TG_OP, NEW.name, job_uuid;
-- update the jobs table when updating a job's symlink target
IF (new_symlink_target_uuid IS NOT NULL AND new_symlink_target_uuid IS DISTINCT FROM old_symlink_target_uuid) THEN
RAISE INFO 'Updating jobs aliases and symlinks due to % to job % (%)', TG_OP, NEW.name, job_uuid;
WITH RECURSIVE
jobs_symlink AS (SELECT j.uuid, j.uuid AS link_target_uuid, j.symlink_target_uuid
FROM jobs j
-- include only jobs that have symlinks pointing to them to keep this table small
INNER JOIN jobs js ON js.symlink_target_uuid=j.uuid
-- include only jobs that have symlinks pointing to them to keep this table small
INNER JOIN jobs js ON js.symlink_target_uuid=j.uuid
WHERE j.symlink_target_uuid IS NULL
UNION
SELECT j.uuid, jn.link_target_uuid, j.symlink_target_uuid
FROM jobs j
INNER JOIN jobs_symlink jn ON j.symlink_target_uuid = jn.uuid),
fqn AS (SELECT j.uuid,
CASE
WHEN j.parent_job_uuid IS NULL THEN j.name
ELSE jf.job_fqn || '.' || j.name
END AS name,
j.namespace_uuid,
j.namespace_name,
jf.job_fqn AS parent_job_name,
j.parent_job_uuid
FROM jobs j
LEFT JOIN jobs_fqn jf ON jf.uuid=j.parent_job_uuid
LEFT JOIN jobs_symlink js ON js.link_target_uuid=j.uuid
WHERE j.uuid=job_uuid OR j.symlink_target_uuid=job_uuid OR js.uuid=job_uuid
UNION
SELECT j1.uuid,
f.name || '.' || j1.name AS name,
f.namespace_uuid AS namespace_uuid,
f.namespace_name AS namespace_name,
f.name AS parent_job_name,
j1.parent_job_uuid
FROM jobs j1
INNER JOIN fqn f ON f.uuid = j1.parent_job_uuid),
INNER JOIN jobs_symlink jn ON j.symlink_target_uuid = jn.uuid),
aliases AS (SELECT s.link_target_uuid,
ARRAY_AGG(DISTINCT f.job_fqn) FILTER (WHERE f.job_fqn IS NOT NULL) AS aliases
ARRAY_AGG(DISTINCT f.name) AS aliases
FROM jobs_symlink s
INNER JOIN jobs_fqn f ON f.uuid = s.uuid
INNER JOIN jobs f ON f.uuid = s.uuid
GROUP BY s.link_target_uuid)
INSERT
INTO jobs_fqn
SELECT j.uuid,
jf.namespace_uuid,
jf.namespace_name,
jf.parent_job_name,
a.aliases,
jf.name AS job_fqn
FROM jobs j
LEFT JOIN jobs_symlink js ON j.uuid = js.uuid
LEFT JOIN aliases a ON a.link_target_uuid = js.link_target_uuid
INNER JOIN fqn jf ON jf.uuid = COALESCE(js.link_target_uuid, j.uuid)
ON CONFLICT (uuid) DO UPDATE
SET job_fqn=EXCLUDED.job_fqn,
aliases = (SELECT array_agg(DISTINCT a) FROM (SELECT unnest(jobs_fqn.aliases) AS a UNION SELECT unnest(EXCLUDED.aliases) AS a) al);
UPDATE jobs
SET aliases = j.aliases, symlink_target_uuid=j.link_target_uuid
FROM (
SELECT j.uuid,
CASE WHEN j.uuid=s.link_target_uuid THEN NULL ELSE s.link_target_uuid END AS link_target_uuid,
a.aliases
FROM jobs j
LEFT JOIN jobs_symlink s ON s.uuid=j.uuid
LEFT JOIN aliases a ON a.link_target_uuid = j.uuid
) j
WHERE jobs.uuid=j.uuid;
END IF;
SELECT * INTO inserted_job FROM jobs_view WHERE uuid=job_uuid;
SELECT * INTO inserted_job FROM jobs_view
WHERE uuid=job_uuid OR (new_symlink_target_uuid IS NOT NULL AND uuid=new_symlink_target_uuid);
return inserted_job;
END;
$$ LANGUAGE plpgsql;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ SELECT r.uuid,
ended_at,
job_context_uuid,
job_uuid,
j.name AS job_name,
j.namespace_name
COALESCE(s.name, j.name) AS job_name,
COALESCE(s.namespace_name, j.namespace_name) AS namespace_name
FROM runs r
INNER JOIN jobs_view j ON j.uuid = r.job_uuid;
INNER JOIN jobs j ON j.uuid = r.job_uuid
LEFT JOIN jobs_view s ON j.symlink_target_uuid=s.uuid;
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
-- Migration: store each job's fully-qualified name and its aliases directly on
-- the jobs table.
--
-- Statement order matters: step 1 reads jobs_view while jobs.name still holds
-- the un-prefixed name, and steps 4-5 rely on the backfill performed in step 3.

-- Step 1: point a parentless job at another job in the same namespace that
-- jobs_view reports under the same name but with a different simple_name.
-- Only jobs that are not symlinks already (on either side) are considered.
-- NOTE(review): each such pair appears to end up with the same
-- (namespace_uuid, name) after the step 3 backfill -- confirm that the UNIQUE
-- constraint added in step 5 cannot fail on them.
UPDATE jobs
SET symlink_target_uuid = dup.target_uuid
FROM (
    SELECT
        src.uuid,
        tgt.uuid AS target_uuid
    FROM jobs_view AS src
    INNER JOIN jobs_view AS tgt
        ON tgt.namespace_name = src.namespace_name
        AND tgt.name = src.name
        AND tgt.simple_name <> src.simple_name
    WHERE src.parent_job_uuid IS NULL
      AND src.symlink_target_uuid IS NULL
      AND tgt.symlink_target_uuid IS NULL
) AS dup
WHERE jobs.uuid = dup.uuid;

-- Step 2: the existing name column becomes simple_name; a fresh, initially
-- NULL name column and an aliases array are added alongside it.
ALTER TABLE jobs RENAME COLUMN name TO simple_name;

ALTER TABLE jobs
    ADD COLUMN name varchar,
    ADD COLUMN aliases varchar[];

-- Step 3: backfill name by walking the parent chain from the top down.
-- Jobs without a parent keep their simple_name; every other job reachable
-- from such a root gets '<parent name>.<simple_name>'.
WITH RECURSIVE job_fqn (uuid, simple_name, name) AS (
    SELECT
        root.uuid,
        root.simple_name,
        root.simple_name
    FROM jobs AS root
    WHERE root.parent_job_uuid IS NULL
    UNION
    SELECT
        child.uuid,
        child.simple_name,
        parent.name || '.' || child.simple_name
    FROM jobs AS child
    INNER JOIN job_fqn AS parent
        ON parent.uuid = child.parent_job_uuid
)
UPDATE jobs
SET simple_name = fqn.simple_name, -- written back unchanged
    name = fqn.name
FROM job_fqn AS fqn
WHERE jobs.uuid = fqn.uuid;

-- Step 4: every row is expected to carry a name after the backfill; this
-- statement fails if any row was not reached by the recursion above.
ALTER TABLE jobs
    ALTER COLUMN name SET NOT NULL;

-- Step 5: re-create the uniqueness constraint under its existing name, now
-- keyed on (namespace_uuid, name).
ALTER TABLE jobs
    DROP CONSTRAINT unique_jobs_namespace_uuid_name_parent;

ALTER TABLE jobs
    ADD CONSTRAINT unique_jobs_namespace_uuid_name_parent UNIQUE (namespace_uuid, name);
Loading