2020import marquez .common .models .JobName ;
2121import marquez .common .models .JobType ;
2222import marquez .common .models .NamespaceName ;
23+ import marquez .db .JobVersionDao .IoType ;
24+ import marquez .db .JobVersionDao .JobDataset ;
25+ import marquez .db .JobVersionDao .JobDatasetMapper ;
2326import marquez .db .mappers .JobMapper ;
2427import marquez .db .mappers .JobRowMapper ;
2528import marquez .db .models .JobRow ;
3437
3538@ RegisterRowMapper (JobRowMapper .class )
3639@ RegisterRowMapper (JobMapper .class )
40+ @ RegisterRowMapper (JobDatasetMapper .class )
3741public interface JobDao extends BaseDao {
3842
3943 @ SqlQuery (
@@ -56,21 +60,14 @@ SELECT EXISTS (
5660
5761 @ SqlQuery (
5862 """
59- SELECT j.*, f.facets
63+ WITH job_versions_facets AS (
64+ SELECT job_version_uuid, JSON_AGG(facet) as facets
65+ FROM job_facets
66+ GROUP BY job_version_uuid
67+ )
68+ SELECT j.*, facets
6069 FROM jobs_view j
61- LEFT OUTER JOIN job_versions AS jv ON jv.uuid = j.current_version_uuid
62- LEFT OUTER JOIN (
63- SELECT run_uuid, JSON_AGG(e.facet) AS facets
64- FROM (
65- SELECT jf.run_uuid, jf.facet
66- FROM job_facets_view AS jf
67- INNER JOIN job_versions jv2 ON jv2.latest_run_uuid=jf.run_uuid
68- INNER JOIN jobs_view j2 ON j2.current_version_uuid=jv2.uuid
69- WHERE j2.name=:jobName AND j2.namespace_name=:namespaceName
70- ORDER BY lineage_event_time ASC
71- ) e
72- GROUP BY e.run_uuid
73- ) f ON f.run_uuid=jv.latest_run_uuid
70+ LEFT OUTER JOIN job_versions_facets f ON j.current_version_uuid = f.job_version_uuid
7471 WHERE j.namespace_name=:namespaceName AND (j.name=:jobName OR :jobName = ANY(j.aliases))
7572 """ )
7673 Optional <Job > findJobByName (String namespaceName , String jobName );
@@ -94,12 +91,18 @@ SELECT run_uuid, JSON_AGG(e.facet) AS facets
9491 """ )
9592 void deleteByNamespaceName (String namespaceName );
9693
97- default Optional <Job > findWithRun (String namespaceName , String jobName ) {
94+ default Optional <Job > findWithDatasetsAndRun (String namespaceName , String jobName ) {
9895 Optional <Job > job = findJobByName (namespaceName , jobName );
9996 job .ifPresent (
10097 j -> {
10198 Optional <Run > run = createRunDao ().findByLatestJob (namespaceName , jobName );
102- run .ifPresent (r -> this .setJobData (r , j ));
99+ run .ifPresentOrElse (
100+ r -> this .setJobData (r , j ),
101+ () ->
102+ this .setJobData (
103+ createJobVersionDao ()
104+ .findCurrentInputOutputDatasetsFor (namespaceName , jobName ),
105+ j ));
103106 });
104107 return job ;
105108 }
@@ -200,6 +203,28 @@ default List<Job> findAllWithRun(String namespaceName, int limit, int offset) {
200203 .collect (Collectors .toList ());
201204 }
202205
206+ default void setJobData (List <JobDataset > datasets , Job j ) {
207+ Optional .of (
208+ datasets .stream ()
209+ .filter (d -> d .ioType ().equals (IoType .INPUT ))
210+ .map (
211+ ds ->
212+ new DatasetId (NamespaceName .of (ds .namespace ()), DatasetName .of (ds .name ())))
213+ .collect (Collectors .toSet ()))
214+ .filter (s -> !s .isEmpty ())
215+ .ifPresent (s -> j .setInputs (s ));
216+
217+ Optional .of (
218+ datasets .stream ()
219+ .filter (d -> d .ioType ().equals (IoType .OUTPUT ))
220+ .map (
221+ ds ->
222+ new DatasetId (NamespaceName .of (ds .namespace ()), DatasetName .of (ds .name ())))
223+ .collect (Collectors .toSet ()))
224+ .filter (s -> !s .isEmpty ())
225+ .ifPresent (s -> j .setOutputs (s ));
226+ }
227+
203228 default void setJobData (Run run , Job j ) {
204229 j .setLatestRun (run );
205230 DatasetVersionDao datasetVersionDao = createDatasetVersionDao ();
0 commit comments