Skip to content

Commit 20c7aaa

Browse files
committed
fix(influxdb3_system_tables): scope system.queries to current database
Filter the entries from the shared `QueryLog` by the connected database name. Matches the per-database scoping used by `system.parquet_files`, `system.last_caches`, etc.
1 parent db092e1 commit 20c7aaa

3 files changed

Lines changed: 72 additions & 1 deletion

File tree

influxdb3/tests/server/system_tables.rs

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,73 @@ async fn queries_table() {
9393
}
9494
}
9595

96+
#[tokio::test]
97+
async fn queries_table_scoped_to_database() {
98+
let server = TestServer::spawn().await;
99+
100+
// Initialize two user databases by writing to each.
101+
server
102+
.write_lp_to_db("foo", "cpu,host=s1 usage=0.9 2998574931", Precision::Second)
103+
.await
104+
.expect("write to foo");
105+
server
106+
.write_lp_to_db("bar", "cpu,host=s2 usage=0.8 2998574931", Precision::Second)
107+
.await
108+
.expect("write to bar");
109+
110+
// Run a query on foo so the global query log has at least one foo entry.
111+
{
112+
let mut foo_client = server.flight_sql_client("foo").await;
113+
let resp = foo_client.query("SELECT * FROM cpu").await.unwrap();
114+
let _batches = collect_stream(resp).await;
115+
}
116+
117+
// system.queries on `bar` must not return foo's query. `running = false`
118+
// excludes the current scan query itself (which is still in flight).
119+
{
120+
let mut bar_client = server.flight_sql_client("bar").await;
121+
let resp = bar_client
122+
.query("SELECT COUNT(*) FROM system.queries WHERE running = false")
123+
.await
124+
.unwrap();
125+
let batches = collect_stream(resp).await;
126+
assert_batches_sorted_eq!(
127+
[
128+
"+----------+",
129+
"| count(*) |",
130+
"+----------+",
131+
"| 0 |",
132+
"+----------+",
133+
],
134+
&batches
135+
);
136+
}
137+
138+
// system.queries on `_internal` is also scoped — it does not act as a
139+
// server-wide admin view, so foo's query must not appear here either.
140+
{
141+
let mut internal_client = server.flight_sql_client("_internal").await;
142+
let resp = internal_client
143+
.query(
144+
"SELECT COUNT(*) FROM system.queries \
145+
WHERE running = false AND query_text LIKE '%SELECT * FROM cpu%'",
146+
)
147+
.await
148+
.unwrap();
149+
let batches = collect_stream(resp).await;
150+
assert_batches_sorted_eq!(
151+
[
152+
"+----------+",
153+
"| count(*) |",
154+
"+----------+",
155+
"| 0 |",
156+
"+----------+",
157+
],
158+
&batches
159+
);
160+
}
161+
}
162+
96163
#[test_log::test(tokio::test)]
97164
async fn last_caches_table() {
98165
let server = TestServer::spawn().await;

influxdb3_system_tables/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ impl AllSystemSchemaTablesProvider {
127127
let mut tables = HashMap::<&'static str, Arc<dyn TableProvider>>::new();
128128
let queries = Arc::new(SystemTableProvider::new(Arc::new(QueriesTable::new(
129129
query_log,
130+
db_schema.name(),
130131
))));
131132
tables.insert(QUERIES_TABLE_NAME, queries);
132133
let last_caches = Arc::new(SystemTableProvider::new(Arc::new(LastCachesTable::new(

influxdb3_system_tables/src/queries.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,15 @@ use iox_system_tables::IoxSystemTable;
1313
pub(super) struct QueriesTable {
1414
schema: SchemaRef,
1515
query_log: Arc<QueryLog>,
16+
db_name: Arc<str>,
1617
}
1718

1819
impl QueriesTable {
19-
pub(super) fn new(query_log: Arc<QueryLog>) -> Self {
20+
pub(super) fn new(query_log: Arc<QueryLog>, db_name: Arc<str>) -> Self {
2021
Self {
2122
schema: queries_schema(),
2223
query_log,
24+
db_name,
2325
}
2426
}
2527
}
@@ -43,6 +45,7 @@ impl IoxSystemTable for QueriesTable {
4345
.entries
4446
.into_iter()
4547
.map(|e| e.state())
48+
.filter(|state| state.namespace_name.as_ref() == self.db_name.as_ref())
4649
.collect::<Vec<_>>();
4750

4851
from_query_log_entries(Arc::clone(&schema), &entries)

0 commit comments

Comments
 (0)