Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/centos7-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ jobs:

- name: Run other docker containers for test
run: |
# generating SSL certificates for Kafka
keytool -genkeypair -keyalg RSA -dname "CN=127.0.0.1" -alias 127.0.0.1 -keystore ./ci/pod/kafka/kafka-server/selfsigned.jks -validity 365 -keysize 2048 -storepass changeit
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it more appropriate to put it in the linux-ci-init-service.sh script ?

Copy link
Copy Markdown
Contributor Author

@bzp2010 bzp2010 May 16, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

image
First make sure that the certificate exists for docker-compose to start kafka. If the certificate does not exist then the kafka container will crash.

make ci-env-up
./ci/linux-ci-init-service.sh

Expand Down
61 changes: 61 additions & 0 deletions apisix/plugins/kafka-proxy.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local core = require("apisix.core")


-- JSON schema accepted by this plugin: a single optional "sasl" object
-- that, when supplied, must carry both a username and a password
-- (SASL/PLAIN credentials).
local schema = {
    type = "object",
    properties = {
        sasl = {
            type = "object",
            properties = {
                username = { type = "string" },
                password = { type = "string" },
            },
            required = { "username", "password" },
        },
    },
}


-- Plugin descriptor consumed by the APISIX plugin loader. The priority
-- value (508) corresponds to the "kafka-proxy # priority: 508" entry in
-- conf/config-default.yaml.
local _M = {
    name = "kafka-proxy",
    version = 0.1,
    priority = 508,
    schema = schema,
}


-- Validate a user-supplied plugin configuration against the schema
-- defined above. All return values of core.schema.check are forwarded
-- unchanged: true on success, or false plus an error message on failure.
function _M.check_schema(conf)
return core.schema.check(schema, conf)
end


-- Access-phase handler: copy the SASL/PLAIN credentials from the plugin
-- configuration onto the request context, where apisix/pubsub/kafka.lua
-- reads them (kafka_consumer_* fields) when building the broker list.
-- When no "sasl" block is configured, the context is left untouched.
function _M.access(conf, ctx)
    local sasl = conf.sasl
    if not sasl then
        return
    end

    ctx.kafka_consumer_enable_sasl = true
    ctx.kafka_consumer_sasl_username = sasl.username
    ctx.kafka_consumer_sasl_password = sasl.password
end


return _M
12 changes: 12 additions & 0 deletions apisix/pubsub/kafka.lua
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,21 @@ function _M.access(api_ctx)
host = node.host,
port = node.port,
}

if api_ctx.kafka_consumer_enable_sasl then
broker_list[i].sasl_config = {
mechanism = "PLAIN",
user = api_ctx.kafka_consumer_sasl_username,
password = api_ctx.kafka_consumer_sasl_password,
}
end
end

local client_config = {refresh_interval = 30 * 60 * 1000}
if api_ctx.matched_upstream.tls then
client_config.ssl = true
client_config.ssl_verify = api_ctx.matched_upstream.tls.verify
end

-- load and create the consumer instance when it is determined
-- that the websocket connection was created successfully
Expand Down
11 changes: 10 additions & 1 deletion apisix/schema_def.lua
Original file line number Diff line number Diff line change
Expand Up @@ -406,8 +406,17 @@ local upstream_schema = {
properties = {
client_cert = certificate_scheme,
client_key = private_key_schema,
verify = {
type = "boolean",
description = "Turn on server certificate verification, "..
"currently only kafka upstream is supported",
default = false,
},
},
required = {"client_cert", "client_key"},
dependencies = {
client_cert = {"client_key"},
client_key = {"client_cert"},
}
},
keepalive_pool = {
type = "object",
Expand Down
2 changes: 1 addition & 1 deletion apisix/upstream.lua
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ local function check_upstream_conf(in_dp, conf)
end
end

if conf.tls then
if conf.tls and conf.tls.client_cert then
local cert = conf.tls.client_cert
local key = conf.tls.client_key
local ok, err = apisix_ssl.validate(cert, key)
Expand Down
3 changes: 3 additions & 0 deletions ci/linux_openresty_common_runner.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
before_install() {
sudo cpanm --notest Test::Nginx >build.log 2>&1 || (cat build.log && exit 1)

# generating SSL certificates for Kafka
keytool -genkeypair -keyalg RSA -dname "CN=127.0.0.1" -alias 127.0.0.1 -keystore ./ci/pod/kafka/kafka-server/selfsigned.jks -validity 365 -keysize 2048 -storepass changeit

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so, add this to linux-ci-init-service.sh script ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

# launch deps env
make ci-env-up
./ci/linux-ci-init-service.sh
Expand Down
12 changes: 12 additions & 0 deletions ci/pod/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,17 @@ services:
restart: unless-stopped
ports:
- "9092:9092"
- "9093:9093"
- "9094:9094"
depends_on:
- zookeeper-server1
- zookeeper-server2
networks:
kafka_net:
volumes:
- ./ci/pod/kafka/kafka-server/kafka_jaas.conf:/opt/bitnami/kafka/config/kafka_jaas.conf:ro
- ./ci/pod/kafka/kafka-server/selfsigned.jks:/opt/bitnami/kafka/config/certs/kafka.keystore.jks:ro
- ./ci/pod/kafka/kafka-server/selfsigned.jks:/opt/bitnami/kafka/config/certs/kafka.truststore.jks:ro

kafka-server2:
image: bitnami/kafka:2.8.1
Expand All @@ -88,11 +94,17 @@ services:
restart: unless-stopped
ports:
- "19092:9092"
- "19093:9093"
- "19094:9094"
depends_on:
- zookeeper-server1
- zookeeper-server2
networks:
kafka_net:
volumes:
- ./ci/pod/kafka/kafka-server/kafka_jaas.conf:/opt/bitnami/kafka/config/kafka_jaas.conf:ro
- ./ci/pod/kafka/kafka-server/selfsigned.jks:/opt/bitnami/kafka/config/certs/kafka.keystore.jks:ro
- ./ci/pod/kafka/kafka-server/selfsigned.jks:/opt/bitnami/kafka/config/certs/kafka.truststore.jks:ro


## Eureka
Expand Down
9 changes: 7 additions & 2 deletions ci/pod/kafka/kafka-server/env/common.env
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
ALLOW_PLAINTEXT_LISTENER=yes
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9093,SASL_PLAINTEXT://0.0.0.0:9094
KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092,SSL://127.0.0.1:9093,SASL_PLAINTEXT://127.0.0.1:9094
KAFKA_CFG_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=
KAFKA_CFG_SSL_KEYSTORE_LOCATION=/opt/bitnami/kafka/config/certs/kafka.keystore.jks
KAFKA_CFG_SSL_KEYSTORE_PASSWORD=changeit
KAFKA_CFG_SSL_KEY_PASSWORD=changeit
23 changes: 23 additions & 0 deletions ci/pod/kafka/kafka-server/kafka_jaas.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
//
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret";
};
1 change: 1 addition & 0 deletions conf/config-default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,7 @@ plugins: # plugin list (sorted by priority)
- traffic-split # priority: 966
- redirect # priority: 900
- response-rewrite # priority: 899
- kafka-proxy # priority: 508
#- dubbo-proxy # priority: 507
- grpc-transcode # priority: 506
- grpc-web # priority: 505
Expand Down
3 changes: 2 additions & 1 deletion docs/en/latest/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,8 @@
"label": "Other Protocols",
"items": [
"plugins/dubbo-proxy",
"plugins/mqtt-proxy"
"plugins/mqtt-proxy",
"plugins/kafka-proxy"
]
}
]
Expand Down
80 changes: 80 additions & 0 deletions docs/en/latest/plugins/kafka-proxy.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
---
title: kafka-proxy
keywords:
- APISIX
- Plugin
- Kafka proxy
description: This document contains information about the Apache APISIX kafka-proxy Plugin.
---

<!--
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
-->

## Description

The `kafka-proxy` plugin can be used to configure advanced parameters for the kafka upstream of Apache APISIX, such as SASL authentication.

## Attributes

| Name | Type | Required | Default | Valid values | Description |
|-------------------|---------|----------|---------|---------------|------------------------------------|
| sasl | object | optional | | {"username": "user", "password": "pwd"} | SASL/PLAIN authentication configuration. When this object is present, SASL authentication is enabled; it must contain both the `username` and `password` fields. |
| sasl.username | string | required | | | SASL/PLAIN authentication username |
| sasl.password | string | required | | | SASL/PLAIN authentication password |

:::note
If SASL authentication is enabled, the `sasl.username` and `sasl.password` must be set.
The current SASL authentication only supports PLAIN mode, which is the username password login method.
:::

## Example usage

When we use `kafka` as the upstream scheme, we can add Kafka authentication configuration to the route through this plugin.

```shell
curl -X PUT 'http://127.0.0.1:9080/apisix/admin/routes/r1' \
-H 'X-API-KEY: <api-key>' \
-H 'Content-Type: application/json' \
-d '{
"uri": "/kafka",
"plugins": {
"kafka-proxy": {
"sasl": {
"username": "user",
"password": "pwd"
}
}
},
"upstream": {
"nodes": {
"kafka-server1:9092": 1,
"kafka-server2:9092": 1,
"kafka-server3:9092": 1
},
"type": "none",
"scheme": "kafka"
}
}'
```

Now, we can test it by connecting to the `/kafka` endpoint via websocket.

## Disable Plugin

To disable the `kafka-proxy` Plugin, you can delete the corresponding JSON configuration from the Plugin configuration. APISIX will automatically reload and you do not have to restart for this to take effect.
33 changes: 33 additions & 0 deletions docs/en/latest/pubsub/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,36 @@ curl -X PUT 'http://127.0.0.1:9080/apisix/admin/routes/kafka' \
```

After configuring the route, you can use this feature.

#### Enabling TLS and SASL/PLAIN authentication

Enable the `kafka-proxy` plugin on the route created above to turn on the Kafka TLS handshake and SASL authentication; the available configuration options are described in the [plugin documentation](../../../en/latest/plugins/kafka-proxy.md).

```shell
curl -X PUT 'http://127.0.0.1:9080/apisix/admin/routes/kafka' \
-H 'X-API-KEY: <api-key>' \
-H 'Content-Type: application/json' \
-d '{
"uri": "/kafka",
"plugins": {
"kafka-proxy": {
"sasl": {
"username": "user",
"password": "pwd"
}
}
},
"upstream": {
"nodes": {
"kafka-server1:9092": 1,
"kafka-server2:9092": 1,
"kafka-server3:9092": 1
},
"type": "none",
"scheme": "kafka",
"tls": {
"verify": true
}
}
}'
```
1 change: 1 addition & 0 deletions t/admin/plugins.t
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ server-info
traffic-split
redirect
response-rewrite
kafka-proxy
grpc-transcode
grpc-web
public-api
Expand Down
Loading