Skip to content

Commit ce3835d

Browse files
authored
Merge branch 'main' into observability/add_endpoint_name_to_metrics_name
2 parents d141c1f + 879031a commit ce3835d

26 files changed

Lines changed: 564 additions & 150 deletions

File tree

api/src/main/java/marquez/api/DatasetResource.java

Lines changed: 8 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -133,7 +133,11 @@ public Response listVersions(
133133
final List<DatasetVersion> datasetVersions =
134134
datasetVersionService.findAllWithRun(
135135
namespaceName.getValue(), datasetName.getValue(), limit, offset);
136-
return Response.ok(new DatasetVersions(datasetVersions)).build();
136+
137+
final int totalCount =
138+
datasetVersionService.countDatasetVersions(
139+
namespaceName.getValue(), datasetName.getValue());
140+
return Response.ok(new DatasetVersions(datasetVersions, totalCount)).build();
137141
}
138142

139143
@Timed
@@ -301,5 +305,8 @@ static class DatasetVersions {
301305
@NonNull
302306
@JsonProperty("versions")
303307
List<DatasetVersion> value;
308+
309+
@JsonProperty("totalCount")
310+
int totalCount;
304311
}
305312
}

api/src/main/java/marquez/api/JobResource.java

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -238,7 +238,8 @@ public Response listRuns(
238238

239239
final List<Run> runs =
240240
runService.findAll(namespaceName.getValue(), jobName.getValue(), limit, offset);
241-
return Response.ok(new Runs(runs)).build();
241+
final int totalCount = jobService.countJobRuns(namespaceName.getValue(), jobName.getValue());
242+
return Response.ok(new Runs(runs, totalCount)).build();
242243
}
243244

244245
@Path("/jobs/runs/{id}")
@@ -329,5 +330,8 @@ public static class Runs {
329330
@NonNull
330331
@JsonProperty("runs")
331332
List<Run> value;
333+
334+
@JsonProperty("totalCount")
335+
int totalCount;
332336
}
333337
}

api/src/main/java/marquez/db/DatasetVersionDao.java

Lines changed: 20 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -278,13 +278,19 @@ LEFT JOIN (
278278
WHERE dv.namespace_name = :namespaceName
279279
AND dv.dataset_name = :datasetName
280280
LIMIT :limit OFFSET :offset
281+
),
282+
dataset_symlinks_names as (
283+
SELECT DISTINCT dataset_uuid, name
284+
FROM dataset_symlinks
285+
WHERE NOT is_primary
281286
)
282287
SELECT
283288
type, name, physical_name, namespace_name, source_name, description, lifecycle_state,
284289
created_at, version, dataset_schema_version_uuid, fields, createdByRunUuid, schema_location,
285290
tags, dataset_version_uuid,
286291
JSONB_AGG(facets ORDER BY lineage_event_time ASC) AS facets
287292
FROM dataset_info
293+
WHERE name NOT IN (SELECT name FROM dataset_symlinks_names)
288294
GROUP BY type, name, physical_name, namespace_name, source_name, description, lifecycle_state,
289295
created_at, version, dataset_schema_version_uuid, fields, createdByRunUuid, schema_location,
290296
tags, dataset_version_uuid
@@ -309,6 +315,20 @@ default List<DatasetVersion> findAllWithRun(
309315
@SqlQuery(SELECT + "WHERE dv.uuid = :uuid")
310316
Optional<DatasetVersionRow> findRowByUuid(UUID uuid);
311317

318+
@SqlQuery(
319+
"""
320+
select
321+
count(*)
322+
from
323+
dataset_versions
324+
where
325+
namespace_name = :namespaceName
326+
and
327+
dataset_name = :dataset
328+
;
329+
""")
330+
int countDatasetVersions(String namespaceName, String dataset);
331+
312332
@SqlQuery(
313333
"INSERT INTO dataset_versions "
314334
+ "(uuid, created_at, dataset_uuid, version, dataset_schema_version_uuid, run_uuid, fields, namespace_name, dataset_name, lifecycle_state) "

api/src/main/java/marquez/db/JobDao.java

Lines changed: 14 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -252,6 +252,20 @@ job_tags as (
252252
@SqlQuery("SELECT count(*) FROM jobs_view AS j WHERE symlink_target_uuid IS NULL")
253253
int count();
254254

255+
@SqlQuery(
256+
"""
257+
select
258+
count(*)
259+
from
260+
runs
261+
where
262+
namespace_name = :namespaceName
263+
and
264+
job_name = :job
265+
;
266+
""")
267+
int countJobRuns(String namespaceName, String job);
268+
255269
@SqlQuery(
256270
"SELECT count(*) FROM jobs_view AS j WHERE j.namespace_name = :namespaceName\n"
257271
+ "AND symlink_target_uuid IS NULL")

api/src/test/java/marquez/DatasetIntegrationTest.java

Lines changed: 55 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -7,6 +7,8 @@
77

88
import static marquez.db.ColumnLineageTestUtils.getDatasetA;
99
import static marquez.db.ColumnLineageTestUtils.getDatasetB;
10+
import static marquez.db.LineageTestUtils.PRODUCER_URL;
11+
import static marquez.db.LineageTestUtils.SCHEMA_URL;
1012
import static org.assertj.core.api.Assertions.assertThat;
1113

1214
import com.fasterxml.jackson.core.type.TypeReference;
@@ -574,4 +576,57 @@ public void testApp_doesNotShowDeletedDatasetAfterUndeleteNamespace() throws IOE
574576
jobs = client.listJobs(namespaceName);
575577
assertThat(jobs).hasSize(1);
576578
}
579+
580+
@Test
581+
public void testApp_getTableVersionsWithSymlinks() {
582+
client.createDataset(NAMESPACE_NAME, DB_TABLE_NAME, DB_TABLE_META);
583+
584+
ImmutableMap<String, Object> outputFacets =
585+
ImmutableMap.of("outputFacetKey", "outputFacetValue");
586+
ImmutableMap<String, Object> inputFacets = ImmutableMap.of("inputFacetKey", "inputFacetValue");
587+
588+
final LineageEvent.DatasetFacets datasetFacets =
589+
LineageTestUtils.newDatasetFacet(
590+
outputFacets,
591+
LineageEvent.SchemaField.builder()
592+
.name("firstname")
593+
.type("string")
594+
.description("the first name")
595+
.build());
596+
datasetFacets
597+
.getDocumentation()
598+
.setDescription(DB_TABLE_META.getDescription().orElse("the dataset documentation"));
599+
datasetFacets.setSymlinks(
600+
new LineageEvent.DatasetSymlinkFacet(
601+
PRODUCER_URL,
602+
SCHEMA_URL,
603+
Collections.singletonList(
604+
new LineageEvent.SymlinkIdentifier("symlinkNamespace", "symlinkName", "type"))));
605+
final LineageEvent lineageEvent =
606+
LineageEvent.builder()
607+
.producer("testApp_getTableVersionsWithSymlinks")
608+
.eventType("COMPLETE")
609+
.run(
610+
new LineageEvent.Run(
611+
UUID.randomUUID().toString(), LineageEvent.RunFacet.builder().build()))
612+
.job(LineageEvent.Job.builder().namespace(NAMESPACE_NAME).name(JOB_NAME).build())
613+
.eventTime(ZonedDateTime.now())
614+
.inputs(
615+
Collections.singletonList(
616+
LineageEvent.Dataset.builder()
617+
.namespace(NAMESPACE_NAME)
618+
.name(DB_TABLE_NAME)
619+
.facets(datasetFacets)
620+
.build()))
621+
.outputs(Collections.emptyList())
622+
.build();
623+
final CompletableFuture<Integer> resp = sendEvent(lineageEvent);
624+
assertThat(resp.join()).isEqualTo(201);
625+
List<DatasetVersion> versions = client.listDatasetVersions(NAMESPACE_NAME, DB_TABLE_NAME);
626+
627+
versions.forEach(
628+
datasetVersion -> {
629+
assertThat(datasetVersion.getName()).isNotEqualTo("symlinkName");
630+
});
631+
}
577632
}

api/src/test/java/marquez/db/DatasetDaoTest.java

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -114,6 +114,10 @@ void testUpsertMetadataMultipleRuns() {
114114
assertEquals(firstResult.getId(), secondResult.getId());
115115
assertNotEquals(firstResult.getCurrentVersion(), secondResult.getCurrentVersion());
116116
assertEquals(resolveSchemaVersion(firstResult), resolveSchemaVersion(secondResult));
117+
118+
// test count of dataset versions
119+
int jobCount = datasetVersionDao.countDatasetVersions(NAMESPACE, DATASET);
120+
assertEquals(jobCount, 2);
117121
}
118122

119123
@Test
@@ -310,6 +314,7 @@ public void testGetDatasetWithMultipleVersions() {
310314

311315
Optional<marquez.service.models.Dataset> datasetByName =
312316
datasetDao.findDatasetByName(NAMESPACE, DATASET);
317+
313318
assertThat(datasetByName)
314319
.isPresent()
315320
.get()

api/src/test/java/marquez/db/JobDaoTest.java

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -136,6 +136,7 @@ public void testCountFor() {
136136
assertThat(jobDao.count()).isEqualTo(4);
137137

138138
assertThat(jobDao.countFor(namespace.getName())).isEqualTo(3);
139+
assertThat(jobDao.countJobRuns(namespace.getName(), "targetJob")).isEqualTo(0);
139140
}
140141

141142
@Test

web/src/components/datasets/DatasetDetailPage.tsx

Lines changed: 17 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -28,7 +28,7 @@ import {
2828
deleteDataset,
2929
dialogToggle,
3030
fetchDataset,
31-
fetchDatasetVersions,
31+
fetchInitialDatasetVersions,
3232
resetDataset,
3333
resetDatasetVersions,
3434
setTabIndex,
@@ -56,15 +56,15 @@ import StorageIcon from '@mui/icons-material/Storage'
5656
interface StateProps {
5757
lineageDataset: LineageDataset
5858
dataset: Dataset
59-
versions: DatasetVersion[]
60-
versionsLoading: boolean
59+
initVersions: DatasetVersion[]
60+
initVersionsLoading: boolean
6161
datasets: IState['datasets']
6262
display: IState['display']
6363
tabIndex: IState['lineage']['tabIndex']
6464
}
6565

6666
interface DispatchProps {
67-
fetchDatasetVersions: typeof fetchDatasetVersions
67+
fetchInitialDatasetVersions: typeof fetchInitialDatasetVersions
6868
fetchDataset: typeof fetchDataset
6969
resetDatasetVersions: typeof resetDatasetVersions
7070
resetDataset: typeof resetDataset
@@ -88,13 +88,13 @@ const DatasetDetailPage: FunctionComponent<IProps> = (props) => {
8888
dataset,
8989
display,
9090
fetchDataset,
91-
fetchDatasetVersions,
9291
resetDataset,
9392
resetDatasetVersions,
93+
fetchInitialDatasetVersions,
9494
deleteDataset,
9595
dialogToggle,
96-
versions,
97-
versionsLoading,
96+
initVersions,
97+
initVersionsLoading,
9898
lineageDataset,
9999
tabIndex,
100100
setTabIndex,
@@ -114,8 +114,9 @@ const DatasetDetailPage: FunctionComponent<IProps> = (props) => {
114114
[]
115115
)
116116

117+
// might need to map first version to its own state
117118
useEffect(() => {
118-
fetchDatasetVersions(lineageDataset.namespace, lineageDataset.name)
119+
fetchInitialDatasetVersions(lineageDataset.namespace, lineageDataset.name)
119120
fetchDataset(lineageDataset.namespace, lineageDataset.name)
120121
}, [lineageDataset.name, showTags])
121122

@@ -130,19 +131,19 @@ const DatasetDetailPage: FunctionComponent<IProps> = (props) => {
130131
setTabIndex(newValue)
131132
}
132133

133-
if (versionsLoading && versions.length === 0) {
134+
if (initVersionsLoading && initVersions.length === 0) {
134135
return (
135136
<Box display={'flex'} justifyContent={'center'} mt={2}>
136137
<CircularProgress color='primary' />
137138
</Box>
138139
)
139140
}
140141

141-
if (versions.length === 0) {
142+
if (initVersions.length === 0) {
142143
return null
143144
}
144145

145-
const firstVersion = versions[0]
146+
const firstVersion = initVersions[0]
146147
const { name, tags, description } = firstVersion
147148
const facetsStatus = datasetFacetsStatus(firstVersion.facets)
148149

@@ -326,7 +327,7 @@ const DatasetDetailPage: FunctionComponent<IProps> = (props) => {
326327
checked={showTags}
327328
onChange={() => setShowTags(!showTags)}
328329
inputProps={{ 'aria-label': 'toggle show tags' }}
329-
disabled={versionsLoading}
330+
disabled={initVersionsLoading}
330331
/>
331332
}
332333
label={i18next.t('datasets.show_field_tags')}
@@ -344,7 +345,7 @@ const DatasetDetailPage: FunctionComponent<IProps> = (props) => {
344345
isCurrentVersion
345346
/>
346347
)}
347-
{tabIndex === 1 && <DatasetVersions dataset={dataset} versions={props.versions} />}
348+
{tabIndex === 1 && <DatasetVersions dataset={dataset} />}
348349
</Box>
349350
)
350351
}
@@ -353,15 +354,15 @@ const mapStateToProps = (state: IState) => ({
353354
datasets: state.datasets,
354355
dataset: state.dataset.result,
355356
display: state.display,
356-
versions: state.datasetVersions.result.versions,
357-
versionsLoading: state.datasetVersions.isLoading,
357+
initVersions: state.datasetVersions.initDsVersion.versions,
358+
initVersionsLoading: state.datasetVersions.isInitDsVerLoading,
358359
tabIndex: state.lineage.tabIndex,
359360
})
360361

361362
const mapDispatchToProps = (dispatch: Redux.Dispatch) =>
362363
bindActionCreators(
363364
{
364-
fetchDatasetVersions: fetchDatasetVersions,
365+
fetchInitialDatasetVersions: fetchInitialDatasetVersions,
365366
fetchDataset: fetchDataset,
366367
resetDatasetVersions: resetDatasetVersions,
367368
resetDataset: resetDataset,

web/src/components/datasets/DatasetInfo.tsx

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -44,9 +44,11 @@ const DatasetInfo: FunctionComponent<DatasetInfoProps> = (props) => {
4444
const { datasetFields, facets, run, dataset, fetchJobFacets, resetFacets, showTags } = props
4545
const i18next = require('i18next')
4646
const dsNamespace = useSelector(
47-
(state: IState) => state.datasetVersions.result.versions[0].namespace
47+
(state: IState) => state.datasetVersions.initDsVersion.versions[0].namespace
48+
)
49+
const dsName = useSelector(
50+
(state: IState) => state.datasetVersions.initDsVersion.versions[0].name
4851
)
49-
const dsName = useSelector((state: IState) => state.datasetVersions.result.versions[0].name)
5052

5153
useEffect(() => {
5254
run && fetchJobFacets(run.id)

0 commit comments

Comments
 (0)