Skip to content

Commit 96781af

Browse files
committed
deletes: 'undelete' job on subsequent OL event
Signed-off-by: Maciej Obuchowski <obuchowski.maciej@gmail.com>
1 parent 4b09c74 commit 96781af

2 files changed

Lines changed: 41 additions & 6 deletions

File tree

api/src/main/resources/marquez/db/migration/R__1_Jobs_view_and_rewrite_function.sql

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ BEGIN
3535
INSERT INTO jobs (uuid, type, created_at, updated_at, namespace_uuid, name, description,
3636
current_version_uuid, namespace_name, current_job_context_uuid,
3737
current_location, current_inputs, symlink_target_uuid, parent_job_uuid,
38-
parent_job_uuid_string)
38+
parent_job_uuid_string, is_hidden)
3939
SELECT NEW.uuid,
4040
NEW.type,
4141
NEW.created_at,
@@ -50,7 +50,8 @@ BEGIN
5050
NEW.current_inputs,
5151
NEW.symlink_target_uuid,
5252
NEW.parent_job_uuid,
53-
COALESCE(NEW.parent_job_uuid::char(36), '')
53+
COALESCE(NEW.parent_job_uuid::char(36), ''),
54+
false
5455
ON CONFLICT (name, namespace_uuid, parent_job_uuid_string)
5556
DO UPDATE SET updated_at = EXCLUDED.updated_at,
5657
type = EXCLUDED.type,
@@ -60,8 +61,9 @@ BEGIN
6061
current_inputs = EXCLUDED.current_inputs,
6162
-- update the symlink target if null. otherwise, keep the old value
6263
symlink_target_uuid = COALESCE(jobs.symlink_target_uuid,
63-
EXCLUDED.symlink_target_uuid)
64-
-- the SELECT statement below will get the OLD symlink_target_uuid in case of update and the NEW
64+
EXCLUDED.symlink_target_uuid),
65+
is_hidden = false
66+
-- the SELECT statement below will get the OLD symlink_target_uuid in case of update and the NEW
6567
-- version in case of insert
6668
RETURNING uuid, symlink_target_uuid, (SELECT symlink_target_uuid FROM jobs j2 WHERE j2.uuid=jobs.uuid)
6769
INTO job_uuid, new_symlink_target_uuid, old_symlink_target_uuid;

api/src/test/java/marquez/service/OpenLineageServiceIntegrationTest.java

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,11 @@ public class OpenLineageServiceIntegrationTest {
7070
public static final ZoneId TIMEZONE = ZoneId.of("America/Los_Angeles");
7171
public static final String DATASET_NAME = "theDataset";
7272
private RunService runService;
73+
74+
private JobService jobService;
7375
private OpenLineageDao openLineageDao;
76+
77+
private JobDao jobDao;
7478
private DatasetDao datasetDao;
7579
private DatasetVersionDao datasetVersionDao;
7680
private ArgumentCaptor<JobInputUpdate> runInputListener;
@@ -134,7 +138,9 @@ public ExpectedResults(
134138
public void setup(Jdbi jdbi) throws SQLException {
135139
openLineageDao = jdbi.onDemand(OpenLineageDao.class);
136140
datasetVersionDao = jdbi.onDemand(DatasetVersionDao.class);
141+
jobDao = jdbi.onDemand(JobDao.class);
137142
runService = mock(RunService.class);
143+
jobService = new JobService(jobDao, runService);
138144
runInputListener = ArgumentCaptor.forClass(JobInputUpdate.class);
139145
doNothing().when(runService).notify(runInputListener.capture());
140146
runOutputListener = ArgumentCaptor.forClass(JobOutputUpdate.class);
@@ -146,8 +152,7 @@ public void setup(Jdbi jdbi) throws SQLException {
146152
jdbi.onDemand(NamespaceDao.class)
147153
.upsertNamespaceRow(UUID.randomUUID(), Instant.now(), NAMESPACE, "me");
148154
JobRow job =
149-
jdbi.onDemand(JobDao.class)
150-
.upsertJob(
155+
jobDao.upsertJob(
151156
UUID.randomUUID(),
152157
JobType.BATCH,
153158
Instant.now(),
@@ -382,6 +387,34 @@ public void testDatasetVersionUpdatedOnRunCompletion()
382387
.contains(dsVersion1Id);
383388
}
384389

390+
@Test
391+
void testJobIsNotHiddenAfterSubsequentOLEvent() throws ExecutionException, InterruptedException {
392+
String name = "aNotHiddenJob";
393+
394+
LineageEvent.LineageEventBuilder builder = LineageEvent.builder()
395+
.eventType("COMPLETE")
396+
.job(LineageEvent.Job.builder().name(name).namespace(NAMESPACE).build())
397+
.eventTime(Instant.now().atZone(TIMEZONE))
398+
.inputs(Collections.emptyList())
399+
.outputs(Collections.emptyList());
400+
401+
lineageService.createAsync(
402+
builder.run(new LineageEvent.Run(UUID.randomUUID().toString(), RunFacet.builder().build())).build()
403+
).get();
404+
405+
assertThat(jobService.findJobByName(NAMESPACE, name)).isNotEmpty();
406+
407+
jobService.delete(NAMESPACE, name);
408+
409+
assertThat(jobService.findJobByName(NAMESPACE, name)).isEmpty();
410+
411+
lineageService.createAsync(
412+
builder.run(new LineageEvent.Run(UUID.randomUUID().toString(), RunFacet.builder().build())).build()
413+
).get();
414+
415+
assertThat(jobService.findJobByName(NAMESPACE, name)).isNotEmpty();
416+
}
417+
385418
private void checkExists(LineageEvent.Dataset ds) {
386419
DatasetService datasetService = new DatasetService(openLineageDao, runService);
387420

0 commit comments

Comments
 (0)