Skip to content

Commit 0995b0a

Browse files
enable point-in-time column lineage in python client (#2276)
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com> Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com> Co-authored-by: Willy Lulciuc <willy@datakin.com>
1 parent a658a01 commit 0995b0a

2 files changed

Lines changed: 42 additions & 36 deletions

File tree

clients/python/marquez_client/client.py

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -343,8 +343,18 @@ def get_job_run(self, run_id):
343343
Utils.is_valid_uuid(run_id, 'run_id')
344344
return self._get(self._url('/jobs/runs/{0}', run_id))
345345

346-
def get_column_lineage_by_dataset(self, namespace, dataset, depth=None, with_downstream=None):
347-
node_id = "dataset:{0}:{1}".format(namespace, dataset)
346+
def get_column_lineage_by_dataset(
347+
self,
348+
namespace,
349+
dataset,
350+
depth=None,
351+
with_downstream=None,
352+
version=None
353+
):
354+
node_id = self._append_version_to_node_id(
355+
"dataset:{0}:{1}".format(namespace, dataset),
356+
version
357+
)
348358
return self._get_column_lineage(node_id, depth, with_downstream)
349359

350360
def get_column_lineage_by_dataset_field(
@@ -353,15 +363,29 @@ def get_column_lineage_by_dataset_field(
353363
dataset,
354364
field,
355365
depth=None,
356-
with_downstream=None
366+
with_downstream=None,
367+
version=None,
357368
):
358-
node_id = "datasetField:{0}:{1}:{2}".format(namespace, dataset, field)
369+
node_id = self._append_version_to_node_id(
370+
"datasetField:{0}:{1}:{2}".format(namespace, dataset, field),
371+
version
372+
)
359373
return self._get_column_lineage(node_id, depth, with_downstream)
360374

361-
def get_column_lineage_by_job(self, namespace, job, depth=None, with_downstream=None):
362-
node_id = "job:{0}:{1}".format(namespace, job)
375+
def get_column_lineage_by_job(self, namespace, job, depth=None,
376+
with_downstream=None, version=None):
377+
node_id = self._append_version_to_node_id(
378+
"job:{0}:{1}".format(namespace, job),
379+
version
380+
)
363381
return self._get_column_lineage(node_id, depth, with_downstream)
364382

383+
def _append_version_to_node_id(self, node_id, version):
384+
if version is not None:
385+
return node_id + "#" + version
386+
else:
387+
return node_id
388+
365389
def _get_column_lineage(self, node_id, depth, with_downstream):
366390
return self._get(
367391
self._url('/column-lineage'),

clients/python/tests/test_marquez_client.py

Lines changed: 12 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -979,21 +979,16 @@ def test_get_column_lineage_by_dataset(mock_get, client):
979979
mock_get.return_value.json.return_value = COLUMN_LINEAGE
980980

981981
column_lineage = client.get_column_lineage_by_dataset(
982-
"namespace_a",
983-
"dataset_a",
984-
DEFAULT_DEPTH,
985-
DEFAULT_WITH_DOWNSTREAM
982+
namespace="namespace_a", dataset="dataset_a", depth=DEFAULT_DEPTH,
983+
with_downstream=DEFAULT_WITH_DOWNSTREAM, version="some-version"
986984
)
987985

988986
assert column_lineage == COLUMN_LINEAGE
989-
990987
mock_get.assert_called_once_with(
991-
url=client._url(
992-
'/column-lineage'
993-
),
988+
url=client._url('/column-lineage'),
994989
headers=mock.ANY,
995990
params={
996-
'nodeId': 'dataset:namespace_a:dataset_a',
991+
'nodeId': 'dataset:namespace_a:dataset_a#some-version',
997992
'depth': DEFAULT_DEPTH,
998993
'withDownstream': DEFAULT_WITH_DOWNSTREAM
999994
},
@@ -1005,24 +1000,17 @@ def test_get_column_lineage_by_dataset(mock_get, client):
10051000
def test_get_column_lineage_by_dataset_field(mock_get, client):
10061001
mock_get.return_value.status_code.return_value = HTTPStatus.OK
10071002
mock_get.return_value.json.return_value = COLUMN_LINEAGE
1008-
10091003
column_lineage = client.get_column_lineage_by_dataset_field(
1010-
"namespace_a",
1011-
"dataset_a",
1012-
"field_a",
1013-
DEFAULT_DEPTH,
1014-
DEFAULT_WITH_DOWNSTREAM
1004+
namespace="namespace_a", dataset="dataset_a", field="field_a", depth=DEFAULT_DEPTH,
1005+
with_downstream=DEFAULT_WITH_DOWNSTREAM, version="some-version"
10151006
)
10161007

10171008
assert column_lineage == COLUMN_LINEAGE
1018-
10191009
mock_get.assert_called_once_with(
1020-
url=client._url(
1021-
'/column-lineage'
1022-
),
1010+
url=client._url('/column-lineage'),
10231011
headers=mock.ANY,
10241012
params={
1025-
'nodeId': 'datasetField:namespace_a:dataset_a:field_a',
1013+
'nodeId': 'datasetField:namespace_a:dataset_a:field_a#some-version',
10261014
'depth': DEFAULT_DEPTH,
10271015
'withDownstream': DEFAULT_WITH_DOWNSTREAM
10281016
},
@@ -1034,23 +1022,17 @@ def test_get_column_lineage_by_dataset_field(mock_get, client):
10341022
def test_get_column_lineage_by_job(mock_get, client):
10351023
mock_get.return_value.status_code.return_value = HTTPStatus.OK
10361024
mock_get.return_value.json.return_value = COLUMN_LINEAGE
1037-
10381025
column_lineage = client.get_column_lineage_by_job(
1039-
"namespace_a",
1040-
"job_a",
1041-
DEFAULT_DEPTH,
1042-
DEFAULT_WITH_DOWNSTREAM
1026+
namespace="namespace_a", job="job_a", depth=DEFAULT_DEPTH,
1027+
with_downstream=DEFAULT_WITH_DOWNSTREAM, version="some-version"
10431028
)
10441029

10451030
assert column_lineage == COLUMN_LINEAGE
1046-
10471031
mock_get.assert_called_once_with(
1048-
url=client._url(
1049-
'/column-lineage'
1050-
),
1032+
url=client._url('/column-lineage'),
10511033
headers=mock.ANY,
10521034
params={
1053-
'nodeId': 'job:namespace_a:job_a',
1035+
'nodeId': 'job:namespace_a:job_a#some-version',
10541036
'depth': DEFAULT_DEPTH,
10551037
'withDownstream': DEFAULT_WITH_DOWNSTREAM
10561038
},

0 commit comments

Comments
 (0)