Skip to content

Lineage data generated from PythonOperator results in 500 Error #2158

@howardyoo

Description

@howardyoo

Problem

Environment

  • Airflow version: 2.4.1+astro.1
  • OpenLineage Airflow integration version: 0.14.1
  • Astro Runtime version: 6.0.2

How to reproduce

  • Create a DAG containing a simple PythonOperator task and run it.
  • Set the log level to DEBUG and observe the lineage events that are emitted.
  • The following events may be observed:

START

{
  "eventTime": "2022-10-03T01:17:55.010405Z",
  "eventType": "START",
  "inputs": [],
  "job": {
    "facets": {},
    "name": "inlet_outlet_demo.test-operator",
    "namespace": "uninhabited-magnify-7821"
  },
  "outputs": [],
  "producer": "https://github.com/OpenLineage/OpenLineage/tree/0.14.1/integration/airflow",
  "run": {
    "facets": {
      "airflow_runArgs": {
        "_producer": "https://github.com/OpenLineage/OpenLineage/tree/0.14.1/integration/airflow",
        "_schemaURL": "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet",
        "externalTrigger": true
      },
      "airflow_version": {
        "_producer": "https://github.com/OpenLineage/OpenLineage/tree/0.14.1/integration/airflow",
        "_schemaURL": "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet",
        "airflowVersion": "2.4.1+astro.1",
        "openlineageAirflowVersion": "0.14.1",
        "operator": "airflow.operators.python.PythonOperator",
        "taskInfo": {
          "_BaseOperator__from_mapped": false,
          "_BaseOperator__init_kwargs": {
            "depends_on_past": false,
            "email": [],
            "email_on_failure": false,
            "email_on_retry": false,
            "op_kwargs": {
              "x": "Apache Airflow"
            },
            "owner": "demo",
            "python_callable": "<function python_operator at 0x403f2e4820>",
            "start_date": "2022-10-02T00:00:00+00:00",
            "task_id": "test-operator"
          },
          "_BaseOperator__instantiated": true,
          "_dag": {
            "dag_id": "inlet_outlet_demo",
            "tags": []
          },
          "_log": "<Logger airflow.task.operators (DEBUG)>",
          "depends_on_past": false,
          "do_xcom_push": true,
          "downstream_task_ids": "{'end'}",
          "email": [],
          "email_on_failure": false,
          "email_on_retry": false,
          "executor_config": {},
          "ignore_first_depends_on_past": true,
          "inlets": [],
          "op_args": [],
          "op_kwargs": {
            "x": "Apache Airflow"
          },
          "outlets": [],
          "owner": "demo",
          "params": "{}",
          "pool": "default_pool",
          "pool_slots": 1,
          "priority_weight": 1,
          "python_callable": "<function python_operator at 0x403f2e4820>",
          "queue": "default",
          "retries": 0,
          "retry_delay": "0:05:00",
          "retry_exponential_backoff": false,
          "show_return_value_in_logs": true,
          "start_date": "2022-10-02T00:00:00+00:00",
          "task_group": "<airflow.utils.task_group.TaskGroup object at 0x403f48e1c0>",
          "task_id": "test-operator",
          "trigger_rule": "all_success",
          "upstream_task_ids": "{'begin'}",
          "wait_for_downstream": false,
          "weight_rule": "downstream"
        }
      },
      "nominalTime": {
        "_producer": "https://github.com/OpenLineage/OpenLineage/tree/0.14.1/integration/airflow",
        "_schemaURL": "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/NominalTimeRunFacet",
        "nominalStartTime": "2022-10-03T01:17:52.271583Z"
      },
      "parent": {
        "_producer": "https://github.com/OpenLineage/OpenLineage/tree/0.14.1/integration/airflow",
        "_schemaURL": "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/ParentRunFacet",
        "job": {
          "name": "inlet_outlet_demo",
          "namespace": "uninhabited-magnify-7821"
        },
        "run": {
          "runId": "8576adbc-4e4a-3abf-8b8a-38980de35b22"
        }
      },
      "parentRun": {
        "_producer": "https://github.com/OpenLineage/OpenLineage/tree/0.14.1/integration/airflow",
        "_schemaURL": "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/ParentRunFacet",
        "job": {
          "name": "inlet_outlet_demo",
          "namespace": "uninhabited-magnify-7821"
        },
        "run": {
          "runId": "8576adbc-4e4a-3abf-8b8a-38980de35b22"
        }
      },
      "unknownSourceAttribute": {
        "_producer": "https://github.com/OpenLineage/OpenLineage/tree/0.14.1/integration/airflow",
        "_schemaURL": "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet",
        "unknownItems": [
          {
            "name": "PythonOperator",
            "properties": {
              "_BaseOperator__from_mapped": false,
              "_BaseOperator__init_kwargs": {
                "depends_on_past": false,
                "email": [],
                "email_on_failure": false,
                "email_on_retry": false,
                "op_kwargs": {
                  "x": "Apache Airflow"
                },
                "owner": "demo",
                "python_callable": "<<non-serializable: function>>",
                "start_date": "<<non-serializable: DateTime>>",
                "task_id": "test-operator"
              },
              "_BaseOperator__instantiated": true,
              "_dag": "<<non-serializable: DAG>>",
              "_log": "<<non-serializable: Logger>>",
              "depends_on_past": false,
              "do_xcom_push": true,
              "downstream_task_ids": [],
              "email": [],
              "email_on_failure": false,
              "email_on_retry": false,
              "executor_config": {},
              "ignore_first_depends_on_past": true,
              "inlets": [],
              "op_args": [],
              "op_kwargs": {
                "x": "Apache Airflow"
              },
              "outlets": [],
              "owner": "demo",
              "params": "<<non-serializable: ParamsDict>>",
              "pool": "default_pool",
              "pool_slots": 1,
              "priority_weight": 1,
              "python_callable": "<<non-serializable: function>>",
              "queue": "default",
              "retries": 0,
              "retry_delay": "<<non-serializable: timedelta>>",
              "retry_exponential_backoff": false,
              "show_return_value_in_logs": true,
              "start_date": "<<non-serializable: DateTime>>",
              "task_group": "<<non-serializable: TaskGroup>>",
              "task_id": "test-operator",
              "trigger_rule": "all_success",
              "upstream_task_ids": [],
              "wait_for_downstream": false,
              "weight_rule": "downstream"
            },
            "type": "operator"
          }
        ]
      }
    },
    "runId": "701c44c4-d9c1-4845-951f-847cf0929383"
  }
}

COMPLETE

{
  "eventTime": "2022-10-03T01:17:56.708670Z",
  "eventType": "COMPLETE",
  "inputs": [],
  "job": {
    "facets": {},
    "name": "inlet_outlet_demo.test-operator",
    "namespace": "uninhabited-magnify-7821"
  },
  "outputs": [],
  "producer": "https://github.com/OpenLineage/OpenLineage/tree/0.14.1/integration/airflow",
  "run": {
    "facets": {
      "unknownSourceAttribute": {
        "_producer": "https://github.com/OpenLineage/OpenLineage/tree/0.14.1/integration/airflow",
        "_schemaURL": "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet",
        "unknownItems": [
          {
            "name": "PythonOperator",
            "properties": {
              "_BaseOperator__from_mapped": false,
              "_BaseOperator__init_kwargs": {
                "depends_on_past": false,
                "email": [],
                "email_on_failure": false,
                "email_on_retry": false,
                "op_kwargs": {
                  "x": "Apache Airflow"
                },
                "owner": "demo",
                "python_callable": "<<non-serializable: function>>",
                "start_date": "<<non-serializable: DateTime>>",
                "task_id": "test-operator"
              },
              "_BaseOperator__instantiated": true,
              "_dag": "<<non-serializable: DAG>>",
              "_log": "<<non-serializable: Logger>>",
              "depends_on_past": false,
              "do_xcom_push": true,
              "downstream_task_ids": [],
              "email": [],
              "email_on_failure": false,
              "email_on_retry": false,
              "executor_config": {},
              "ignore_first_depends_on_past": true,
              "inlets": [],
              "op_args": [],
              "op_kwargs": {
                "x": "Apache Airflow"
              },
              "outlets": [],
              "owner": "demo",
              "params": "<<non-serializable: ParamsDict>>",
              "pool": "default_pool",
              "pool_slots": 1,
              "priority_weight": 1,
              "python_callable": "<<non-serializable: function>>",
              "queue": "default",
              "retries": 0,
              "retry_delay": "<<non-serializable: timedelta>>",
              "retry_exponential_backoff": false,
              "show_return_value_in_logs": true,
              "start_date": "<<non-serializable: DateTime>>",
              "task_group": "<<non-serializable: TaskGroup>>",
              "task_id": "test-operator",
              "trigger_rule": "all_success",
              "upstream_task_ids": [],
              "wait_for_downstream": false,
              "weight_rule": "downstream"
            },
            "type": "operator"
          }
        ]
      }
    },
    "runId": "701c44c4-d9c1-4845-951f-847cf0929383"
  }
}
java.lang.IllegalStateException: Multiple values for optional: ['JobRow(uuid=a9d45c62-5963-49ef-81a7-4cc7f0a90382, type=BATCH, createdAt=2022-10-01T18:05:47.304297Z, updatedAt=2022-10-03T01:17:56.708670Z, namespaceName=uninhabited-magnify-7821, name=inlet_outlet_demo.test-operator, simpleName=inlet_outlet_demo.test-operator, parentJobName=null, description=Optional.empty, currentVersionUuid=Optional[a2588988-d184-49b5-ad71-8088dbfdc715], jobContextUuid=Optional[ba5cbdec-830b-4c71-b3fd-4666b455b524], location=null, inputs=[], symlinkTargetId=null)', 'JobRow(uuid=872fd8cf-0548-43e1-b330-bd820ca7e8c8, type=BATCH, createdAt=2022-10-01T18:05:44.931259Z, updatedAt=2022-10-03T01:17:56.187719Z, namespaceName=uninhabited-magnify-7821, name=inlet_outlet_demo.test-operator, simpleName=test-operator, parentJobName=inlet_outlet_demo, description=Optional.empty, currentVersionUuid=Optional.empty, jobContextUuid=Optional[ba5cbdec-830b-4c71-b3fd-4666b455b524], location=null, inputs=[], symlinkTargetId=null)', ...]
	at org.jdbi.v3.core.collector.OptionalBuilder.tooManyValues(OptionalBuilder.java:54)
	at org.jdbi.v3.core.collector.OptionalBuilder.set(OptionalBuilder.java:33)
	at java.base/java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
	at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
	at org.jdbi.v3.core.result.ResultIterator.forEachRemaining(ResultIterator.java:39)
	at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1845)
	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
	at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
	at org.jdbi.v3.core.result.ResultIterable.collect(ResultIterable.java:297)
	at org.jdbi.v3.sqlobject.statement.internal.ResultReturner$CollectedResultReturner.mappedResult(ResultReturner.java:275)
	at org.jdbi.v3.sqlobject.statement.internal.SqlQueryHandler.lambda$configureReturner$0(SqlQueryHandler.java:61)
	at org.jdbi.v3.sqlobject.statement.internal.CustomizingStatementHandler.invoke(CustomizingStatementHandler.java:187)
	at org.jdbi.v3.sqlobject.statement.internal.SqlQueryHandler.invoke(SqlQueryHandler.java:27)
	at org.jdbi.v3.sqlobject.internal.SqlObjectInitData$1.lambda$invoke$0(SqlObjectInitData.java:132)
	at org.jdbi.v3.core.internal.Invocations.invokeWith(Invocations.java:44)
	at org.jdbi.v3.core.internal.Invocations.invokeWith(Invocations.java:26)
	at org.jdbi.v3.core.LazyHandleSupplier.lambda$invokeInContext$1(LazyHandleSupplier.java:79)
	at org.jdbi.v3.core.internal.Invocations.invokeWith(Invocations.java:44)
	at org.jdbi.v3.core.internal.Invocations.invokeWith(Invocations.java:26)
	at org.jdbi.v3.core.LazyHandleSupplier.invokeInContext(LazyHandleSupplier.java:78)
	at org.jdbi.v3.sqlobject.internal.SqlObjectInitData$1.call(SqlObjectInitData.java:138)
	at org.jdbi.v3.sqlobject.internal.SqlObjectInitData$1.invoke(SqlObjectInitData.java:132)
	at org.jdbi.v3.sqlobject.SqlObjectFactory.lambda$attach$2(SqlObjectFactory.java:108)
	at org.jdbi.v3.core.internal.OnDemandExtensions.lambda$invoke$5(OnDemandExtensions.java:98)
	at org.jdbi.v3.core.internal.exceptions.Unchecked.lambda$function$4(Unchecked.java:76)
	at org.jdbi.v3.core.internal.OnDemandExtensions.invoke(OnDemandExtensions.java:98)
	at org.jdbi.v3.core.internal.OnDemandExtensions.lambda$createProxy$2(OnDemandExtensions.java:82)
	at org.jdbi.v3.core.Jdbi.callWithExtension(Jdbi.java:491)
	at org.jdbi.v3.core.Jdbi.withExtension(Jdbi.java:478)
	at org.jdbi.v3.core.internal.OnDemandExtensions.lambda$createProxy$3(OnDemandExtensions.java:82)
	at jdk.proxy3/jdk.proxy3.$Proxy130.findJobByNameAsRow(Unknown Source)
	at marquez.service.LineageService.getJobUuid(LineageService.java:206)
	at marquez.service.LineageService.lineage(LineageService.java:50)
	at com.datakin.api.service.LineageService.lineage(LineageService.java:37)
	at com.datakin.api.LineageResource.get(LineageResource.java:78)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory.lambda$static$0(ResourceMethodInvocationHandlerFactory.java:52)
	at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:134)
	at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:177)
	at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$ResponseOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:176)
	at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:81)
	at org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:478)
	at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:400)
	at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:81)
	at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:255)
	at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248)
	at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244)
	at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
	at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
	at org.glassfish.jersey.internal.Errors.process(Errors.java:244)
	at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265)
	at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:234)
	at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:684)
	at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:394)
	at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:346)
	at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:358)
	at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:311)
	at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:205)
	at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:799)
	at org.eclipse.jetty.servlet.ServletHandler$ChainEnd.doFilter(ServletHandler.java:1656)
	at io.dropwizard.servlets.ThreadNameFilter.doFilter(ThreadNameFilter.java:35)
	at org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193)
	at org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1626)
	at io.dropwizard.jersey.filter.AllowedMethodsFilter.handle(AllowedMethodsFilter.java:47)
	at io.dropwizard.jersey.filter.AllowedMethodsFilter.doFilter(AllowedMethodsFilter.java:41)
	at org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193)
	at org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1626)
	at marquez.tracing.TracingServletFilter.doFilter(TracingServletFilter.java:29)
	at org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193)
	at org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1626)
	at com.datakin.tenant.TenantFilter.doFilter(TenantFilter.java:70)
	at org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193)
	at org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1626)
	at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:552)
	at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
	at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1624)
	at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
	at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1440)
	at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188)
	at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:505)
	at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1594)
	at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186)
	at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1355)
	at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
	at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
	at com.codahale.metrics.jetty9.InstrumentedHandler.handle(InstrumentedHandler.java:284)
	at io.dropwizard.jetty.RoutingHandler.handle(RoutingHandler.java:52)
	at org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:772)
	at io.dropwizard.jetty.ZipExceptionHandlingGzipHandler.handle(ZipExceptionHandlingGzipHandler.java:26)
	at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:181)
	at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
	at org.eclipse.jetty.server.Server.handle(Server.java:516)
	at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:487)
	at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:732)
	at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:479)
	at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:277)
	at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
	at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:105)
	at org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104)
	at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:338)
	at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:315)
	at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:173)
	at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:131)
	at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:409)
	at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:883)
	at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1034)
	at java.base/java.lang.Thread.run(Thread.java:833)

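This error is nearly identical to the one reported in #2122.
The issue may be related to the extractor for PythonOperator: the same job name (inlet_outlet_demo.test-operator) ends up stored as two separate job rows, so the lineage lookup returns multiple results and fails with a 500 error.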

Metadata

Metadata

Assignees

Labels

api (API layer changes) · bug (Something isn't working) · duplicate (This issue or pull request already exists)

Type

No type
No fields configured for issues without a type.

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions