Skip to content

Commit 46e26da

Browse files
Runless events - refactor job_versions_io_mapping
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
1 parent a0cf6ab commit 46e26da

9 files changed

Lines changed: 117 additions & 48 deletions

api/src/main/java/marquez/db/JobVersionDao.java

Lines changed: 39 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -183,38 +183,65 @@ ExtendedJobVersionRow upsertJobVersion(
183183
/**
184184
* Used to link an input dataset to a given job version.
185185
*
186-
* @param jobVersionUuid The unique ID of the job version.
187186
* @param inputDatasetUuid The unique ID of the input dataset.
187+
* @param jobUuid The unique ID of the job.
188188
*/
189-
default void upsertInputDatasetFor(UUID jobVersionUuid, UUID inputDatasetUuid) {
190-
upsertInputOrOutputDatasetFor(jobVersionUuid, inputDatasetUuid, IoType.INPUT);
189+
default void upsertInputDatasetFor(UUID jobVersionUuid, UUID inputDatasetUuid, UUID jobUuid) {
190+
markVersionIOMappingNotCurrent(jobVersionUuid, jobUuid, IoType.INPUT);
191+
upsertCurrentInputOrOutputDatasetFor(jobVersionUuid, inputDatasetUuid, jobUuid, IoType.INPUT);
192+
// TODO: include this in test -> check if jobUuid is set
191193
}
192194

193195
/**
194196
* Used to link an output dataset to a given job version.
195197
*
196-
* @param jobVersionUuid The unique ID of the job version.
197198
* @param outputDatasetUuid The unique ID of the output dataset.
199+
* @param jobUuid The unique ID of the job.
198200
*/
199-
default void upsertOutputDatasetFor(UUID jobVersionUuid, UUID outputDatasetUuid) {
200-
upsertInputOrOutputDatasetFor(jobVersionUuid, outputDatasetUuid, IoType.OUTPUT);
201+
default void upsertOutputDatasetFor(UUID jobVersionUuid, UUID outputDatasetUuid, UUID jobUuid) {
202+
markVersionIOMappingNotCurrent(jobVersionUuid, jobUuid, IoType.OUTPUT);
203+
upsertCurrentInputOrOutputDatasetFor(jobVersionUuid, outputDatasetUuid, jobUuid, IoType.OUTPUT);
204+
// TODO: include this in test -> check if jobUuid is set
201205
}
202206

207+
@SqlUpdate(
208+
"""
209+
UPDATE job_versions_io_mapping
210+
SET is_job_version_current = FALSE
211+
WHERE (job_uuid = :jobUuid OR symlink_target_job_uuid = :jobUuid)
212+
AND job_version_uuid != :jobVersionUuid
213+
AND io_type = :ioType
214+
AND is_job_version_current = TRUE;
215+
""")
216+
void markVersionIOMappingNotCurrent(UUID jobVersionUuid, UUID jobUuid, IoType ioType);
217+
218+
@SqlUpdate(
219+
"""
220+
UPDATE job_versions_io_mapping
221+
SET is_job_version_current = FALSE
222+
WHERE (job_uuid = :jobUuid OR symlink_target_job_uuid = :jobUuid)
223+
AND io_type = :ioType
224+
AND is_job_version_current = TRUE;
225+
""")
226+
void markVersionIOMappingNotCurrent(UUID jobUuid, IoType ioType);
227+
203228
/**
204229
* Used to upsert an input or output dataset to a given job version.
205230
*
206231
* @param jobVersionUuid The unique ID of the job version.
207232
* @param datasetUuid The unique ID of the input or output dataset.
208233
* @param ioType The {@link IoType} of the dataset.
234+
* @param jobUuid The unique ID of the job.
209235
*/
210236
@SqlUpdate(
211237
"""
212238
INSERT INTO job_versions_io_mapping (
213-
job_version_uuid, dataset_uuid, io_type)
214-
VALUES (:jobVersionUuid, :datasetUuid, :ioType)
215-
ON CONFLICT DO NOTHING
239+
job_version_uuid, dataset_uuid, io_type, job_uuid, is_job_version_current)
240+
VALUES (:jobVersionUuid, :datasetUuid, :ioType, :jobUuid, TRUE)
241+
ON CONFLICT (job_version_uuid, dataset_uuid, io_type, job_uuid) DO UPDATE SET is_job_version_current = TRUE
216242
""")
217-
void upsertInputOrOutputDatasetFor(UUID jobVersionUuid, UUID datasetUuid, IoType ioType);
243+
void upsertCurrentInputOrOutputDatasetFor(
244+
UUID jobVersionUuid, UUID datasetUuid, UUID jobUuid, IoType ioType);
218245

219246
/**
220247
* Returns the input datasets to a given job version.
@@ -344,14 +371,14 @@ default BagOfJobVersionInfo upsertJobVersionOnRunTransition(
344371
jobVersionInputs.forEach(
345372
jobVersionInput -> {
346373
jobVersionDao.upsertInputDatasetFor(
347-
jobVersionRow.getUuid(), jobVersionInput.getDatasetUuid());
374+
jobVersionRow.getUuid(), jobVersionInput.getDatasetUuid(), jobRow.getUuid());
348375
});
349376

350377
// Link the output datasets to the job version.
351378
jobVersionOutputs.forEach(
352379
jobVersionOutput -> {
353380
jobVersionDao.upsertOutputDatasetFor(
354-
jobVersionRow.getUuid(), jobVersionOutput.getDatasetUuid());
381+
jobVersionRow.getUuid(), jobVersionOutput.getDatasetUuid(), jobRow.getUuid());
355382
});
356383

357384
// Link the job version to the run.

api/src/main/java/marquez/db/LineageDao.java

Lines changed: 34 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -39,46 +39,51 @@ public interface LineageDao {
3939
@SqlQuery(
4040
"""
4141
WITH RECURSIVE
42-
-- Find the current version of a job or its symlink target if the target has no
43-
-- current_version_uuid. This ensures that we don't lose lineage for a job after it is
44-
-- symlinked to another job but before that target job has run successfully.
45-
job_current_version AS (
46-
SELECT COALESCE(j.symlink_target_uuid, j.uuid) AS job_uuid,
47-
COALESCE(s.current_version_uuid, j.current_version_uuid) AS job_version_uuid
48-
FROM jobs j
49-
LEFT JOIN jobs s ON s.uuid=j.symlink_target_uuid
50-
WHERE s.current_version_uuid IS NULL
51-
),
52-
job_io AS (
53-
SELECT j.job_uuid,
54-
ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='INPUT') AS inputs,
55-
ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='OUTPUT') AS outputs
42+
job_io AS (
43+
SELECT
44+
io.job_uuid AS job_uuid,
45+
io.symlink_target_job_uuid AS symlink_target_job_uuid,
46+
ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io.io_type='INPUT') AS inputs,
47+
ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io.io_type='OUTPUT') AS outputs
5648
FROM job_versions_io_mapping io
57-
INNER JOIN job_current_version j ON io.job_version_uuid=j.job_version_uuid
58-
GROUP BY j.job_uuid
49+
WHERE io.is_job_version_current = TRUE
50+
GROUP BY io.symlink_target_job_uuid, io.job_uuid
5951
),
60-
lineage(job_uuid, inputs, outputs) AS (
61-
SELECT v.job_uuid AS job_uuid,
52+
lineage(job_uuid, symlink_target_job_uuid, inputs, outputs) AS (
53+
SELECT job_uuid,
54+
symlink_target_job_uuid,
6255
COALESCE(inputs, Array[]::uuid[]) AS inputs,
6356
COALESCE(outputs, Array[]::uuid[]) AS outputs,
6457
0 AS depth
65-
FROM jobs j
66-
INNER JOIN job_current_version v ON (j.symlink_target_uuid IS NULL AND j.uuid=v.job_uuid) OR v.job_uuid=j.symlink_target_uuid
67-
LEFT JOIN job_io io ON io.job_uuid=v.job_uuid
68-
WHERE j.uuid IN (<jobIds>) OR j.symlink_target_uuid IN (<jobIds>)
58+
FROM job_io
59+
WHERE job_uuid IN (<jobIds>) OR symlink_target_job_uuid IN (<jobIds>)
6960
UNION
70-
SELECT io.job_uuid, io.inputs, io.outputs, l.depth + 1
71-
FROM job_io io,
72-
lineage l
73-
WHERE io.job_uuid != l.job_uuid AND
61+
SELECT io.job_uuid, io.symlink_target_job_uuid, io.inputs, io.outputs, l.depth + 1
62+
FROM job_io io, lineage l
63+
WHERE (io.job_uuid != l.job_uuid) AND
7464
array_cat(io.inputs, io.outputs) && array_cat(l.inputs, l.outputs)
75-
AND depth < :depth)
65+
AND depth < :depth),
66+
lineage_outside_job_io(job_uuid) AS (
67+
SELECT
68+
param_jobs.param_job_uuid as job_uuid,
69+
j.symlink_target_uuid,
70+
Array[]::uuid[] AS inputs,
71+
Array[]::uuid[] AS outputs,
72+
0 AS depth
73+
FROM (SELECT unnest(ARRAY[<jobIds>]::UUID[]) AS param_job_uuid) param_jobs
74+
LEFT JOIN lineage l on param_jobs.param_job_uuid = l.job_uuid
75+
INNER JOIN jobs j ON j.uuid = param_jobs.param_job_uuid
76+
WHERE l.job_uuid IS NULL
77+
)
7678
SELECT DISTINCT ON (j.uuid) j.*, inputs AS input_uuids, outputs AS output_uuids
77-
FROM lineage l2
78-
INNER JOIN jobs_view j ON j.uuid=l2.job_uuid;
79+
FROM (SELECT * FROM lineage UNION SELECT * FROM lineage_outside_job_io) l2
80+
INNER JOIN jobs_view j ON (j.uuid=l2.job_uuid OR j.uuid=l2.symlink_target_job_uuid)
7981
""")
8082
Set<JobData> getLineage(@BindList Set<UUID> jobIds, int depth);
8183

84+
// TODO: verify size of the lineage without (DISTINCT -> recursion is growing but should not
85+
// happen)
86+
8287
@SqlQuery(
8388
"""
8489
SELECT ds.*, dv.fields, dv.lifecycle_state

api/src/main/java/marquez/db/OpenLineageDao.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import marquez.common.models.SourceType;
3434
import marquez.db.DatasetFieldDao.DatasetFieldMapping;
3535
import marquez.db.JobVersionDao.BagOfJobVersionInfo;
36+
import marquez.db.JobVersionDao.IoType;
3637
import marquez.db.mappers.LineageEventMapper;
3738
import marquez.db.models.ColumnLineageRow;
3839
import marquez.db.models.DatasetFieldRow;
@@ -225,6 +226,7 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
225226
DatasetDao datasetDao = createDatasetDao();
226227
SourceDao sourceDao = createSourceDao();
227228
JobDao jobDao = createJobDao();
229+
JobVersionDao jobVersionDao = createJobVersionDao();
228230
JobFacetsDao jobFacetsDao = createJobFacetsDao();
229231
DatasetVersionDao datasetVersionDao = createDatasetVersionDao();
230232
DatasetFieldDao datasetFieldDao = createDatasetFieldDao();
@@ -342,7 +344,7 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
342344

343345
// RunInput list uses null as a sentinel value
344346
List<DatasetRecord> datasetInputs = null;
345-
if (event.getInputs() != null) {
347+
if (event.getInputs() != null && !event.getInputs().isEmpty()) {
346348
datasetInputs = new ArrayList<>();
347349
for (Dataset dataset : event.getInputs()) {
348350
DatasetRecord record =
@@ -385,11 +387,15 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
385387
event.getEventType(),
386388
facets));
387389
}
390+
} else {
391+
// mark job_versions_io_mapping as non-current
392+
jobVersionDao.markVersionIOMappingNotCurrent(job.getUuid(), IoType.INPUT);
388393
}
394+
389395
bag.setInputs(Optional.ofNullable(datasetInputs));
390396
// RunInput list uses null as a sentinel value
391397
List<DatasetRecord> datasetOutputs = null;
392-
if (event.getOutputs() != null) {
398+
if (event.getOutputs() != null && !event.getOutputs().isEmpty()) {
393399
datasetOutputs = new ArrayList<>();
394400
for (Dataset dataset : event.getOutputs()) {
395401
DatasetRecord record =
@@ -432,6 +438,9 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
432438
event.getEventType(),
433439
facets));
434440
}
441+
} else {
442+
// mark job_versions_io_mapping as non-current
443+
jobVersionDao.markVersionIOMappingNotCurrent(job.getUuid(), IoType.OUTPUT);
435444
}
436445

437446
bag.setOutputs(Optional.ofNullable(datasetOutputs));

api/src/main/resources/marquez/db/migration/R__1_Jobs_view_and_rewrite_function.sql

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,10 @@ BEGIN
111111
LEFT JOIN aliases a ON a.link_target_uuid = j.uuid
112112
) j
113113
WHERE jobs.uuid=j.uuid;
114+
UPDATE job_versions_io_mapping
115+
SET symlink_target_job_uuid=j.symlink_target_uuid
116+
FROM jobs j
117+
WHERE job_versions_io_mapping.job_uuid=j.uuid AND j.uuid = NEW.uuid;
114118
END IF;
115119
SELECT * INTO inserted_job FROM jobs_view
116120
WHERE uuid=job_uuid OR (new_symlink_target_uuid IS NOT NULL AND uuid=new_symlink_target_uuid);
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
ALTER TABLE job_versions_io_mapping ADD COLUMN job_uuid uuid REFERENCES jobs(uuid) ON DELETE CASCADE;
2+
ALTER TABLE job_versions_io_mapping ADD COLUMN symlink_target_job_uuid uuid REFERENCES jobs(uuid) ON DELETE CASCADE;
3+
ALTER TABLE job_versions_io_mapping ADD COLUMN is_job_version_current boolean DEFAULT FALSE;
4+
-- TODO: create index for lineage query and update
5+
6+
CREATE INDEX job_versions_io_mapping_job_uuid ON job_versions_io_mapping (job_uuid);
7+
8+
-- To add job_uuid to the unique constraint, we first drop the primary key, then recreate it as a UNIQUE constraint. Note: given that job_version_uuid can be NULL, we need to check that job_version_uuid IS NOT NULL before inserting (duplicate rows otherwise)
9+
ALTER TABLE job_versions_io_mapping DROP CONSTRAINT job_versions_io_mapping_pkey;
10+
ALTER TABLE job_versions_io_mapping ADD CONSTRAINT job_versions_io_mapping_pkey UNIQUE (job_version_uuid,dataset_uuid,io_type,job_uuid);
11+
12+
-- TODO: add a test which verifies correctness for UNIQUE <- adds multiple rows to job_versions_io_mapping
13+
14+
-- TODO: take care of is_job_version_current
15+
-- TODO: take care of symlink_target_job_uuid
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
UPDATE job_versions_io_mapping
2+
SET job_uuid = job_versions.job_uuid
3+
FROM job_versions
4+
WHERE job_versions_io_mapping.job_version_uuid = job_versions.uuid;
5+
6+
-- TODO: include a test for that, can be a migration test

api/src/test/java/marquez/db/JobVersionDaoTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,8 @@ public void testGetJobVersion() {
172172
.orElseThrow(
173173
() -> new IllegalStateException("Can't find test dataset " + ds.getName()));
174174

175-
jobVersionDao.upsertInputDatasetFor(jobVersionRow.getUuid(), dataset.getUuid());
175+
jobVersionDao.upsertInputDatasetFor(
176+
jobVersionRow.getUuid(), dataset.getUuid(), jobRow.getUuid());
176177
}
177178
for (DatasetId ds : jobMeta.getOutputs()) {
178179
DatasetRow dataset =
@@ -181,7 +182,8 @@ public void testGetJobVersion() {
181182
.orElseThrow(
182183
() -> new IllegalStateException("Can't find test dataset " + ds.getName()));
183184

184-
jobVersionDao.upsertOutputDatasetFor(jobVersionRow.getUuid(), dataset.getUuid());
185+
jobVersionDao.upsertOutputDatasetFor(
186+
jobVersionRow.getUuid(), dataset.getUuid(), jobVersionRow.getJobUuid());
185187
}
186188
Optional<JobVersion> jobVersion =
187189
jobVersionDao.findJobVersion(namespaceRow.getName(), jobRow.getName(), version.getValue());

api/src/test/java/marquez/db/LineageDaoTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,6 @@ public void testGetLineage() {
169169

170170
@Test
171171
public void testGetLineageForSymlinkedJob() throws SQLException {
172-
173172
UpdateLineageRow writeJob =
174173
LineageTestUtils.createLineageRow(
175174
openLineageDao,

api/src/test/java/marquez/db/TestingDb.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -186,8 +186,10 @@ JobVersionRow upsert(@NonNull JobVersionRow row) {
186186
row.getJobName(),
187187
row.getNamespaceUuid(),
188188
row.getNamespaceName());
189-
row.getInputUuids().forEach(in -> dao.upsertInputDatasetFor(row.getUuid(), in));
190-
row.getInputUuids().forEach(out -> dao.upsertInputDatasetFor(row.getUuid(), out));
189+
row.getInputUuids()
190+
.forEach(in -> dao.upsertInputDatasetFor(row.getUuid(), in, row.getJobUuid()));
191+
row.getInputUuids()
192+
.forEach(out -> dao.upsertInputDatasetFor(row.getUuid(), out, row.getJobUuid()));
191193
// ...
192194
delegate.onDemand(JobDao.class).updateVersionFor(row.getJobUuid(), NOW, upserted.getUuid());
193195
return upserted;

0 commit comments

Comments
 (0)