Skip to content

Commit 4a57df7

Browse files
bzp2010Liu-Junlin
authored andcommitted
feat(pubsub): support kafka tls and sasl/plain auth (apache#7046)
1 parent e1defb7 commit 4a57df7

File tree

18 files changed

+494
-7
lines changed

18 files changed

+494
-7
lines changed

.github/workflows/centos7-ci.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,8 @@ jobs:
7979
8080
- name: Run other docker containers for test
8181
run: |
82+
# generating SSL certificates for Kafka
83+
keytool -genkeypair -keyalg RSA -dname "CN=127.0.0.1" -alias 127.0.0.1 -keystore ./ci/pod/kafka/kafka-server/selfsigned.jks -validity 365 -keysize 2048 -storepass changeit
8284
make ci-env-up
8385
./ci/linux-ci-init-service.sh
8486

apisix/plugins/kafka-proxy.lua

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
--
2+
-- Licensed to the Apache Software Foundation (ASF) under one or more
3+
-- contributor license agreements. See the NOTICE file distributed with
4+
-- this work for additional information regarding copyright ownership.
5+
-- The ASF licenses this file to You under the Apache License, Version 2.0
6+
-- (the "License"); you may not use this file except in compliance with
7+
-- the License. You may obtain a copy of the License at
8+
--
9+
-- http://www.apache.org/licenses/LICENSE-2.0
10+
--
11+
-- Unless required by applicable law or agreed to in writing, software
12+
-- distributed under the License is distributed on an "AS IS" BASIS,
13+
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
-- See the License for the specific language governing permissions and
15+
-- limitations under the License.
16+
--
17+
local core = require("apisix.core")
18+
19+
20+
local schema = {
21+
type = "object",
22+
properties = {
23+
sasl = {
24+
type = "object",
25+
properties = {
26+
username = {
27+
type = "string",
28+
},
29+
password = {
30+
type = "string",
31+
},
32+
},
33+
required = {"username", "password"},
34+
},
35+
},
36+
}
37+
38+
39+
local _M = {
40+
version = 0.1,
41+
priority = 508,
42+
name = "kafka-proxy",
43+
schema = schema,
44+
}
45+
46+
47+
function _M.check_schema(conf)
48+
return core.schema.check(schema, conf)
49+
end
50+
51+
52+
function _M.access(conf, ctx)
53+
if conf.sasl then
54+
ctx.kafka_consumer_enable_sasl = true
55+
ctx.kafka_consumer_sasl_username = conf.sasl.username
56+
ctx.kafka_consumer_sasl_password = conf.sasl.password
57+
end
58+
end
59+
60+
61+
return _M

apisix/pubsub/kafka.lua

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,21 @@ function _M.access(api_ctx)
6969
host = node.host,
7070
port = node.port,
7171
}
72+
73+
if api_ctx.kafka_consumer_enable_sasl then
74+
broker_list[i].sasl_config = {
75+
mechanism = "PLAIN",
76+
user = api_ctx.kafka_consumer_sasl_username,
77+
password = api_ctx.kafka_consumer_sasl_password,
78+
}
79+
end
7280
end
7381

7482
local client_config = {refresh_interval = 30 * 60 * 1000}
83+
if api_ctx.matched_upstream.tls then
84+
client_config.ssl = true
85+
client_config.ssl_verify = api_ctx.matched_upstream.tls.verify
86+
end
7587

7688
-- load and create the consumer instance when it is determined
7789
-- that the websocket connection was created successfully

apisix/schema_def.lua

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -406,8 +406,17 @@ local upstream_schema = {
406406
properties = {
407407
client_cert = certificate_scheme,
408408
client_key = private_key_schema,
409+
verify = {
410+
type = "boolean",
411+
description = "Turn on server certificate verification, "..
412+
"currently only kafka upstream is supported",
413+
default = false,
414+
},
409415
},
410-
required = {"client_cert", "client_key"},
416+
dependencies = {
417+
client_cert = {"client_key"},
418+
client_key = {"client_cert"},
419+
}
411420
},
412421
keepalive_pool = {
413422
type = "object",

apisix/upstream.lua

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -435,7 +435,7 @@ local function check_upstream_conf(in_dp, conf)
435435
end
436436
end
437437

438-
if conf.tls then
438+
if conf.tls and conf.tls.client_cert then
439439
local cert = conf.tls.client_cert
440440
local key = conf.tls.client_key
441441
local ok, err = apisix_ssl.validate(cert, key)

ci/linux_openresty_common_runner.sh

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@
2121
before_install() {
2222
sudo cpanm --notest Test::Nginx >build.log 2>&1 || (cat build.log && exit 1)
2323

24+
# generating SSL certificates for Kafka
25+
keytool -genkeypair -keyalg RSA -dname "CN=127.0.0.1" -alias 127.0.0.1 -keystore ./ci/pod/kafka/kafka-server/selfsigned.jks -validity 365 -keysize 2048 -storepass changeit
26+
2427
# launch deps env
2528
make ci-env-up
2629
./ci/linux-ci-init-service.sh

ci/pod/docker-compose.yml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,11 +73,17 @@ services:
7373
restart: unless-stopped
7474
ports:
7575
- "9092:9092"
76+
- "9093:9093"
77+
- "9094:9094"
7678
depends_on:
7779
- zookeeper-server1
7880
- zookeeper-server2
7981
networks:
8082
kafka_net:
83+
volumes:
84+
- ./ci/pod/kafka/kafka-server/kafka_jaas.conf:/opt/bitnami/kafka/config/kafka_jaas.conf:ro
85+
- ./ci/pod/kafka/kafka-server/selfsigned.jks:/opt/bitnami/kafka/config/certs/kafka.keystore.jks:ro
86+
- ./ci/pod/kafka/kafka-server/selfsigned.jks:/opt/bitnami/kafka/config/certs/kafka.truststore.jks:ro
8187

8288
kafka-server2:
8389
image: bitnami/kafka:2.8.1
@@ -88,11 +94,17 @@ services:
8894
restart: unless-stopped
8995
ports:
9096
- "19092:9092"
97+
- "19093:9093"
98+
- "19094:9094"
9199
depends_on:
92100
- zookeeper-server1
93101
- zookeeper-server2
94102
networks:
95103
kafka_net:
104+
volumes:
105+
- ./ci/pod/kafka/kafka-server/kafka_jaas.conf:/opt/bitnami/kafka/config/kafka_jaas.conf:ro
106+
- ./ci/pod/kafka/kafka-server/selfsigned.jks:/opt/bitnami/kafka/config/certs/kafka.keystore.jks:ro
107+
- ./ci/pod/kafka/kafka-server/selfsigned.jks:/opt/bitnami/kafka/config/certs/kafka.truststore.jks:ro
96108

97109

98110
## Eureka
Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
11
ALLOW_PLAINTEXT_LISTENER=yes
2-
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
3-
KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
2+
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
3+
KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9093,SASL_PLAINTEXT://0.0.0.0:9094
4+
KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092,SSL://127.0.0.1:9093,SASL_PLAINTEXT://127.0.0.1:9094
5+
KAFKA_CFG_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=
6+
KAFKA_CFG_SSL_KEYSTORE_LOCATION=/opt/bitnami/kafka/config/certs/kafka.keystore.jks
7+
KAFKA_CFG_SSL_KEYSTORE_PASSWORD=changeit
8+
KAFKA_CFG_SSL_KEY_PASSWORD=changeit
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
//
2+
// Licensed to the Apache Software Foundation (ASF) under one or more
3+
// contributor license agreements. See the NOTICE file distributed with
4+
// this work for additional information regarding copyright ownership.
5+
// The ASF licenses this file to You under the Apache License, Version 2.0
6+
// (the "License"); you may not use this file except in compliance with
7+
// the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
//
17+
18+
KafkaServer {
19+
org.apache.kafka.common.security.plain.PlainLoginModule required
20+
username="admin"
21+
password="admin-secret"
22+
user_admin="admin-secret";
23+
};

conf/config-default.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -374,6 +374,7 @@ plugins: # plugin list (sorted by priority)
374374
- traffic-split # priority: 966
375375
- redirect # priority: 900
376376
- response-rewrite # priority: 899
377+
- kafka-proxy # priority: 508
377378
#- dubbo-proxy # priority: 507
378379
- grpc-transcode # priority: 506
379380
- grpc-web # priority: 505

0 commit comments

Comments
 (0)