Skip to content

Commit 23c5230

Browse files
authored
[scheduler] Fix regression with layers containing multiple tags (#2258)
Layers can be set up with multiple tags using either '|' or ' | ' as the separator, but the new scheduler only accepts ' | '. This change fixes that by accepting both forms. ## LLM usage disclosure Claude Opus was used to apply the query changes to accept both separators. <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit * **Bug Fixes** * Fixed tag matching to properly handle tags with inconsistent spacing around delimiters, improving query accuracy and flexibility. * Enhanced tag parsing to normalize whitespace and filter out empty values. * **Tests** * Added comprehensive smoke tests for tag matching with various delimiter spacing formats. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
1 parent 381114c commit 23c5230

3 files changed

Lines changed: 130 additions & 4 deletions

File tree

rust/crates/scheduler/src/dao/job_dao.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ filtered_jobs AS (
9797
AND (fr.int_max_cores = -1 OR fr.int_cores + l.int_cores_min < fr.int_max_cores)
9898
AND (fr.int_max_gpus = -1 OR fr.int_gpus + l.int_gpus_min < fr.int_max_gpus)
9999
-- Match tags: jobs with at least one layer that contains the queried tag
100-
AND string_to_array($3, ' | ') && string_to_array(l.str_tags, ' | ')
100+
AND string_to_array(REPLACE($3, ' ', ''), '|') && string_to_array(REPLACE(l.str_tags, ' ', ''), '|')
101101
AND LOWER(j.pk_facility) = LOWER($4)
102102
)
103103
SELECT DISTINCT

rust/crates/scheduler/src/dao/layer_dao.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,12 @@ impl DispatchLayer {
139139
.try_into()
140140
.expect("gpus_min should fit on a i32"),
141141
gpu_mem_min: ByteSize::kb(layer.int_gpu_mem_min as u64),
142-
tags: layer.str_tags.split(" | ").map(|t| t.to_string()).collect(),
142+
tags: layer
143+
.str_tags
144+
.split('|')
145+
.map(|t| t.trim().to_string())
146+
.filter(|t| !t.is_empty())
147+
.collect(),
143148
frames: frames.into_iter().map(|f| f.into()).collect(),
144149
}
145150
}
@@ -204,7 +209,7 @@ WITH dispatch_frames AS (
204209
INNER JOIN layer_stat ls on l.pk_layer = ls.pk_layer
205210
WHERE j.pk_job = $1
206211
AND ls.int_waiting_count > 0
207-
AND string_to_array($2, ' | ') && string_to_array(l.str_tags, ' | ')
212+
AND string_to_array(REPLACE($2, ' ', ''), '|') && string_to_array(REPLACE(l.str_tags, ' ', ''), '|')
208213
AND f.str_state = 'WAITING'
209214
),
210215
limited_frames AS (
@@ -277,7 +282,7 @@ FROM job j
277282
LEFT JOIN limited_frames lf ON l.pk_layer = lf.pk_layer
278283
WHERE j.pk_job = $1
279284
AND ls.int_waiting_count > 0
280-
AND string_to_array($2, ' | ') && string_to_array(l.str_tags, ' | ')
285+
AND string_to_array(REPLACE($2, ' ', ''), '|') && string_to_array(REPLACE(l.str_tags, ' ', ''), '|')
281286
ORDER BY
282287
l.int_dispatch_order,
283288
lf.int_dispatch_order,

rust/crates/scheduler/tests/smoke_tests.rs

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1358,6 +1358,127 @@ mod scheduler_smoke_test {
13581358
assert!(total_frames > 0, "Expected at least 1 frame to be dispatched");
13591359
}
13601360

1361+
// ============================================================
1362+
// Tag delimiter tolerance: str_tags may be stored as 'a|b' or 'a | b'
1363+
// ============================================================
1364+
1365+
#[tokio::test]
1366+
#[traced_test]
1367+
#[serial]
1368+
async fn test_query_layers_matches_tags_without_spaces() {
1369+
// Regression: layers stored with 'tag1|tag2' (no spaces) were dropped
1370+
// by string_to_array(..., ' | ') and never dispatched.
1371+
let (pool, show_id, facility_id, dept_id, folder_id, _alloc_id, suffix) =
1372+
setup_resource_limit_test("tag_nospace").await.expect("setup failed");
1373+
1374+
let tag_a = format!("tag_a_{}", suffix);
1375+
let tag_b = format!("tag_b_{}", suffix);
1376+
let stored_tags = format!("{}|{}", tag_a, tag_b);
1377+
1378+
let job = create_job_scenario(
1379+
&pool,
1380+
show_id,
1381+
facility_id,
1382+
dept_id,
1383+
folder_id,
1384+
&format!("tag_nospace_job_{}", suffix),
1385+
vec![(
1386+
&format!("tag_nospace_layer_{}", suffix),
1387+
&stored_tags,
1388+
1,
1389+
1024 * 1024,
1390+
0,
1391+
0,
1392+
)],
1393+
3,
1394+
)
1395+
.await
1396+
.expect("failed to create job");
1397+
1398+
let layer_dao = scheduler::dao::LayerDao::new()
1399+
.await
1400+
.expect("failed to create LayerDao");
1401+
1402+
let layers = layer_dao
1403+
.query_layers(job.id, vec![tag_a.clone()])
1404+
.await
1405+
.expect("query_layers failed");
1406+
1407+
let total_frames = count_dispatch_frames(&layers);
1408+
assert!(
1409+
total_frames > 0,
1410+
"Expected frames for no-space tag format '{}' when querying '{}', got 0",
1411+
stored_tags,
1412+
tag_a,
1413+
);
1414+
}
1415+
1416+
#[tokio::test]
1417+
#[traced_test]
1418+
#[serial]
1419+
async fn test_query_layers_matches_mixed_tag_delimiters() {
1420+
// Some rows end up with inconsistent delimiters ('a|b | c'); the query
1421+
// should still match any individual tag.
1422+
let (pool, show_id, facility_id, dept_id, folder_id, _alloc_id, suffix) =
1423+
setup_resource_limit_test("tag_mixed").await.expect("setup failed");
1424+
1425+
let tag_a = format!("tag_a_{}", suffix);
1426+
let tag_b = format!("tag_b_{}", suffix);
1427+
let tag_c = format!("tag_c_{}", suffix);
1428+
let stored_tags = format!("{}|{} | {}", tag_a, tag_b, tag_c);
1429+
1430+
let job = create_job_scenario(
1431+
&pool,
1432+
show_id,
1433+
facility_id,
1434+
dept_id,
1435+
folder_id,
1436+
&format!("tag_mixed_job_{}", suffix),
1437+
vec![(
1438+
&format!("tag_mixed_layer_{}", suffix),
1439+
&stored_tags,
1440+
1,
1441+
1024 * 1024,
1442+
0,
1443+
0,
1444+
)],
1445+
3,
1446+
)
1447+
.await
1448+
.expect("failed to create job");
1449+
1450+
let layer_dao = scheduler::dao::LayerDao::new()
1451+
.await
1452+
.expect("failed to create LayerDao");
1453+
1454+
// Query for the middle tag which is adjacent to both delimiter styles.
1455+
let layers = layer_dao
1456+
.query_layers(job.id, vec![tag_b.clone()])
1457+
.await
1458+
.expect("query_layers failed");
1459+
1460+
let total_frames = count_dispatch_frames(&layers);
1461+
assert!(
1462+
total_frames > 0,
1463+
"Expected frames for mixed-delimiter tags '{}' when querying '{}', got 0",
1464+
stored_tags,
1465+
tag_b,
1466+
);
1467+
1468+
// Returned DispatchLayer.tags should be normalized (trimmed, no empties).
1469+
let returned_tags = &layers[0].tags;
1470+
assert!(
1471+
returned_tags.contains(&tag_a)
1472+
&& returned_tags.contains(&tag_b)
1473+
&& returned_tags.contains(&tag_c),
1474+
"DispatchLayer.tags should be normalized to [{}, {}, {}], got {:?}",
1475+
tag_a,
1476+
tag_b,
1477+
tag_c,
1478+
returned_tags,
1479+
);
1480+
}
1481+
13611482
// #[tokio::test]
13621483
// #[traced_test]
13631484
// async fn test_full_service_cluster_discovery() {

0 commit comments

Comments
 (0)