Skip to content

Commit bdcfec7

Browse files
Runless events - consume job event
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
1 parent 3aacc39 commit bdcfec7

16 files changed

Lines changed: 582 additions & 98 deletions

api/src/main/java/marquez/api/JobResource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ public Response getJob(
110110

111111
final Job job =
112112
jobService
113-
.findWithRun(namespaceName.getValue(), jobName.getValue())
113+
.findWithDatasetsAndRun(namespaceName.getValue(), jobName.getValue())
114114
.orElseThrow(() -> new JobNotFoundException(jobName));
115115
return Response.ok(job).build();
116116
}

api/src/main/java/marquez/api/OpenLineageResource.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import marquez.service.ServiceFactory;
4141
import marquez.service.models.BaseEvent;
4242
import marquez.service.models.DatasetEvent;
43+
import marquez.service.models.JobEvent;
4344
import marquez.service.models.LineageEvent;
4445
import marquez.service.models.NodeId;
4546

@@ -73,6 +74,10 @@ public void create(@Valid @NotNull BaseEvent event, @Suspended final AsyncRespon
7374
openLineageService
7475
.createAsync((DatasetEvent) event)
7576
.whenComplete((result, err) -> onComplete(result, err, asyncResponse));
77+
} else if (event instanceof JobEvent) {
78+
openLineageService
79+
.createAsync((JobEvent) event)
80+
.whenComplete((result, err) -> onComplete(result, err, asyncResponse));
7681
} else {
7782
log.warn("Unsupported event type {}. Skipping without error", event.getClass().getName());
7883

api/src/main/java/marquez/db/JobDao.java

Lines changed: 41 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
import marquez.common.models.JobName;
2121
import marquez.common.models.JobType;
2222
import marquez.common.models.NamespaceName;
23+
import marquez.db.JobVersionDao.IoType;
24+
import marquez.db.JobVersionDao.JobDataset;
25+
import marquez.db.JobVersionDao.JobDatasetMapper;
2326
import marquez.db.mappers.JobMapper;
2427
import marquez.db.mappers.JobRowMapper;
2528
import marquez.db.models.JobRow;
@@ -34,6 +37,7 @@
3437

3538
@RegisterRowMapper(JobRowMapper.class)
3639
@RegisterRowMapper(JobMapper.class)
40+
@RegisterRowMapper(JobDatasetMapper.class)
3741
public interface JobDao extends BaseDao {
3842

3943
@SqlQuery(
@@ -56,21 +60,14 @@ SELECT EXISTS (
5660

5761
@SqlQuery(
5862
"""
59-
SELECT j.*, f.facets
63+
WITH job_versions_facets AS (
64+
SELECT job_version_uuid, JSON_AGG(facet) as facets
65+
FROM job_facets
66+
GROUP BY job_version_uuid
67+
)
68+
SELECT j.*, facets
6069
FROM jobs_view j
61-
LEFT OUTER JOIN job_versions AS jv ON jv.uuid = j.current_version_uuid
62-
LEFT OUTER JOIN (
63-
SELECT run_uuid, JSON_AGG(e.facet) AS facets
64-
FROM (
65-
SELECT jf.run_uuid, jf.facet
66-
FROM job_facets_view AS jf
67-
INNER JOIN job_versions jv2 ON jv2.latest_run_uuid=jf.run_uuid
68-
INNER JOIN jobs_view j2 ON j2.current_version_uuid=jv2.uuid
69-
WHERE j2.name=:jobName AND j2.namespace_name=:namespaceName
70-
ORDER BY lineage_event_time ASC
71-
) e
72-
GROUP BY e.run_uuid
73-
) f ON f.run_uuid=jv.latest_run_uuid
70+
LEFT OUTER JOIN job_versions_facets f ON j.current_version_uuid = f.job_version_uuid
7471
WHERE j.namespace_name=:namespaceName AND (j.name=:jobName OR :jobName = ANY(j.aliases))
7572
""")
7673
Optional<Job> findJobByName(String namespaceName, String jobName);
@@ -94,12 +91,18 @@ SELECT run_uuid, JSON_AGG(e.facet) AS facets
9491
""")
9592
void deleteByNamespaceName(String namespaceName);
9693

97-
default Optional<Job> findWithRun(String namespaceName, String jobName) {
94+
default Optional<Job> findWithDatasetsAndRun(String namespaceName, String jobName) {
9895
Optional<Job> job = findJobByName(namespaceName, jobName);
9996
job.ifPresent(
10097
j -> {
10198
Optional<Run> run = createRunDao().findByLatestJob(namespaceName, jobName);
102-
run.ifPresent(r -> this.setJobData(r, j));
99+
run.ifPresentOrElse(
100+
r -> this.setJobData(r, j),
101+
() ->
102+
this.setJobData(
103+
createJobVersionDao()
104+
.findCurrentInputOutputDatasetsFor(namespaceName, jobName),
105+
j));
103106
});
104107
return job;
105108
}
@@ -200,6 +203,28 @@ default List<Job> findAllWithRun(String namespaceName, int limit, int offset) {
200203
.collect(Collectors.toList());
201204
}
202205

206+
default void setJobData(List<JobDataset> datasets, Job j) {
207+
Optional.of(
208+
datasets.stream()
209+
.filter(d -> d.ioType().equals(IoType.INPUT))
210+
.map(
211+
ds ->
212+
new DatasetId(NamespaceName.of(ds.namespace()), DatasetName.of(ds.name())))
213+
.collect(Collectors.toSet()))
214+
.filter(s -> !s.isEmpty())
215+
.ifPresent(s -> j.setInputs(s));
216+
217+
Optional.of(
218+
datasets.stream()
219+
.filter(d -> d.ioType().equals(IoType.OUTPUT))
220+
.map(
221+
ds ->
222+
new DatasetId(NamespaceName.of(ds.namespace()), DatasetName.of(ds.name())))
223+
.collect(Collectors.toSet()))
224+
.filter(s -> !s.isEmpty())
225+
.ifPresent(s -> j.setOutputs(s));
226+
}
227+
203228
default void setJobData(Run run, Job j) {
204229
j.setLatestRun(run);
205230
DatasetVersionDao datasetVersionDao = createDatasetVersionDao();

api/src/main/java/marquez/db/JobFacetsDao.java

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import java.util.Spliterators;
1212
import java.util.UUID;
1313
import java.util.stream.StreamSupport;
14+
import javax.annotation.Nullable;
1415
import lombok.NonNull;
1516
import marquez.common.Utils;
1617
import marquez.db.mappers.JobFacetsMapper;
@@ -55,6 +56,32 @@ void insertJobFacet(
5556
String name,
5657
PGobject facet);
5758

59+
@SqlUpdate(
60+
"""
61+
INSERT INTO job_facets (
62+
created_at,
63+
job_uuid,
64+
job_version_uuid,
65+
lineage_event_time,
66+
name,
67+
facet
68+
) VALUES (
69+
:createdAt,
70+
:jobUuid,
71+
:jobVersionUuid,
72+
:lineageEventTime,
73+
:name,
74+
:facet
75+
)
76+
""")
77+
void insertJobFacet(
78+
Instant createdAt,
79+
UUID jobUuid,
80+
UUID jobVersionUuid,
81+
Instant lineageEventTime,
82+
String name,
83+
PGobject facet);
84+
5885
@SqlQuery(
5986
"""
6087
SELECT
@@ -72,9 +99,31 @@ void insertJobFacet(
7299
@Transaction
73100
default void insertJobFacetsFor(
74101
@NonNull UUID jobUuid,
75-
@NonNull UUID runUuid,
102+
@NonNull UUID jobVersionUuid,
103+
@NonNull Instant lineageEventTime,
104+
@NonNull LineageEvent.JobFacet jobFacet) {
105+
final Instant now = Instant.now();
106+
107+
JsonNode jsonNode = Utils.getMapper().valueToTree(jobFacet);
108+
StreamSupport.stream(
109+
Spliterators.spliteratorUnknownSize(jsonNode.fieldNames(), Spliterator.DISTINCT), false)
110+
.forEach(
111+
fieldName ->
112+
insertJobFacet(
113+
now,
114+
jobUuid,
115+
jobVersionUuid,
116+
lineageEventTime,
117+
fieldName,
118+
FacetUtils.toPgObject(fieldName, jsonNode.get(fieldName))));
119+
}
120+
121+
@Transaction
122+
default void insertJobFacetsFor(
123+
@NonNull UUID jobUuid,
124+
@Nullable UUID runUuid,
76125
@NonNull Instant lineageEventTime,
77-
@NonNull String lineageEventType,
126+
@Nullable String lineageEventType,
78127
@NonNull LineageEvent.JobFacet jobFacet) {
79128
final Instant now = Instant.now();
80129

0 commit comments

Comments
 (0)