Background
It seems the data model uses varchar(64) for fields that are meant to store connection URIs. When sending a lineage event, especially from OpenLineage's airflow integration, that contains a URI that is longer than 64 characters in various fields, an error is thrown.
Expected Behavior
Lineage events with a source name > 64 characters, or namespace > 64, should be accepted as a valid event
Observed Behavior
A stack trace is thrown
org.postgresql.util.PSQLException: ERROR: value too long for type character varying(64)
at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2565)
at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2297)
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:322)
at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:481)
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:401)
at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:164)
at org.postgresql.jdbc.PgPreparedStatement.execute(PgPreparedStatement.java:153)
at jdk.internal.reflect.GeneratedMethodAccessor20.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at org.apache.tomcat.jdbc.pool.StatementFacade$StatementProxy.invoke(StatementFacade.java:118)
at com.sun.proxy.$Proxy42.execute(Unknown Source)
at org.jdbi.v3.core.statement.SqlLoggerUtil.wrap(SqlLoggerUtil.java:31)
at org.jdbi.v3.core.statement.SqlStatement.internalExecute(SqlStatement.java:1782)
... 92 common frames omitted
Causing: org.jdbi.v3.core.statement.UnableToExecuteStatementException: org.postgresql.util.PSQLException: ERROR: value too long for type character varying(64)
at org.jdbi.v3.core.statement.SqlStatement.internalExecute(SqlStatement.java:1790)
at org.jdbi.v3.core.result.ResultProducers.lambda$getResultSet$2(ResultProducers.java:64)
at org.jdbi.v3.core.result.ResultIterable.lambda$of$0(ResultIterable.java:54)
at org.jdbi.v3.core.result.ResultIterable.findFirst(ResultIterable.java:203)
at org.jdbi.v3.sqlobject.statement.internal.ResultReturner$CollectedResultReturner.mappedResult(ResultReturner.java:277)
at org.jdbi.v3.sqlobject.statement.internal.SqlQueryHandler.lambda$configureReturner$0(SqlQueryHandler.java:61)
at org.jdbi.v3.sqlobject.statement.internal.CustomizingStatementHandler.invoke(CustomizingStatementHandler.java:187)
at org.jdbi.v3.sqlobject.statement.internal.SqlQueryHandler.invoke(SqlQueryHandler.java:27)
at org.jdbi.v3.sqlobject.internal.SqlObjectInitData$1.lambda$invoke$0(SqlObjectInitData.java:132)
at org.jdbi.v3.core.internal.Invocations.invokeWith(Invocations.java:44)
at org.jdbi.v3.core.internal.Invocations.invokeWith(Invocations.java:26)
at org.jdbi.v3.core.LazyHandleSupplier.lambda$invokeInContext$1(LazyHandleSupplier.java:77)
at org.jdbi.v3.core.internal.Invocations.invokeWith(Invocations.java:44)
at org.jdbi.v3.core.internal.Invocations.invokeWith(Invocations.java:26)
at org.jdbi.v3.core.LazyHandleSupplier.invokeInContext(LazyHandleSupplier.java:76)
at org.jdbi.v3.sqlobject.internal.SqlObjectInitData$1.call(SqlObjectInitData.java:138)
at org.jdbi.v3.sqlobject.internal.SqlObjectInitData$1.invoke(SqlObjectInitData.java:132)
at org.jdbi.v3.sqlobject.SqlObjectFactory.lambda$attach$2(SqlObjectFactory.java:110)
at org.jdbi.v3.core.internal.OnDemandExtensions.lambda$invoke$5(OnDemandExtensions.java:98)
at org.jdbi.v3.core.internal.exceptions.Unchecked.lambda$function$4(Unchecked.java:76)
at org.jdbi.v3.core.internal.OnDemandExtensions.invoke(OnDemandExtensions.java:98)
at org.jdbi.v3.core.internal.OnDemandExtensions.lambda$createProxy$2(OnDemandExtensions.java:82)
at org.jdbi.v3.core.Jdbi.callWithExtension(Jdbi.java:476)
at org.jdbi.v3.core.Jdbi.withExtension(Jdbi.java:458)
at org.jdbi.v3.core.internal.OnDemandExtensions.lambda$createProxy$3(OnDemandExtensions.java:82)
at com.sun.proxy.$Proxy81.upsert(Unknown Source)
at marquez.db.OpenLineageDao.upsertLineageDataset(OpenLineageDao.java:327)
at java.base/java.lang.invoke.MethodHandle.invokeWithArguments(Unknown Source)
at org.jdbi.v3.core.internal.exceptions.Unchecked.lambda$function$4(Unchecked.java:76)
at org.jdbi.v3.sqlobject.DefaultMethodHandler.invoke(DefaultMethodHandler.java:50)
at org.jdbi.v3.sqlobject.internal.SqlObjectInitData$1.lambda$invoke$0(SqlObjectInitData.java:132)
at org.jdbi.v3.core.internal.Invocations.invokeWith(Invocations.java:44)
at org.jdbi.v3.core.internal.Invocations.invokeWith(Invocations.java:26)
at org.jdbi.v3.core.LazyHandleSupplier.lambda$invokeInContext$1(LazyHandleSupplier.java:77)
at org.jdbi.v3.core.internal.Invocations.invokeWith(Invocations.java:44)
at org.jdbi.v3.core.internal.Invocations.invokeWith(Invocations.java:26)
at org.jdbi.v3.core.LazyHandleSupplier.invokeInContext(LazyHandleSupplier.java:76)
at org.jdbi.v3.sqlobject.internal.SqlObjectInitData$1.call(SqlObjectInitData.java:138)
at org.jdbi.v3.sqlobject.internal.SqlObjectInitData$1.invoke(SqlObjectInitData.java:132)
at org.jdbi.v3.sqlobject.SqlObjectFactory.lambda$attach$2(SqlObjectFactory.java:110)
at com.sun.proxy.$Proxy122.upsertLineageDataset(Unknown Source)
at marquez.db.OpenLineageDao.updateBaseMarquezModel(OpenLineageDao.java:243)
at java.base/java.lang.invoke.MethodHandle.invokeWithArguments(Unknown Source)
at org.jdbi.v3.core.internal.exceptions.Unchecked.lambda$function$4(Unchecked.java:76)
at org.jdbi.v3.sqlobject.DefaultMethodHandler.invoke(DefaultMethodHandler.java:50)
at org.jdbi.v3.sqlobject.internal.SqlObjectInitData$1.lambda$invoke$0(SqlObjectInitData.java:132)
at org.jdbi.v3.core.internal.Invocations.invokeWith(Invocations.java:44)
at org.jdbi.v3.core.internal.Invocations.invokeWith(Invocations.java:26)
at org.jdbi.v3.core.LazyHandleSupplier.lambda$invokeInContext$1(LazyHandleSupplier.java:77)
at org.jdbi.v3.core.internal.Invocations.invokeWith(Invocations.java:44)
at org.jdbi.v3.core.internal.Invocations.invokeWith(Invocations.java:26)
at org.jdbi.v3.core.LazyHandleSupplier.invokeInContext(LazyHandleSupplier.java:76)
at org.jdbi.v3.sqlobject.internal.SqlObjectInitData$1.call(SqlObjectInitData.java:138)
at org.jdbi.v3.sqlobject.internal.SqlObjectInitData$1.invoke(SqlObjectInitData.java:132)
at org.jdbi.v3.sqlobject.SqlObjectFactory.lambda$attach$2(SqlObjectFactory.java:110)
at com.sun.proxy.$Proxy122.updateBaseMarquezModel(Unknown Source)
at marquez.db.OpenLineageDao.updateMarquezModel(OpenLineageDao.java:73)
at java.base/java.lang.invoke.MethodHandle.invokeWithArguments(Unknown Source)
at org.jdbi.v3.core.internal.exceptions.Unchecked.lambda$function$4(Unchecked.java:76)
at org.jdbi.v3.sqlobject.DefaultMethodHandler.invoke(DefaultMethodHandler.java:50)
at org.jdbi.v3.sqlobject.transaction.internal.TransactionDecorator.lambda$decorateHandler$3(TransactionDecorator.java:46)
at org.jdbi.v3.core.transaction.LocalTransactionHandler$BoundLocalTransactionHandler.inTransaction(LocalTransactionHandler.java:208)
at org.jdbi.v3.core.transaction.LocalTransactionHandler$BoundLocalTransactionHandler.inTransaction(LocalTransactionHandler.java:233)
at org.jdbi.v3.core.Handle.inTransaction(Handle.java:505)
at org.jdbi.v3.sqlobject.transaction.internal.TransactionDecorator.lambda$decorateHandler$4(TransactionDecorator.java:54)
at org.jdbi.v3.sqlobject.internal.SqlObjectInitData$1.lambda$invoke$0(SqlObjectInitData.java:132)
at org.jdbi.v3.core.internal.Invocations.invokeWith(Invocations.java:44)
at org.jdbi.v3.core.internal.Invocations.invokeWith(Invocations.java:26)
at org.jdbi.v3.core.LazyHandleSupplier.lambda$invokeInContext$1(LazyHandleSupplier.java:77)
at org.jdbi.v3.core.internal.Invocations.invokeWith(Invocations.java:44)
at org.jdbi.v3.core.internal.Invocations.invokeWith(Invocations.java:26)
at org.jdbi.v3.core.LazyHandleSupplier.invokeInContext(LazyHandleSupplier.java:76)
at org.jdbi.v3.sqlobject.internal.SqlObjectInitData$1.call(SqlObjectInitData.java:138)
at org.jdbi.v3.sqlobject.internal.SqlObjectInitData$1.invoke(SqlObjectInitData.java:132)
at org.jdbi.v3.sqlobject.SqlObjectFactory.lambda$attach$2(SqlObjectFactory.java:110)
at org.jdbi.v3.core.internal.OnDemandExtensions.lambda$invoke$5(OnDemandExtensions.java:98)
at org.jdbi.v3.core.internal.exceptions.Unchecked.lambda$function$4(Unchecked.java:76)
at org.jdbi.v3.core.internal.OnDemandExtensions.invoke(OnDemandExtensions.java:98)
at org.jdbi.v3.core.internal.OnDemandExtensions.lambda$createProxy$2(OnDemandExtensions.java:82)
at org.jdbi.v3.core.Jdbi.callWithExtension(Jdbi.java:476)
at org.jdbi.v3.core.Jdbi.withExtension(Jdbi.java:463)
at org.jdbi.v3.core.internal.OnDemandExtensions.lambda$createProxy$3(OnDemandExtensions.java:82)
at com.sun.proxy.$Proxy90.updateMarquezModel(Unknown Source)
at marquez.service.DelegatingDaos$DelegatingOpenLineageDao.updateMarquezModel(DelegatingDaos.java:59)
at marquez.service.OpenLineageService.lambda$createAsync$0(OpenLineageService.java:55)
at marquez.tracing.SentryPropagating$SentryPropagatingSupplier.get(SentryPropagating.java:23)
... 7 common frames omitted
Causing: java.util.concurrent.CompletionException: org.jdbi.v3.core.statement.UnableToExecuteStatementException: org.postgresql.util.PSQLException: ERROR: value too long for type character varying(64) [s
"/* SourceDao.upsert */
INSERT INTO
sources
(uuid, type, created_at, updated_at, name, connection_url )
VALUES
(:uuid, :type, :now, :now, :name, :connectionUrl)
ON CONFLICT(name) DO UPDATE SET
type = EXCLUDED.type,
updated_at = EXCLUDED.updated_at,
name = EXCLUDED.name,
connection_url = EXCLUDED.connection_url RETURNING *",
arguments:{positional:{0:b01575bf-a4b4-4038-baf9-771115138f5f,1:POSTGRESQL,2:2022-09-13T16:05:30.191569Z,3:postgres://redshift-cluster-abc.1axlbxilf981.us-east-1.redshift.amazonaws.com:5439,4:redshift://redshift-cluster-abc.1axlbxilf981.us-east-1.redshift.amazonaws.com:5439/schemaX}, named:{now:2022-09-13T16:05:30.191569Z,name:postgres://redshift-cluster-abc.1axlbxilf981.us-east-1.redshift.amazonaws.com:5439,connectionUrl:redshift://redshift-cluster-abc.1axlbxilf981.us-east-1.redshift.amazonaws.com:5439/schemaX,type:POSTGRESQL,uuid:b01575bf-a4b4-4038-baf9-771115138f5f}, finder:[]}]
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.exec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
Test Data
{
"eventTime": "2022-09-13T16:05:30.191569Z",
"eventType": "COMPLETE",
"inputs": [],
"job": {
"facets": {
"sql": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/0.14.1/integration/airflow",
"_schemaURL": "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SqlJobFacet",
"query": "DELETE FROM schema_123.table_1234_with_extra_values_1 WHERE True AND \n time_registered >= (EXTRACT(EPOCH FROM TIMESTAMP WITH TIME ZONE '2022-08-01 00:00:00.000') * 1000)\n AND time_registered < (EXTRACT(EPOCH FROM TIMESTAMP WITH TIME ZONE '2022-09-01 00:00:00.000') * 1000) ;"
}
},
"name": "my_really_really_really_really_long_named_process_v25.action",
"namespace": "dev"
},
"outputs": [
{
"facets": {
"dataSource": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/0.14.1/integration/airflow",
"_schemaURL": "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/DataSourceDatasetFacet",
"name": "postgres://redshift-cluster-abc.abc123xyz987.us-east-1.redshift.amazonaws.com:5439",
"uri": "redshift://redshift-cluster-abc.abc123xyz987.us-east-1.redshift.amazonaws.com:5439/schema1"
},
"schema": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/0.14.1/integration/airflow",
"_schemaURL": "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SchemaDatasetFacet",
"fields": [
{
"name": "project_id",
"type": "varchar"
},
{
"name": "data_object_id",
"type": "varchar"
},
{
"name": "field_10",
"type": "bool"
},
{
"name": "time_registered",
"type": "int8"
}
]
}
},
"name": "schema1.schema_123.table_1234_with_extra_values_1",
"namespace": "postgres://redshift-cluster-abc.abc123xyz987.us-east-1.redshift.amazonaws.com:5439"
}
],
"producer": "https://github.com/OpenLineage/OpenLineage/tree/0.14.1/integration/airflow",
"run": {
"facets": {},
"runId": "72a6d748-d78c-46b0-9ed0-7fa62ff2fa6b"
}
}
Proposed Fix
Most fields that store metadata should be bumped to varchar(256) or some larger field to support longer URLs, and the UI should simply truncate to accomodate
Background
It seems the data model uses
varchar(64)for fields that are meant to store connection URIs. When sending a lineage event, especially from OpenLineage's airflow integration, that contains a URI that is longer than 64 characters in various fields, an error is thrown.Expected Behavior
Lineage events with a source name > 64 characters, or namespace > 64, should be accepted as a valid event
Observed Behavior
A stack trace is thrown
Test Data
Proposed Fix
Most fields that store metadata should be bumped to
varchar(256)or some larger field to support longer URLs, and the UI should simply truncate to accomodate