Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/centos7-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ jobs:

- name: Run other docker containers for test
run: |
# generating SSL certificates for Kafka
keytool -genkeypair -keyalg RSA -dname "CN=127.0.0.1" -alias 127.0.0.1 -keystore ./ci/pod/kafka/kafka-server/selfsigned.jks -validity 365 -keysize 2048 -storepass changeit
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be more appropriate to put this in the linux-ci-init-service.sh script?

Copy link
Copy Markdown
Contributor Author

@bzp2010 bzp2010 May 16, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

image
First, make sure that the certificate exists before docker-compose starts Kafka. If the certificate does not exist, the Kafka container will crash.

make ci-env-up
./ci/linux-ci-init-service.sh

Expand Down
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,9 @@ install: runtime
$(ENV_INSTALL) apisix/discovery/kubernetes/*.lua $(ENV_INST_LUADIR)/apisix/discovery/kubernetes
$(ENV_INSTALL) apisix/discovery/tars/*.lua $(ENV_INST_LUADIR)/apisix/discovery/tars

$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/pubsub
$(ENV_INSTALL) apisix/pubsub/*.lua $(ENV_INST_LUADIR)/apisix/pubsub/

$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/http
$(ENV_INSTALL) apisix/http/*.lua $(ENV_INST_LUADIR)/apisix/http/

Expand Down
57 changes: 52 additions & 5 deletions apisix/include/apisix/model/pubsub.proto
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,24 @@ message CmdPing {
*/
message CmdEmpty {}

/**
 * Get the offset of the specified topic partition from Apache Kafka.
 */
message CmdKafkaListOffset {
    // name of the topic to query
    string topic = 1;
    // partition number within the topic
    int32 partition = 2;
    // reference time for the offset lookup; presumably follows Kafka's
    // ListOffsets semantics (ms timestamp, negative special values) —
    // confirm against the consumer implementation
    int64 timestamp = 3;
}

/**
 * Fetch messages of the specified topic partition from Apache Kafka.
 */
message CmdKafkaFetch {
    // name of the topic to consume from
    string topic = 1;
    // partition number within the topic
    int32 partition = 2;
    // offset at which to start fetching messages
    int64 offset = 3;
}

/**
* Client request definition for pubsub scenarios
*
Expand All @@ -55,16 +73,18 @@ message CmdEmpty {}
message PubSubReq {
int64 sequence = 1;
oneof req {
CmdEmpty cmd_empty = 31;
CmdPing cmd_ping = 32;
CmdEmpty cmd_empty = 31;
CmdPing cmd_ping = 32;
CmdKafkaFetch cmd_kafka_fetch = 33;
CmdKafkaListOffset cmd_kafka_list_offset = 34;
};
}

/**
* The response body of the service when an error occurs,
* containing the error code and the error message.
*/
message ErrorResp {
message ErrorResp {
int32 code = 1;
string message = 2;
}
Expand All @@ -77,6 +97,31 @@ message PongResp {
bytes state = 1;
}

/**
 * The definition of a message in Kafka with the current message
 * offset, production timestamp, Key, and message content.
 */
message KafkaMessage {
    // offset of this message within its partition
    int64 offset = 1;
    // timestamp recorded when the message was produced
    int64 timestamp = 2;
    // message key; may be empty
    bytes key = 3;
    // message payload
    bytes value = 4;
}

/**
 * The response of fetching messages from Apache Kafka.
 */
message KafkaFetchResp {
    // batch of messages returned by a single fetch call
    repeated KafkaMessage messages = 1;
}

/**
 * The response of listing offsets from Apache Kafka.
 */
message KafkaListOffsetResp {
    // offset resolved for the requested topic partition
    int64 offset = 1;
}

/**
* Server response definition for pubsub scenarios
*
Expand All @@ -90,7 +135,9 @@ message PongResp {
message PubSubResp {
int64 sequence = 1;
oneof resp {
ErrorResp error_resp = 31;
PongResp pong_resp = 32;
ErrorResp error_resp = 31;
PongResp pong_resp = 32;
KafkaFetchResp kafka_fetch_resp = 33;
KafkaListOffsetResp kafka_list_offset_resp = 34;
};
}
8 changes: 8 additions & 0 deletions apisix/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ local xrpc = require("apisix.stream.xrpc")
local ctxdump = require("resty.ctxdump")
local ngx_balancer = require("ngx.balancer")
local debug = require("apisix.debug")
local pubsub_kafka = require("apisix.pubsub.kafka")
local ngx = ngx
local get_method = ngx.req.get_method
local ngx_exit = ngx.exit
Expand Down Expand Up @@ -504,6 +505,13 @@ function _M.http_access_phase()
api_ctx.upstream_scheme = "grpc"
end

-- load balancer is not required by kafka upstream, so the upstream
-- node selection process is intercepted and left to kafka to
-- handle on its own
if api_ctx.matched_upstream and api_ctx.matched_upstream.scheme == "kafka" then
return pubsub_kafka.access(api_ctx)
end

local code, err = set_upstream(route, api_ctx)
if code then
core.log.error("failed to set upstream: ", err)
Expand Down
63 changes: 63 additions & 0 deletions apisix/plugins/kafka-proxy.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local core = require("apisix.core")


local schema = {
type = "object",
properties = {
sasl = {
type = "object",
properties = {
username = {
type = "string",
default = "",
},
password = {
type = "string",
default = "",
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we remove the defaults, since these fields are required?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

},
},
required = {"username", "password"},
},
},
}


-- Plugin metadata table consumed by the APISIX plugin loader.
local _M = {
    version = 0.1,
    priority = 508,
    name = "kafka-proxy",
    schema = schema,
}


-- Standard plugin hook: validate the user-supplied plugin
-- configuration against the jsonschema defined above.
function _M.check_schema(conf)
    return core.schema.check(schema, conf)
end


-- Propagate SASL credentials from the plugin configuration onto the
-- per-request context, where the kafka pubsub handler reads them
-- (kafka_consumer_enable_sasl / _sasl_username / _sasl_password).
function _M.access(conf, ctx)
    local sasl = conf.sasl
    if not sasl then
        return
    end

    ctx.kafka_consumer_enable_sasl = true
    ctx.kafka_consumer_sasl_username = sasl.username
    ctx.kafka_consumer_sasl_password = sasl.password
end


return _M
145 changes: 145 additions & 0 deletions apisix/pubsub/kafka.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--

local core = require("apisix.core")
local bconsumer = require("resty.kafka.basic-consumer")
local ffi = require("ffi")
local C = ffi.C
local tostring = tostring
local type = type
local ipairs = ipairs
local str_sub = string.sub

ffi.cdef[[
int64_t atoll(const char *num);
]]


local _M = {}


-- Normalize an int64 value decoded by lua-protobuf.
--
-- LuaJIT cannot represent every 64-bit integer as a native number, so
-- lua-protobuf (with the int64_as_string option) decodes large int64
-- fields into strings of the form "#<digits>" to avoid precision loss.
-- This helper strips the leading "#" marker and parses the remainder
-- into an int64_t cdata via C atoll; non-string values pass through
-- unchanged.
local function pb_convert_to_int64(src)
    if type(src) ~= "string" then
        return src
    end
    -- "+ 1" skips the "#" marker byte at the start of the string
    return C.atoll(ffi.cast("char *", src) + 1)
end


-- Takes over requests of type kafka upstream in the http_access phase.
--
-- Instead of proxying through the regular load-balancer path, this
-- builds a Kafka basic-consumer from the matched upstream's nodes and
-- serves pubsub commands (list_offset / fetch) over the client
-- connection until it is closed.
--
-- Reads from api_ctx: matched_upstream (nodes, tls) and the
-- kafka_consumer_* fields set by the kafka-proxy plugin.
function _M.access(api_ctx)
    local pubsub, err = core.pubsub.new()
    if not pubsub then
        core.log.error("failed to initialize pubsub module, err: ", err)
        core.response.exit(400)
        return
    end

    local up_nodes = api_ctx.matched_upstream.nodes

    -- kafka client broker-related configuration
    local broker_list = {}
    for i, node in ipairs(up_nodes) do
        broker_list[i] = {
            host = node.host,
            port = node.port,
        }

        -- SASL/PLAIN credentials are attached per broker when the
        -- kafka-proxy plugin enabled them on this request's context
        if api_ctx.kafka_consumer_enable_sasl then
            broker_list[i].sasl_config = {
                mechanism = "PLAIN",
                user = api_ctx.kafka_consumer_sasl_username,
                password = api_ctx.kafka_consumer_sasl_password,
            }
        end
    end

    -- refresh broker metadata every 30 minutes (value is in ms)
    local client_config = {refresh_interval = 30 * 60 * 1000}
    if api_ctx.matched_upstream.tls then
        client_config.ssl = true
        client_config.ssl_verify = api_ctx.matched_upstream.tls.verify
    end

    -- load and create the consumer instance when it is determined
    -- that the websocket connection was created successfully
    local consumer = bconsumer:new(broker_list, client_config)

    pubsub:on("cmd_kafka_list_offset", function (params)
        -- The timestamp parameter uses a 64-bit integer, which is difficult
        -- for luajit to handle well, so the int64_as_string option in
        -- lua-protobuf is used here. Smaller numbers will be decoded as
        -- lua number, while overly larger numbers will be decoded as strings
        -- in the format #number, where the # symbol at the beginning of the
        -- string will be removed and converted to int64_t with the atoll function.
        local timestamp = pb_convert_to_int64(params.timestamp)

        local offset, err = consumer:list_offset(params.topic, params.partition, timestamp)

        if not offset then
            return nil, "failed to list offset, topic: " .. params.topic ..
                ", partition: " .. params.partition .. ", err: " .. err
        end

        -- tostring() of an int64_t cdata appends an "LL" suffix
        -- (e.g. "42LL"); strip the last two characters so only the
        -- digits are sent back to the client
        offset = tostring(offset)
        return {
            kafka_list_offset_resp = {
                offset = str_sub(offset, 1, #offset - 2)
            }
        }
    end)

    pubsub:on("cmd_kafka_fetch", function (params)
        -- offset arrives in the same "#number" int64 encoding as the
        -- timestamp in cmd_kafka_list_offset above
        local offset = pb_convert_to_int64(params.offset)

        local ret, err = consumer:fetch(params.topic, params.partition, offset)
        if not ret then
            return nil, "failed to fetch message, topic: " .. params.topic ..
                ", partition: " .. params.partition .. ", err: " .. err
        end

        -- split into multiple messages when the amount of data in
        -- a single batch is too large
        local messages = ret.records

        -- special handling of int64 for luajit compatibility: strip
        -- the "LL" suffix from the cdata string form of each message's
        -- timestamp and offset before returning them to the client
        for _, message in ipairs(messages) do
            local timestamp = tostring(message.timestamp)
            message.timestamp = str_sub(timestamp, 1, #timestamp - 2)
            local offset = tostring(message.offset)
            message.offset = str_sub(offset, 1, #offset - 2)
        end

        return {
            kafka_fetch_resp = {
                messages = messages,
            },
        }
    end)

    -- start processing client commands; blocks until the pubsub
    -- connection is closed
    pubsub:wait()
end


return _M
17 changes: 14 additions & 3 deletions apisix/schema_def.lua
Original file line number Diff line number Diff line change
Expand Up @@ -406,8 +406,17 @@ local upstream_schema = {
properties = {
client_cert = certificate_scheme,
client_key = private_key_schema,
verify = {
type = "boolean",
description = "Turn on server certificate verification, "..
"currently only kafka upstream is supported",
default = false,
},
},
required = {"client_cert", "client_key"},
dependencies = {
client_cert = {"client_key"},
client_key = {"client_cert"},
}
},
keepalive_pool = {
type = "object",
Expand Down Expand Up @@ -451,10 +460,12 @@ local upstream_schema = {
},
scheme = {
default = "http",
enum = {"grpc", "grpcs", "http", "https", "tcp", "tls", "udp"},
enum = {"grpc", "grpcs", "http", "https", "tcp", "tls", "udp",
"kafka"},
description = "The scheme of the upstream." ..
" For L7 proxy, it can be one of grpc/grpcs/http/https." ..
" For L4 proxy, it can be one of tcp/tls/udp."
" For L4 proxy, it can be one of tcp/tls/udp." ..
" For specific protocols, it can be kafka."
},
labels = labels_def,
discovery_type = {
Expand Down
2 changes: 1 addition & 1 deletion apisix/upstream.lua
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ local function check_upstream_conf(in_dp, conf)
end
end

if conf.tls then
if conf.tls and conf.tls.client_cert and conf.tls.client_key then
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if conf.tls and conf.tls.client_cert and conf.tls.client_key then
if conf.tls and conf.tls.client_cert then

is enough?

Copy link
Copy Markdown
Contributor Author

@bzp2010 bzp2010 May 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's enough; we ensure that client_cert and client_key both exist via jsonschema's dependencies. Having only one of them present is forbidden.

dependencies = {
client_cert = {"client_key"},
client_key = {"client_cert"},
}

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

local cert = conf.tls.client_cert
local key = conf.tls.client_key
local ok, err = apisix_ssl.validate(cert, key)
Expand Down
Loading