Proposal: support subscribe to messages as Kafka consumer #6874

@bzp2010

Description

Background

Apache Kafka is an open-source stream-processing platform developed by the Apache Software Foundation and written in Scala and Java. It is a high-throughput, distributed publish/subscribe messaging system that can handle all the consumer action stream data of a website.

Previously, APISIX supported Kafka only as a producer, used for logging. We therefore want to add consumer support to APISIX so that end users can pull messages from Kafka via a protocol such as WebSocket, allowing APISIX to cover publish/subscribe scenarios.

Scheme

Overview

We thought it would be a good idea to treat Kafka as an upstream type: reuse the upstream object as the data source and use a route plugin to configure authentication for the Kafka brokers.

First, we'll create a plugin with a simple function: writing the plugin configuration into an APISIX variable (ctx).
Next, we'll add kafka as an upstream scheme in the upstream schema definition, add a validation mode that allows filling in only nodes and scheme, and refine the schema validation code accordingly.
Finally, the Kafka consumer logic will be entered from the HTTP access phase code, based on the upstream scheme field.
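
As a sketch, the reduced validation mode described in the second step would accept an upstream object containing only these two fields (the broker address is a placeholder):

```json
{
    "scheme": "kafka",
    "nodes": {
        "kafka-broker1:9092": 1
    }
}
```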

Kafka service

This will open a WebSocket service on the configured route uri and interact with the client via a protobuf-based protocol. At this stage we do not support the Kafka consumer group capability, so we will implement a simple version in which only the client manages the consumer offset, passing it in actively when pulling messages.
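
The proposal does not fix the pull-request payload; purely as an illustration of client-managed offsets, the data bytes of a pull command might carry a JSON document like the following, where every field name is an assumption:

```json
{
    "topic": "test-topic",
    "partition": 0,
    "offset": 42
}
```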

Protocol

Currently, we expect to use protobuf as the communication encoding to deliver commands and response results.

syntax = "proto3";

message Request {
    string command = 1;  // operation to perform
    bytes data = 2;      // command-specific payload
}

message Response {
    bool success = 1;    // whether the command succeeded
    string msg = 2;      // error or status message
    bytes data = 3;      // command-specific result payload
}
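
Both messages use only length-delimited fields, so their wire format is easy to illustrate by hand. The following Python sketch encodes a Request and decodes it back; the "ping" command name is hypothetical (the proposal does not enumerate commands), and for brevity the helpers assume each field is shorter than 128 bytes so every varint length fits in a single byte.

```python
def encode_request(command: str, data: bytes) -> bytes:
    """Encode a Request message (string command = 1, bytes data = 2).

    Both fields are length-delimited (wire type 2), so each is emitted
    as: tag byte (field_number << 3 | 2), length byte, payload bytes.
    Assumes each field is shorter than 128 bytes (single-byte varints).
    """
    cmd = command.encode("utf-8")
    out = bytes([(1 << 3) | 2, len(cmd)]) + cmd    # field 1: command
    out += bytes([(2 << 3) | 2, len(data)]) + data  # field 2: data
    return out


def decode_request(buf: bytes) -> tuple[str, bytes]:
    """Decode a Request produced by encode_request (same size limits)."""
    fields, i = {}, 0
    while i < len(buf):
        field_number = buf[i] >> 3          # wire type is always 2 here
        length = buf[i + 1]                 # single-byte varint length
        fields[field_number] = buf[i + 2 : i + 2 + length]
        i += 2 + length
    return fields[1].decode("utf-8"), fields.get(2, b"")


# Round trip; "ping" is a hypothetical command name, not part of the proposal.
wire = encode_request("ping", b"{}")
print(wire)                  # b'\n\x04ping\x12\x02{}'
print(decode_request(wire))  # ('ping', b'{}')
```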

Configuration

Core

We need to extend the upstream configuration to support the case where only scheme and nodes are filled in.

Plugin

Name           Type     Required  Default  Description
enable_tls     boolean  no        false    Whether to enable the TLS handshake
ssl_verify     boolean  no        false    Whether to verify the SSL certificate
enable_sasl    boolean  no        false    Whether to enable SASL authentication
sasl_username  string   no        ""       SASL username
sasl_password  string   no        ""       SASL password
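
Put together, a route using this plugin with a Kafka upstream might be configured as below; this is a hedged sketch, with the broker address and credentials as placeholders:

```json
{
    "uri": "/kafka",
    "plugins": {
        "kafka-proxy": {
            "enable_tls": true,
            "ssl_verify": false,
            "enable_sasl": true,
            "sasl_username": "user",
            "sasl_password": "pwd"
        }
    },
    "upstream": {
        "scheme": "kafka",
        "nodes": {
            "kafka-broker1:9092": 1
        }
    }
}
```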

Implementation

  • Create the kafka-proxy plugin: write the plugin configuration into ctx dynamically
  • Extend the http_access_phase processing code: initialize the consumer when the scheme is kafka
  • Create a WebSocket server: handle client requests
