Skip to content

Commit 60d7d90

Browse files
Runless events - consume job event (#2661)
* Runless events - consume job event Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com> * Runless events - fix listLineage API Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com> * fix rebase Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com> * modify event type column Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com> * make _event_type varchar(64) instead of enum Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com> --------- Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
1 parent 3a26e50 commit 60d7d90

22 files changed

Lines changed: 999 additions & 111 deletions

CHANGELOG.md

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,12 @@
22

33
## [Unreleased](https://github.com/MarquezProject/marquez/compare/0.42.0...HEAD)
44

5+
### Added
6+
* API: support `DatasetEvent` [`#2641`](https://github.com/MarquezProject/marquez/pull/2641) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
7+
*Saves datasets sent via the `DatasetEvent` event type into the Marquez model.*
8+
* API: support `JobEvent` [`#2661`](https://github.com/MarquezProject/marquez/pull/2661) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
9+
*Saves jobs and datasets sent via the `JobEvent` event type into the Marquez model.*
10+
511
## [0.42.0](https://github.com/MarquezProject/marquez/compare/0.41.0...0.42.0) - 2023-10-17
612
### Added
713
* Client: add Java client method for dataset/job lineage [`#2623`](https://github.com/MarquezProject/marquez/pull/2623) [@davidjgoss](https://github.com/davidjgoss)
@@ -28,10 +34,6 @@
2834
* Web: fix Unix epoch time display for null `endedAt` values [`#2647`](https://github.com/MarquezProject/marquez/pull/2647) [@merobi-hub](https://github.com/merobi-hub)
2935
*Fixes the issue of the GUI displaying Unix epoch time (midnight on January 1, 1970) in the case of running jobs/null `endedAt` values.*
3036

31-
### Added
32-
* API: support `DatasetEvent` [`#2641`](https://github.com/MarquezProject/marquez/pull/2641) [@pawel-big-lebowski]( https://github.com/pawel-big-lebowski)
33-
*Save into Marquez model datasets sent via `DatasetEvent` event type
34-
3537
## [0.41.0](https://github.com/MarquezProject/marquez/compare/0.40.0...0.41.0) - 2023-09-20
3638
### Added
3739
* API: add support for the following parameters in the `SearchDao` [`#2556`](https://github.com/MarquezProject/marquez/pull/2556) [@tati](https://github.com/tati) [@wslulciuc](https://github.com/wslulciuc)

api/src/main/java/marquez/api/JobResource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ public Response getJob(
110110

111111
final Job job =
112112
jobService
113-
.findWithRun(namespaceName.getValue(), jobName.getValue())
113+
.findWithDatasetsAndRun(namespaceName.getValue(), jobName.getValue())
114114
.orElseThrow(() -> new JobNotFoundException(jobName));
115115
return Response.ok(job).build();
116116
}

api/src/main/java/marquez/api/OpenLineageResource.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import marquez.service.ServiceFactory;
4242
import marquez.service.models.BaseEvent;
4343
import marquez.service.models.DatasetEvent;
44+
import marquez.service.models.JobEvent;
4445
import marquez.service.models.LineageEvent;
4546
import marquez.service.models.NodeId;
4647

@@ -74,6 +75,10 @@ public void create(@Valid @NotNull BaseEvent event, @Suspended final AsyncRespon
7475
openLineageService
7576
.createAsync((DatasetEvent) event)
7677
.whenComplete((result, err) -> onComplete(result, err, asyncResponse));
78+
} else if (event instanceof JobEvent) {
79+
openLineageService
80+
.createAsync((JobEvent) event)
81+
.whenComplete((result, err) -> onComplete(result, err, asyncResponse));
7782
} else {
7883
log.warn("Unsupported event type {}. Skipping without error", event.getClass().getName());
7984

api/src/main/java/marquez/db/JobDao.java

Lines changed: 41 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
import marquez.common.models.JobName;
2121
import marquez.common.models.JobType;
2222
import marquez.common.models.NamespaceName;
23+
import marquez.db.JobVersionDao.IoType;
24+
import marquez.db.JobVersionDao.JobDataset;
25+
import marquez.db.JobVersionDao.JobDatasetMapper;
2326
import marquez.db.mappers.JobMapper;
2427
import marquez.db.mappers.JobRowMapper;
2528
import marquez.db.models.JobRow;
@@ -34,6 +37,7 @@
3437

3538
@RegisterRowMapper(JobRowMapper.class)
3639
@RegisterRowMapper(JobMapper.class)
40+
@RegisterRowMapper(JobDatasetMapper.class)
3741
public interface JobDao extends BaseDao {
3842

3943
@SqlQuery(
@@ -56,21 +60,14 @@ SELECT EXISTS (
5660

5761
@SqlQuery(
5862
"""
59-
SELECT j.*, f.facets
63+
WITH job_versions_facets AS (
64+
SELECT job_version_uuid, JSON_AGG(facet) as facets
65+
FROM job_facets
66+
GROUP BY job_version_uuid
67+
)
68+
SELECT j.*, facets
6069
FROM jobs_view j
61-
LEFT OUTER JOIN job_versions AS jv ON jv.uuid = j.current_version_uuid
62-
LEFT OUTER JOIN (
63-
SELECT run_uuid, JSON_AGG(e.facet) AS facets
64-
FROM (
65-
SELECT jf.run_uuid, jf.facet
66-
FROM job_facets_view AS jf
67-
INNER JOIN job_versions jv2 ON jv2.latest_run_uuid=jf.run_uuid
68-
INNER JOIN jobs_view j2 ON j2.current_version_uuid=jv2.uuid
69-
WHERE j2.name=:jobName AND j2.namespace_name=:namespaceName
70-
ORDER BY lineage_event_time ASC
71-
) e
72-
GROUP BY e.run_uuid
73-
) f ON f.run_uuid=jv.latest_run_uuid
70+
LEFT OUTER JOIN job_versions_facets f ON j.current_version_uuid = f.job_version_uuid
7471
WHERE j.namespace_name=:namespaceName AND (j.name=:jobName OR :jobName = ANY(j.aliases))
7572
""")
7673
Optional<Job> findJobByName(String namespaceName, String jobName);
@@ -94,12 +91,18 @@ SELECT run_uuid, JSON_AGG(e.facet) AS facets
9491
""")
9592
void deleteByNamespaceName(String namespaceName);
9693

97-
default Optional<Job> findWithRun(String namespaceName, String jobName) {
94+
default Optional<Job> findWithDatasetsAndRun(String namespaceName, String jobName) {
9895
Optional<Job> job = findJobByName(namespaceName, jobName);
9996
job.ifPresent(
10097
j -> {
10198
Optional<Run> run = createRunDao().findByLatestJob(namespaceName, jobName);
102-
run.ifPresent(r -> this.setJobData(r, j));
99+
run.ifPresentOrElse(
100+
r -> this.setJobData(r, j),
101+
() ->
102+
this.setJobData(
103+
createJobVersionDao()
104+
.findCurrentInputOutputDatasetsFor(namespaceName, jobName),
105+
j));
103106
});
104107
return job;
105108
}
@@ -200,6 +203,28 @@ default List<Job> findAllWithRun(String namespaceName, int limit, int offset) {
200203
.collect(Collectors.toList());
201204
}
202205

206+
default void setJobData(List<JobDataset> datasets, Job j) {
207+
Optional.of(
208+
datasets.stream()
209+
.filter(d -> d.ioType().equals(IoType.INPUT))
210+
.map(
211+
ds ->
212+
new DatasetId(NamespaceName.of(ds.namespace()), DatasetName.of(ds.name())))
213+
.collect(Collectors.toSet()))
214+
.filter(s -> !s.isEmpty())
215+
.ifPresent(s -> j.setInputs(s));
216+
217+
Optional.of(
218+
datasets.stream()
219+
.filter(d -> d.ioType().equals(IoType.OUTPUT))
220+
.map(
221+
ds ->
222+
new DatasetId(NamespaceName.of(ds.namespace()), DatasetName.of(ds.name())))
223+
.collect(Collectors.toSet()))
224+
.filter(s -> !s.isEmpty())
225+
.ifPresent(s -> j.setOutputs(s));
226+
}
227+
203228
default void setJobData(Run run, Job j) {
204229
j.setLatestRun(run);
205230
DatasetVersionDao datasetVersionDao = createDatasetVersionDao();

api/src/main/java/marquez/db/JobFacetsDao.java

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import java.util.Spliterators;
1212
import java.util.UUID;
1313
import java.util.stream.StreamSupport;
14+
import javax.annotation.Nullable;
1415
import lombok.NonNull;
1516
import marquez.common.Utils;
1617
import marquez.db.mappers.JobFacetsMapper;
@@ -55,6 +56,32 @@ void insertJobFacet(
5556
String name,
5657
PGobject facet);
5758

59+
@SqlUpdate(
60+
"""
61+
INSERT INTO job_facets (
62+
created_at,
63+
job_uuid,
64+
job_version_uuid,
65+
lineage_event_time,
66+
name,
67+
facet
68+
) VALUES (
69+
:createdAt,
70+
:jobUuid,
71+
:jobVersionUuid,
72+
:lineageEventTime,
73+
:name,
74+
:facet
75+
)
76+
""")
77+
void insertJobFacet(
78+
Instant createdAt,
79+
UUID jobUuid,
80+
UUID jobVersionUuid,
81+
Instant lineageEventTime,
82+
String name,
83+
PGobject facet);
84+
5885
@SqlQuery(
5986
"""
6087
SELECT
@@ -72,9 +99,31 @@ void insertJobFacet(
7299
@Transaction
73100
default void insertJobFacetsFor(
74101
@NonNull UUID jobUuid,
75-
@NonNull UUID runUuid,
102+
@NonNull UUID jobVersionUuid,
103+
@NonNull Instant lineageEventTime,
104+
@NonNull LineageEvent.JobFacet jobFacet) {
105+
final Instant now = Instant.now();
106+
107+
JsonNode jsonNode = Utils.getMapper().valueToTree(jobFacet);
108+
StreamSupport.stream(
109+
Spliterators.spliteratorUnknownSize(jsonNode.fieldNames(), Spliterator.DISTINCT), false)
110+
.forEach(
111+
fieldName ->
112+
insertJobFacet(
113+
now,
114+
jobUuid,
115+
jobVersionUuid,
116+
lineageEventTime,
117+
fieldName,
118+
FacetUtils.toPgObject(fieldName, jsonNode.get(fieldName))));
119+
}
120+
121+
@Transaction
122+
default void insertJobFacetsFor(
123+
@NonNull UUID jobUuid,
124+
@Nullable UUID runUuid,
76125
@NonNull Instant lineageEventTime,
77-
@NonNull String lineageEventType,
126+
@Nullable String lineageEventType,
78127
@NonNull LineageEvent.JobFacet jobFacet) {
79128
final Instant now = Instant.now();
80129

0 commit comments

Comments
 (0)