Skip to content

Commit f507bfe

Browse files
committed
deletes: 'undelete' job on subsequent OL event
Signed-off-by: Maciej Obuchowski <obuchowski.maciej@gmail.com>
1 parent 4b09c74 commit f507bfe

2 files changed

Lines changed: 59 additions & 17 deletions

File tree

api/src/main/resources/marquez/db/migration/R__1_Jobs_view_and_rewrite_function.sql

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ BEGIN
3535
INSERT INTO jobs (uuid, type, created_at, updated_at, namespace_uuid, name, description,
3636
current_version_uuid, namespace_name, current_job_context_uuid,
3737
current_location, current_inputs, symlink_target_uuid, parent_job_uuid,
38-
parent_job_uuid_string)
38+
parent_job_uuid_string, is_hidden)
3939
SELECT NEW.uuid,
4040
NEW.type,
4141
NEW.created_at,
@@ -50,7 +50,8 @@ BEGIN
5050
NEW.current_inputs,
5151
NEW.symlink_target_uuid,
5252
NEW.parent_job_uuid,
53-
COALESCE(NEW.parent_job_uuid::char(36), '')
53+
COALESCE(NEW.parent_job_uuid::char(36), ''),
54+
false
5455
ON CONFLICT (name, namespace_uuid, parent_job_uuid_string)
5556
DO UPDATE SET updated_at = EXCLUDED.updated_at,
5657
type = EXCLUDED.type,
@@ -60,8 +61,9 @@ BEGIN
6061
current_inputs = EXCLUDED.current_inputs,
6162
-- update the symlink target if null. otherwise, keep the old value
6263
symlink_target_uuid = COALESCE(jobs.symlink_target_uuid,
63-
EXCLUDED.symlink_target_uuid)
64-
-- the SELECT statement below will get the OLD symlink_target_uuid in case of update and the NEW
64+
EXCLUDED.symlink_target_uuid),
65+
is_hidden = false
66+
-- the SELECT statement below will get the OLD symlink_target_uuid in case of update and the NEW
6567
-- version in case of insert
6668
RETURNING uuid, symlink_target_uuid, (SELECT symlink_target_uuid FROM jobs j2 WHERE j2.uuid=jobs.uuid)
6769
INTO job_uuid, new_symlink_target_uuid, old_symlink_target_uuid;

api/src/test/java/marquez/service/OpenLineageServiceIntegrationTest.java

Lines changed: 53 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,11 @@ public class OpenLineageServiceIntegrationTest {
7070
public static final ZoneId TIMEZONE = ZoneId.of("America/Los_Angeles");
7171
public static final String DATASET_NAME = "theDataset";
7272
private RunService runService;
73+
74+
private JobService jobService;
7375
private OpenLineageDao openLineageDao;
76+
77+
private JobDao jobDao;
7478
private DatasetDao datasetDao;
7579
private DatasetVersionDao datasetVersionDao;
7680
private ArgumentCaptor<JobInputUpdate> runInputListener;
@@ -134,7 +138,9 @@ public ExpectedResults(
134138
public void setup(Jdbi jdbi) throws SQLException {
135139
openLineageDao = jdbi.onDemand(OpenLineageDao.class);
136140
datasetVersionDao = jdbi.onDemand(DatasetVersionDao.class);
141+
jobDao = jdbi.onDemand(JobDao.class);
137142
runService = mock(RunService.class);
143+
jobService = new JobService(jobDao, runService);
138144
runInputListener = ArgumentCaptor.forClass(JobInputUpdate.class);
139145
doNothing().when(runService).notify(runInputListener.capture());
140146
runOutputListener = ArgumentCaptor.forClass(JobOutputUpdate.class);
@@ -146,19 +152,18 @@ public void setup(Jdbi jdbi) throws SQLException {
146152
jdbi.onDemand(NamespaceDao.class)
147153
.upsertNamespaceRow(UUID.randomUUID(), Instant.now(), NAMESPACE, "me");
148154
JobRow job =
149-
jdbi.onDemand(JobDao.class)
150-
.upsertJob(
151-
UUID.randomUUID(),
152-
JobType.BATCH,
153-
Instant.now(),
154-
namespace.getUuid(),
155-
NAMESPACE,
156-
"parentJob",
157-
"description",
158-
null,
159-
null,
160-
null,
161-
null);
155+
jobDao.upsertJob(
156+
UUID.randomUUID(),
157+
JobType.BATCH,
158+
Instant.now(),
159+
namespace.getUuid(),
160+
NAMESPACE,
161+
"parentJob",
162+
"description",
163+
null,
164+
null,
165+
null,
166+
null);
162167
Map<String, String> runArgsMap = new HashMap<>();
163168
RunArgsRow argsRow =
164169
jdbi.onDemand(RunArgsDao.class)
@@ -382,6 +387,41 @@ public void testDatasetVersionUpdatedOnRunCompletion()
382387
.contains(dsVersion1Id);
383388
}
384389

390+
@Test
391+
void testJobIsNotHiddenAfterSubsequentOLEvent() throws ExecutionException, InterruptedException {
392+
String name = "aNotHiddenJob";
393+
394+
LineageEvent.LineageEventBuilder builder =
395+
LineageEvent.builder()
396+
.eventType("COMPLETE")
397+
.job(LineageEvent.Job.builder().name(name).namespace(NAMESPACE).build())
398+
.eventTime(Instant.now().atZone(TIMEZONE))
399+
.inputs(Collections.emptyList())
400+
.outputs(Collections.emptyList());
401+
402+
lineageService
403+
.createAsync(
404+
builder
405+
.run(new LineageEvent.Run(UUID.randomUUID().toString(), RunFacet.builder().build()))
406+
.build())
407+
.get();
408+
409+
assertThat(jobService.findJobByName(NAMESPACE, name)).isNotEmpty();
410+
411+
jobService.delete(NAMESPACE, name);
412+
413+
assertThat(jobService.findJobByName(NAMESPACE, name)).isEmpty();
414+
415+
lineageService
416+
.createAsync(
417+
builder
418+
.run(new LineageEvent.Run(UUID.randomUUID().toString(), RunFacet.builder().build()))
419+
.build())
420+
.get();
421+
422+
assertThat(jobService.findJobByName(NAMESPACE, name)).isNotEmpty();
423+
}
424+
385425
private void checkExists(LineageEvent.Dataset ds) {
386426
DatasetService datasetService = new DatasetService(openLineageDao, runService);
387427

0 commit comments

Comments
 (0)