@@ -37,35 +37,39 @@ public interface LineageDao {
3737 * @return
3838 */
3939 @ SqlQuery (
40- // dataset_ids: all the input and output datasets of the current version of the specified jobs
41- "WITH RECURSIVE\n "
42- + " job_io AS (\n "
43- + " SELECT j.uuid AS job_uuid,\n "
44- + " ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='INPUT') AS inputs,\n "
45- + " ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='OUTPUT') AS outputs\n "
46- + " FROM jobs_view j\n "
47- + " LEFT JOIN jobs_view s ON s.symlink_target_uuid=j.uuid\n "
48- + " LEFT JOIN job_versions v on COALESCE(j.current_version_uuid, s.current_version_uuid) = v.uuid\n "
49- + " LEFT JOIN job_versions_io_mapping io on v.uuid = io.job_version_uuid\n "
50- + " GROUP BY j.uuid\n "
51- + " ),\n "
52- + " lineage(job_uuid, inputs, outputs) AS (\n "
53- + " SELECT job_uuid, inputs, outputs, 0 AS depth\n "
54- + " FROM job_io\n "
55- + " WHERE job_uuid IN (<jobIds>)\n "
56- + " UNION\n "
57- + " SELECT io.job_uuid, io.inputs, io.outputs, l.depth + 1\n "
58- + " FROM job_io io,\n "
59- + " lineage l\n "
60- + " WHERE io.job_uuid != l.job_uuid AND\n "
61- + " array_cat(io.inputs, io.outputs) && array_cat(l.inputs, l.outputs)\n "
62- + " AND depth < :depth"
63- + " )\n "
64- + "SELECT DISTINCT ON (j.uuid) j.*, inputs AS input_uuids, outputs AS output_uuids, jc.context\n "
65- + "FROM lineage l2\n "
66- + "INNER JOIN jobs_view s ON s.uuid=l2.job_uuid\n "
67- + "INNER JOIN jobs_view j ON j.uuid=COALESCE(s.symlink_target_uuid, s.uuid)\n "
68- + "LEFT JOIN job_contexts jc on jc.uuid = j.current_job_context_uuid" )
40+ """
41+ WITH RECURSIVE
42+ job_io AS (
43+ SELECT COALESCE(j.symlink_target_uuid, j.uuid) AS job_uuid,
44+ ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='INPUT') AS inputs,
45+ ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='OUTPUT') AS outputs
46+ FROM job_versions_io_mapping io
47+ INNER JOIN job_versions v ON io.job_version_uuid=v.uuid
48+ INNER JOIN jobs_view j on j.current_version_uuid = v.uuid
49+ LEFT JOIN jobs_view s ON s.uuid=j.symlink_target_uuid
50+ WHERE s.current_version_uuid IS NULL
51+ GROUP BY COALESCE(j.symlink_target_uuid, j.uuid)
52+ ),
53+ lineage(job_uuid, inputs, outputs) AS (
54+ SELECT COALESCE(j.symlink_target_uuid, j.uuid) AS job_uuid,
55+ COALESCE(inputs, Array[]::uuid[]) AS inputs,
56+ COALESCE(outputs, Array[]::uuid[]) AS outputs,
57+ 0 AS depth
58+ FROM jobs_view j
59+ LEFT JOIN job_io io ON io.job_uuid=j.uuid OR j.symlink_target_uuid=io.job_uuid
60+ WHERE j.uuid IN (<jobIds>)
61+ UNION
62+ SELECT io.job_uuid, io.inputs, io.outputs, l.depth + 1
63+ FROM job_io io,
64+ lineage l
65+ WHERE io.job_uuid != l.job_uuid AND
66+ array_cat(io.inputs, io.outputs) && array_cat(l.inputs, l.outputs)
67+ AND depth < :depth)
68+ SELECT DISTINCT ON (j.uuid) j.*, inputs AS input_uuids, outputs AS output_uuids, jc.context
69+ FROM lineage l2
70+ INNER JOIN jobs_view j ON j.uuid=l2.job_uuid
71+ LEFT JOIN job_contexts jc on jc.uuid = j.current_job_context_uuid;
72+ """ )
6973 Set <JobData > getLineage (@ BindList Set <UUID > jobIds , int depth );
7074
7175 @ SqlQuery (
0 commit comments