Skip to content
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/centos7-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ jobs:

- name: Run other docker containers for test
run: |
# generating SSL certificates for Kafka
keytool -genkeypair -keyalg RSA -dname "CN=127.0.0.1" -alias 127.0.0.1 -keystore ./ci/pod/kafka/kafka-server/selfsigned.jks -validity 365 -keysize 2048 -storepass changeit
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be more appropriate to put it in the linux-ci-init-service.sh script?

Copy link
Copy Markdown
Contributor Author

@bzp2010 bzp2010 May 16, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

image
First, make sure the certificate exists before docker-compose starts Kafka. If the certificate does not exist, the Kafka container will crash.

make ci-env-up
./ci/linux-ci-init-service.sh

Expand Down
63 changes: 63 additions & 0 deletions apisix/plugins/kafka-proxy.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local core = require("apisix.core")


local schema = {
type = "object",
properties = {
sasl = {
type = "object",
properties = {
username = {
type = "string",
default = "",
},
password = {
type = "string",
default = "",
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can remove the default if these fields are required?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

},
},
required = {"username", "password"},
},
},
}


-- Plugin descriptor table consumed by the APISIX plugin loader.
local _M = {
    name = "kafka-proxy",
    priority = 508,
    version = 0.1,
    schema = schema,
}


-- Standard APISIX plugin hook: validate the user-supplied plugin
-- configuration against the JSON schema defined above. Returns true
-- on success, or false plus an error message on failure.
function _M.check_schema(conf)
    return core.schema.check(schema, conf)
end


-- Copy SASL/PLAIN credentials from the plugin configuration onto the
-- per-request context; the kafka pubsub module reads these ctx fields
-- when it builds the broker list for the consumer client.
function _M.access(conf, ctx)
    local sasl = conf.sasl
    if not sasl then
        return
    end

    ctx.kafka_consumer_enable_sasl = true
    ctx.kafka_consumer_sasl_username = sasl.username
    ctx.kafka_consumer_sasl_password = sasl.password
end


return _M
12 changes: 12 additions & 0 deletions apisix/pubsub/kafka.lua
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,21 @@ function _M.access(api_ctx)
host = node.host,
port = node.port,
}

if api_ctx.kafka_consumer_enable_sasl then
broker_list[i].sasl_config = {
mechanism = "PLAIN",
user = api_ctx.kafka_consumer_sasl_username,
password = api_ctx.kafka_consumer_sasl_password,
}
end
end

local client_config = {refresh_interval = 30 * 60 * 1000}
if api_ctx.matched_upstream.tls then
client_config.ssl = true
client_config.ssl_verify = api_ctx.matched_upstream.tls.verify
end

-- load and create the consumer instance when it is determined
-- that the websocket connection was created successfully
Expand Down
11 changes: 10 additions & 1 deletion apisix/schema_def.lua
Original file line number Diff line number Diff line change
Expand Up @@ -406,8 +406,17 @@ local upstream_schema = {
properties = {
client_cert = certificate_scheme,
client_key = private_key_schema,
verify = {
type = "boolean",
description = "Turn on server certificate verification, "..
"currently only kafka upstream is supported",
default = false,
},
},
required = {"client_cert", "client_key"},
dependencies = {
client_cert = {"client_key"},
client_key = {"client_cert"},
}
},
keepalive_pool = {
type = "object",
Expand Down
2 changes: 1 addition & 1 deletion apisix/upstream.lua
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ local function check_upstream_conf(in_dp, conf)
end
end

if conf.tls then
if conf.tls and conf.tls.client_cert and conf.tls.client_key then
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if conf.tls and conf.tls.client_cert and conf.tls.client_key then
if conf.tls and conf.tls.client_cert then

is enough?

Copy link
Copy Markdown
Contributor Author

@bzp2010 bzp2010 May 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's enough: we ensure client_cert and client_key both exist via jsonschema's `dependencies` keyword. Supplying only one of them is forbidden.

dependencies = {
client_cert = {"client_key"},
client_key = {"client_cert"},
}

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

local cert = conf.tls.client_cert
local key = conf.tls.client_key
local ok, err = apisix_ssl.validate(cert, key)
Expand Down
3 changes: 3 additions & 0 deletions ci/linux_openresty_common_runner.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
before_install() {
sudo cpanm --notest Test::Nginx >build.log 2>&1 || (cat build.log && exit 1)

# generating SSL certificates for Kafka
keytool -genkeypair -keyalg RSA -dname "CN=127.0.0.1" -alias 127.0.0.1 -keystore ./ci/pod/kafka/kafka-server/selfsigned.jks -validity 365 -keysize 2048 -storepass changeit

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so, add this to linux-ci-init-service.sh script ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

# launch deps env
make ci-env-up
./ci/linux-ci-init-service.sh
Expand Down
12 changes: 12 additions & 0 deletions ci/pod/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,17 @@ services:
restart: unless-stopped
ports:
- "9092:9092"
- "9093:9093"
- "9094:9094"
depends_on:
- zookeeper-server1
- zookeeper-server2
networks:
kafka_net:
volumes:
- ./ci/pod/kafka/kafka-server/kafka_jaas.conf:/opt/bitnami/kafka/config/kafka_jaas.conf:ro
- ./ci/pod/kafka/kafka-server/selfsigned.jks:/opt/bitnami/kafka/config/certs/kafka.keystore.jks:ro
- ./ci/pod/kafka/kafka-server/selfsigned.jks:/opt/bitnami/kafka/config/certs/kafka.truststore.jks:ro

kafka-server2:
image: bitnami/kafka:2.8.1
Expand All @@ -88,11 +94,17 @@ services:
restart: unless-stopped
ports:
- "19092:9092"
- "19093:9093"
- "19094:9094"
depends_on:
- zookeeper-server1
- zookeeper-server2
networks:
kafka_net:
volumes:
- ./ci/pod/kafka/kafka-server/kafka_jaas.conf:/opt/bitnami/kafka/config/kafka_jaas.conf:ro
- ./ci/pod/kafka/kafka-server/selfsigned.jks:/opt/bitnami/kafka/config/certs/kafka.keystore.jks:ro
- ./ci/pod/kafka/kafka-server/selfsigned.jks:/opt/bitnami/kafka/config/certs/kafka.truststore.jks:ro


## Eureka
Expand Down
9 changes: 7 additions & 2 deletions ci/pod/kafka/kafka-server/env/common.env
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
ALLOW_PLAINTEXT_LISTENER=yes
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9093,SASL_PLAINTEXT://0.0.0.0:9094
KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092,SSL://127.0.0.1:9093,SASL_PLAINTEXT://127.0.0.1:9094
KAFKA_CFG_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=
KAFKA_CFG_SSL_KEYSTORE_LOCATION=/opt/bitnami/kafka/config/certs/kafka.keystore.jks
KAFKA_CFG_SSL_KEYSTORE_PASSWORD=changeit
KAFKA_CFG_SSL_KEY_PASSWORD=changeit
23 changes: 23 additions & 0 deletions ci/pod/kafka/kafka-server/kafka_jaas.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
//
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret";
};
1 change: 1 addition & 0 deletions conf/config-default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,7 @@ plugins: # plugin list (sorted by priority)
- traffic-split # priority: 966
- redirect # priority: 900
- response-rewrite # priority: 899
- kafka-proxy # priority: 508
#- dubbo-proxy # priority: 507
- grpc-transcode # priority: 506
- grpc-web # priority: 505
Expand Down
33 changes: 33 additions & 0 deletions docs/en/latest/pubsub/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,36 @@ curl -X PUT 'http://127.0.0.1:9080/apisix/admin/routes/kafka' \
```

After configuring the route, you can use this feature.

#### Enabling TLS and SASL/PLAIN authentication

Simply turn on the `kafka-proxy` plugin on the created route and enable the Kafka TLS handshake and SASL authentication through the configuration, which can be found in the [plugin documentation](../../../en/latest/plugins/kafka-proxy.md).

```shell
curl -X PUT 'http://127.0.0.1:9080/apisix/admin/routes/kafka' \
-H 'X-API-KEY: <api-key>' \
-H 'Content-Type: application/json' \
-d '{
"uri": "/kafka",
"plugins": {
"kafka-proxy": {
"sasl": {
"username": "user",
"password": "pwd"
}
}
},
"upstream": {
"nodes": {
"kafka-server1:9092": 1,
"kafka-server2:9092": 1,
"kafka-server3:9092": 1
},
"type": "none",
"scheme": "kafka",
"tls": {
"verify": true
}
}
}'
```
1 change: 1 addition & 0 deletions t/admin/plugins.t
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ server-info
traffic-split
redirect
response-rewrite
kafka-proxy
grpc-transcode
grpc-web
public-api
Expand Down
52 changes: 52 additions & 0 deletions t/admin/upstream5.t
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,55 @@ __DATA__
}
--- response_body
passed



=== TEST 2: set upstream(empty tls)
--- config
location /t {
content_by_lua_block {
local t = require("lib.test_admin")
local code, body = t.test("/apisix/admin/upstreams/kafka", ngx.HTTP_PUT, [[{
"nodes": {
"127.0.0.1:9092": 1
},
"type": "none",
"scheme": "kafka",
"tls": {}
}]])

if code >= 300 then
ngx.status = code
end
ngx.say(body)
}
}
--- response_body
passed



=== TEST 3: set upstream(tls without verify)
--- config
location /t {
content_by_lua_block {
local t = require("lib.test_admin")
local code, body = t.test("/apisix/admin/upstreams/kafka", ngx.HTTP_PUT, [[{
"nodes": {
"127.0.0.1:9092": 1
},
"type": "none",
"scheme": "kafka",
"tls": {
"verify": false
}
}]])

if code >= 300 then
ngx.status = code
end
ngx.say(body)
}
}
--- response_body
passed
2 changes: 1 addition & 1 deletion t/node/upstream-mtls.t
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ __DATA__
GET /t
--- error_code: 400
--- response_body
{"error_msg":"invalid configuration: property \"upstream\" validation failed: property \"tls\" validation failed: property \"client_key\" is required"}
{"error_msg":"invalid configuration: property \"upstream\" validation failed: property \"tls\" validation failed: property \"client_key\" is required when \"client_cert\" is set"}



Expand Down
61 changes: 61 additions & 0 deletions t/plugin/kafka-proxy.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
use t::APISIX 'no_plan';

repeat_each(1);
no_long_string();
no_root_location();

# Apply per-test defaults before each Test::Nginx data block runs.
add_block_preprocessor(sub {
    my ($block) = @_;

    # Unless the test declares its own error-log expectations, assert
    # that no "[error]" lines appear in the nginx error log.
    if ((!defined $block->error_log) && (!defined $block->no_error_log)) {
        $block->set_value("no_error_log", "[error]");
    }

    # Default request for tests that only define a location /t handler.
    if (!defined $block->request) {
        $block->set_value("request", "GET /t");
    }
});

run_tests();

__DATA__

=== TEST 1: sanity
--- config
location /t {
content_by_lua_block {
local test_cases = {
{},
{sasl = {username = "user", password = "pwd"}},
{sasl = {username = "user"}},
{sasl = {username = "user", password = 1234}},
}
local plugin = require("apisix.plugins.kafka-proxy")

for _, case in ipairs(test_cases) do
local ok, err = plugin.check_schema(case)
ngx.say(ok and "done" or err)
end
}
}
--- response_body
done
done
property "sasl" validation failed: property "password" is required
property "sasl" validation failed: property "password" validation failed: wrong type: expected string, got number
Loading