Skip to content

Commit 2755f21

Browse files
Revolyssup authored and jizhuozhi committed
feat(ai-proxy-multi): add support for healthcheck (apache#12509)
1 parent 335ec6b commit 2755f21

File tree

10 files changed

+1232
-149
lines changed

10 files changed

+1232
-149
lines changed

apisix/healthcheck_manager.lua

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ local healthcheck
2626
local events = require("apisix.events")
2727
local tab_clone = core.table.clone
2828
local timer_every = ngx.timer.every
29+
local ngx_re = require('ngx.re')
30+
local jp = require("jsonpath")
2931
local string_sub = string.sub
3032

3133
local _M = {}
@@ -58,8 +60,17 @@ local function remove_etcd_prefix(key)
5860
return string_sub(key, #prefix + 1)
5961
end
6062

63+
--- Split a resource path on "#" into the resource part and the sub path.
-- For example "/routes/1#plugins.abc" yields "/routes/1" and "plugins.abc".
-- @param resource_full_path string that may carry a "#"-separated sub path
-- @return the first "#"-delimited segment, or the input itself when the
--         split produced no first segment
-- @return the second "#"-delimited segment, or "" when there is none
local function parse_path(resource_full_path)
    local segments = ngx_re.split(resource_full_path, "#")

    local sub_path = segments[2]
    if not sub_path then
        sub_path = ""
    end

    return segments[1] or resource_full_path, sub_path
end
6169

6270
local function fetch_latest_conf(resource_path)
71+
-- if resource path contains json path, extract out the prefix
72+
-- for eg: extracts /routes/1 from /routes/1#plugins.abc
73+
resource_path = parse_path(resource_path)
6374
local resource_type, id
6475
-- Handle both formats:
6576
-- 1. /<etcd-prefix>/<resource_type>/<id>
@@ -206,6 +217,15 @@ function _M.upstream_version(index, nodes_ver)
206217
end
207218

208219

220+
--- Extract the plugin name from a resource path that carries a JSON path.
-- The JSON path is whatever follows the first "#"; when there is no "#",
-- the whole input is treated as the JSON path. Three spellings are
-- recognised at the start of the JSON path, tried in this order:
--   plugins['name'], plugins["name"], plugins.name
-- @param path string, e.g. "/routes/1#plugins['ai-proxy-multi'].instances[0]"
-- @return the plugin name, or nil when the JSON path does not start with
--         a `plugins` accessor
local function get_plugin_name(path)
    local json_path = path:match("#(.+)$")
    if not json_path then
        json_path = path
    end

    -- bracket notation with single quotes
    local name = json_path:match("^plugins%['([^']+)'%]")
    if name then
        return name
    end

    -- bracket notation with double quotes
    name = json_path:match('^plugins%["([^"]+)"%]')
    if name then
        return name
    end

    -- dot notation; the name ends at the next "."
    return (json_path:match("^plugins%.([^%.]+)"))
end
228+
209229
local function timer_create_checker()
210230
if core.table.nkeys(waiting_pool) == 0 then
211231
return
@@ -224,7 +244,21 @@ local function timer_create_checker()
224244
if not res_conf then
225245
goto continue
226246
end
227-
local upstream = res_conf.value.upstream or res_conf.value
247+
local upstream
248+
local plugin_name = get_plugin_name(resource_path)
249+
if plugin_name and plugin_name ~= "" then
250+
local _, sub_path = parse_path(resource_path)
251+
local json_path = "$." .. sub_path
252+
--- the users of the API pass the jsonpath(in resourcepath) to
253+
--- upstream_constructor_config which is passed to the
254+
--- callback construct_upstream to create an upstream dynamically
255+
local upstream_constructor_config = jp.value(res_conf.value, json_path)
256+
local plugin = require("apisix.plugins." .. plugin_name)
257+
upstream = plugin.construct_upstream(upstream_constructor_config)
258+
upstream.resource_key = resource_path
259+
else
260+
upstream = res_conf.value.upstream or res_conf.value
261+
end
228262
local new_version = _M.upstream_version(res_conf.modifiedIndex, upstream._nodes_ver)
229263
core.log.info("checking waiting pool for resource: ", resource_path,
230264
" current version: ", new_version, " requested version: ", resource_ver)

apisix/http/service.lua

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
local core = require("apisix.core")
1818
local apisix_upstream = require("apisix.upstream")
1919
local plugin_checker = require("apisix.plugin").plugin_checker
20+
local plugin = require("apisix.plugin")
2021
local services
2122
local error = error
2223

@@ -46,6 +47,9 @@ local function filter(service)
4647
return
4748
end
4849

50+
51+
plugin.set_plugins_meta_parent(service.value.plugins, service)
52+
4953
apisix_upstream.filter_upstream(service.value.upstream, service)
5054

5155
core.log.info("filter service: ", core.json.delay_encode(service, true))

apisix/plugin.lua

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ local type = type
3434
local local_plugins = core.table.new(32, 0)
3535
local tostring = tostring
3636
local error = error
37+
local getmetatable = getmetatable
38+
local setmetatable = setmetatable
3739
-- make linter happy to avoid error: getting the Lua global "load"
3840
-- luacheck: globals load, ignore lua_load
3941
local lua_load = load
@@ -1234,6 +1236,30 @@ function _M.run_plugin(phase, plugins, api_ctx)
12341236
return api_ctx, plugin_run
12351237
end
12361238

1239+
--- Record, on every plugin conf, which resource the conf belongs to.
-- For each table-valued entry of `plugins` a `_meta` table is created when
-- missing, and `{resource_key = parent.key, resource_version =
-- tostring(parent.modifiedIndex)}` is made readable as `_meta.parent`.
-- The value is attached through the metatable's `__index`, so it is not a
-- raw field of `_meta` (presumably to keep it out of iteration and
-- serialization of the conf -- confirm with the callers).
-- A conf whose `_meta.parent` is already set is left untouched.
-- @param plugins table of plugin name -> plugin conf; anything else is a no-op
-- @param parent  resource object providing `key` and `modifiedIndex`;
--                anything that is not a table is a no-op
function _M.set_plugins_meta_parent(plugins, parent)
    if type(plugins) ~= "table" or type(parent) ~= "table" then
        return
    end

    for _, plugin_conf in pairs(plugins) do
        -- a malformed (non-table) entry cannot carry `_meta`; skip it
        -- instead of raising while the resource is being filtered
        if type(plugin_conf) == "table" then
            if not plugin_conf._meta then
                plugin_conf._meta = {}
            end

            local meta = plugin_conf._meta
            -- a non-table `_meta` is invalid input; leave it for schema
            -- validation to report rather than indexing it here
            if type(meta) == "table" and not meta.parent then
                -- one table per plugin conf so confs never share state
                local parent_info = {
                    resource_key = parent.key,
                    resource_version = tostring(parent.modifiedIndex)
                }

                local mt_table = getmetatable(meta)
                if mt_table then
                    -- NOTE(review): this stores `parent` on the existing
                    -- metatable itself; it is only visible as `_meta.parent`
                    -- when that metatable's `__index` resolves to it -- confirm
                    mt_table.parent = parent_info
                else
                    setmetatable(meta, { __index = {parent = parent_info} })
                end
            end
        end
    end
end
1262+
12371263

12381264
function _M.run_global_rules(api_ctx, global_rules, phase_name)
12391265
if global_rules and #global_rules > 0 then

apisix/plugins/ai-proxy-multi.lua

Lines changed: 140 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@ local core = require("apisix.core")
1919
local schema = require("apisix.plugins.ai-proxy.schema")
2020
local base = require("apisix.plugins.ai-proxy.base")
2121
local plugin = require("apisix.plugin")
22+
local ipmatcher = require("resty.ipmatcher")
23+
local healthcheck_manager = require("apisix.healthcheck_manager")
24+
local tonumber = tonumber
25+
local pairs = pairs
2226

2327
local require = require
2428
local pcall = pcall
@@ -118,17 +122,100 @@ local function transform_instances(new_instances, instance)
118122
new_instances[instance.priority][instance.name] = instance.weight
119123
end
120124

125+
--- Resolve a node's host name to an IP address, in place.
-- The name is taken from `node.domain`, falling back to `node.host`. When it
-- already parses as an IPv4 or IPv6 address the node is left untouched.
-- Otherwise the name is stored in `node.domain` and, if resolution succeeds,
-- `node.host` is replaced by the resolved address. A resolver error is
-- logged and `node.host` keeps its previous value.
-- @param node table with `host` and optionally `domain`; modified in place
local function parse_domain_for_node(node)
    local name = node.domain or node.host

    -- already an IP literal: nothing to resolve
    if ipmatcher.parse_ipv4(name) or ipmatcher.parse_ipv6(name) then
        return
    end

    node.domain = name

    local resolved_ip, resolve_err = core.resolver.parse_domain(name)
    if resolved_ip then
        node.host = resolved_ip
    end

    if resolve_err then
        core.log.error("dns resolver domain: ", name, " error: ", resolve_err)
    end
end
142+
143+
144+
--- Build an upstream node from an instance's `override.endpoint` URL.
-- The endpoint must look like `http(s)://host[:port][/path]`. A missing port
-- defaults to 443 for https and 80 for http. The host is then passed through
-- parse_domain_for_node, which may replace it with a resolved IP address and
-- record the original name in `domain`.
-- @param instance_conf instance configuration table
-- @return node table with `host`, `port` (number) and `scheme`, or nil when
--         the instance has no `override.endpoint` or it cannot be parsed
-- @return error message when the first return value is nil
local function resolve_endpoint(instance_conf)
    local endpoint = core.table.try_read_attr(instance_conf, "override", "endpoint")
    if not endpoint then
        -- calling :match on nil would raise
        return nil, "missing override.endpoint"
    end

    local scheme, host, port = endpoint:match("^(https?)://([^:/]+):?(%d*)(/?.*)$")
    if not scheme then
        -- no match: every capture is nil, so no usable node can be built
        return nil, "invalid endpoint: " .. endpoint
    end

    if port == "" then
        port = (scheme == "https") and "443" or "80"
    end

    local node = {
        host = host,
        port = tonumber(port),
        scheme = scheme,
    }
    parse_domain_for_node(node)
    return node
end
158+
159+
160+
--- Sum the `status_ver` field of every checker in the table.
-- @param checkers table whose values each carry a numeric `status_ver`
-- @return the total; 0 for an empty table
local function get_checkers_status_ver(checkers)
    local total = 0

    for _, health_checker in pairs(checkers) do
        local ver = health_checker.status_ver
        total = total + ver
    end

    return total
end
167+
168+
169+
170+
--- Collect the instances that may currently receive traffic.
-- Every selected instance is fed to transform_instances, which fills the
-- returned table. Selection rules:
--   * no `checkers` table at all: every instance is selected;
--   * instance without a checker: selected;
--   * instance with a checker: selected only when the checker reports its
--     target as healthy; a lookup error is logged;
--   * instance whose endpoint cannot be resolved: logged and selected, the
--     same outcome as an instance that has no checker;
--   * nothing selected: a warning is logged and every instance is selected.
-- @param conf     plugin conf; `conf.instances` is the instance array
-- @param checkers optional table of instance name -> health checker
-- @return the table populated by transform_instances
local function fetch_health_instances(conf, checkers)
    local instances = conf.instances
    local new_instances = core.table.new(0, #instances)
    if not checkers then
        for _, ins in ipairs(instances) do
            transform_instances(new_instances, ins)
        end
        return new_instances
    end

    for _, ins in ipairs(instances) do
        local checker = checkers[ins.name]
        if checker then
            -- optional overrides from the active health check settings
            local active = ins.checks and ins.checks.active
            local host = active and active.host
            local port = active and active.port

            local node, resolve_err = resolve_endpoint(ins)
            if not node then
                -- health cannot be looked up without a target address;
                -- keep the instance in rotation instead of raising
                core.log.error("failed to resolve endpoint for instance: ",
                               ins.name, ", err: ", resolve_err)
                transform_instances(new_instances, ins)
            else
                local ok, err = checker:get_target_status(node.host, port or node.port, host)
                if ok then
                    transform_instances(new_instances, ins)
                elseif err then
                    core.log.error("failed to get health check target status, addr: ",
                        node.host, ":", port or node.port, ", host: ", host, ", err: ", err)
                end
            end
        else
            transform_instances(new_instances, ins)
        end
    end

    if core.table.nkeys(new_instances) == 0 then
        core.log.warn("all upstream nodes is unhealthy, use default")
        for _, ins in ipairs(instances) do
            transform_instances(new_instances, ins)
        end
    end

    return new_instances
end
208+
209+
210+
local function create_server_picker(conf, ups_tab, checkers)
123211
local picker = pickers[conf.balancer.algorithm] -- nil check
124212
if not picker then
125213
pickers[conf.balancer.algorithm] = require("apisix.balancer." .. conf.balancer.algorithm)
126214
picker = pickers[conf.balancer.algorithm]
127215
end
128-
local new_instances = {}
129-
for _, ins in ipairs(conf.instances) do
130-
transform_instances(new_instances, ins)
131-
end
216+
217+
local new_instances = fetch_health_instances(conf, checkers)
218+
core.log.info("fetch health instances: ", core.json.delay_encode(new_instances))
132219

133220
if #new_instances._priority_index > 1 then
134221
core.log.info("new instances: ", core.json.delay_encode(new_instances))
@@ -149,11 +236,57 @@ local function get_instance_conf(instances, name)
149236
end
150237

151238

239+
--- Build a single-node upstream definition from one instance conf.
-- The node address comes from resolve_endpoint, which already resolves a
-- host name to an IP, so no second DNS lookup is done here.
-- @param instance instance configuration table (name, weight, priority,
--                 checks, override.endpoint)
-- @return upstream table `{nodes = {node}, checks = instance.checks}`, or
--         nil on failure
-- @return error message when the first return value is nil
function _M.construct_upstream(instance)
    if not instance then
        -- the caller may pass the result of a lookup that found nothing
        return nil, "missing instance configuration"
    end

    local resolved = resolve_endpoint(instance)
    if not resolved then
        return nil, "failed to resolve endpoint for instance: "
                    .. (instance.name or "unknown")
    end

    if not resolved.host or not resolved.port then
        return nil, "invalid upstream node: " .. core.json.encode(resolved)
    end

    return {
        nodes = {
            {
                host = resolved.host,
                port = resolved.port,
                scheme = resolved.scheme,
                weight = instance.weight or 1,
                priority = instance.priority or 0,
                name = instance.name,
            },
        },
        checks = instance.checks,
    }
end
264+
265+
152266
local function pick_target(ctx, conf, ups_tab)
267+
local checkers
268+
for i, instance in ipairs(conf.instances) do
269+
if instance.checks then
270+
-- json path is 0 indexed so we need to decrement i
271+
local resource_path = conf._meta.parent.resource_key ..
272+
"#plugins['ai-proxy-multi'].instances[" .. i-1 .. "]"
273+
local resource_version = conf._meta.parent.resource_version
274+
local checker = healthcheck_manager.fetch_checker(resource_path, resource_version)
275+
checkers = checkers or {}
276+
checkers[instance.name] = checker
277+
end
278+
end
279+
280+
local version = plugin.conf_version(conf)
281+
if checkers then
282+
local status_ver = get_checkers_status_ver(checkers)
283+
version = version .. "#" .. status_ver
284+
end
285+
153286
local server_picker = ctx.server_picker
154287
if not server_picker then
155-
server_picker = lrucache_server_picker(ctx.matched_route.key, plugin.conf_version(conf),
156-
create_server_picker, conf, ups_tab)
288+
server_picker = lrucache_server_picker(ctx.matched_route.key, version,
289+
create_server_picker, conf, ups_tab, checkers)
157290
end
158291
if not server_picker then
159292
return nil, nil, "failed to fetch server picker"

apisix/plugins/ai-proxy/schema.lua

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
-- See the License for the specific language governing permissions and
1515
-- limitations under the License.
1616
--
17+
local schema_def = require("apisix.schema_def")
18+
1719
local _M = {}
1820

1921
local auth_item_schema = {
@@ -88,6 +90,13 @@ local ai_instance_schema = {
8890
},
8991
},
9092
},
93+
checks = {
94+
type = "object",
95+
properties = {
96+
active = schema_def.health_checker_active,
97+
},
98+
required = {"active"}
99+
}
91100
},
92101
required = {"name", "provider", "auth", "weight"}
93102
},

apisix/router.lua

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ local require = require
1818
local http_route = require("apisix.http.route")
1919
local apisix_upstream = require("apisix.upstream")
2020
local core = require("apisix.core")
21+
local set_plugins_meta_parent = require("apisix.plugin").set_plugins_meta_parent
2122
local str_lower = string.lower
2223
local ipairs = ipairs
2324

@@ -33,6 +34,8 @@ local function filter(route)
3334
return
3435
end
3536

37+
set_plugins_meta_parent(route.value.plugins, route)
38+
3639
if route.value.host then
3740
route.value.host = str_lower(route.value.host)
3841
elseif route.value.hosts then

0 commit comments

Comments
 (0)