Added parsing otel json to span from kafka topic to Span#5681
Added parsing otel json to span from kafka topic to Span#5681Mamol27 wants to merge 9 commits intoopensearch-project:mainfrom
Conversation
Signed-off-by: mamol27 <mamol27@yandex.ru>
KarstenSchnitter
left a comment
There was a problem hiding this comment.
Thanks for providing this PR. I did not review the tests. But I have some comments on the main implementation.
...ins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java
Outdated
Show resolved
Hide resolved
...ins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java
Outdated
Show resolved
Hide resolved
| final long receivedTimeStamp = getRecordTimeStamp(consumerRecord, Instant.now().toEpochMilli()); | ||
|
|
||
| KafkaOtelJsonTraceParser kafkaOtelJsonTraceParser = new KafkaOtelJsonTraceParser(); | ||
| kafkaOtelJsonTraceParser.parse(consumerRecord.value().toString(), Instant.ofEpochMilli(receivedTimeStamp), (record) -> { |
There was a problem hiding this comment.
@kkondaka: I think this implementation could be improved, if the type of consumerRecord.value() was known. If it was byte[], ByteBuffer or Bytes, it could be wrapped in an java.io.Reader implementation that avoids generating an intermediate string. The JsonFormat.parser().merge(...) would be fine with such a reader. Unfortunately, if I understand the code correctly, the type T is a free placeholder. Unless any valid assumptions on the type can be made, this improvement is not possible.
There was a problem hiding this comment.
Using an InputCodec as I suggested elsewhere would allow us to read from an InputStream. So you could have new ByteArrayInputStream(consumerRecord.value()).
There was a problem hiding this comment.
consumerRecord.value() isn't byte[] it has type T
.../src/main/java/org/opensearch/dataprepper/plugins/kafka/parser/KafkaOtelJsonTraceParser.java
Outdated
Show resolved
Hide resolved
.../src/main/java/org/opensearch/dataprepper/plugins/kafka/parser/KafkaOtelJsonTraceParser.java
Outdated
Show resolved
Hide resolved
.../src/main/java/org/opensearch/dataprepper/plugins/kafka/parser/KafkaOtelJsonTraceParser.java
Outdated
Show resolved
Hide resolved
...kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/MessageFormat.java
Outdated
Show resolved
Hide resolved
| processRecord(acknowledgementSet, record); | ||
| } | ||
| } else if (schema == MessageFormat.JSON_OTEL_TRACE) { | ||
| final long receivedTimeStamp = getRecordTimeStamp(consumerRecord, Instant.now().toEpochMilli()); |
There was a problem hiding this comment.
Why is this code not common to all reads?
There was a problem hiding this comment.
because it process many records instead one to other
| final long receivedTimeStamp = getRecordTimeStamp(consumerRecord, Instant.now().toEpochMilli()); | ||
|
|
||
| KafkaOtelJsonTraceParser kafkaOtelJsonTraceParser = new KafkaOtelJsonTraceParser(); | ||
| kafkaOtelJsonTraceParser.parse(consumerRecord.value().toString(), Instant.ofEpochMilli(receivedTimeStamp), (record) -> { |
There was a problem hiding this comment.
Using an InputCodec as I suggested elsewhere would allow us to read from an InputStream. So you could have new ByteArrayInputStream(consumerRecord.value()).
Signed-off-by: mamol27 <mamol27@yandex.ru>
Parsing with TracesData.Builder have some issuies Signed-off-by: mamol27 <mamol27@yandex.ru>
Parsing with TracesData.Builder have some issuies Signed-off-by: mamol27 <mamol27@yandex.ru>
|
@dlvenable @KarstenSchnitter |
Signed-off-by: mamol27 <mamol27@yandex.ru>
Signed-off-by: mamol27 <mamol27@yandex.ru>
# Conflicts: # data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java # data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java
Signed-off-by: mamol27 <mamol27@yandex.ru>
...src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerTest.java
Outdated
Show resolved
Hide resolved
...-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelTraceInputCodec.java
Outdated
Show resolved
Hide resolved
...n/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelTraceInputCodecConfig.java
Outdated
Show resolved
Hide resolved
...n/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelTraceInputCodecConfig.java
Outdated
Show resolved
Hide resolved
...common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OtelTraceJsonDecoder.java
Show resolved
Hide resolved
|
@Mamol27 sorry for the delay in reviewing this. Please address the comments. |
Signed-off-by: mamol27 <mamol27@yandex.ru>

Description
Parsing of OpenTelemetry trace in JSON format into Span has been implemented to support trace handling via Kafka.
We had to implement it, as the client requires all data to be sent exclusively through Kafka.
To implement this, a new MessageFormat was added.
Sorry for creating new PR I've got a problem with solving DCO. (Old PR #5590)
Issues Resolved
Resolves #5446
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.