Skip to content

Commit e1dde89

Browse files
FlightSQL Observability improvements (#301)
1 parent 1805adb commit e1dde89

13 files changed

Lines changed: 196 additions & 353 deletions

File tree

.github/workflows/test.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -295,7 +295,7 @@ jobs:
295295
uses: ./.github/actions/setup-rust
296296
- name: Start FlightSQL Server
297297
run: |
298-
cargo r --features=flightsql -- serve-flight-sql --config data/configs/flightsql_basic.toml &
298+
cargo r --features=flightsql -- serve-flightsql --config data/configs/flightsql_basic.toml &
299299
- name: Run auth tests
300300
run: |
301301
cargo t --features=flightsql extension_cases::auth_basic
@@ -323,7 +323,7 @@ jobs:
323323
uses: ./.github/actions/setup-rust
324324
- name: Start FlightSQL Server
325325
run: |
326-
cargo r --features=flightsql -- serve-flight-sql --config data/configs/flightsql_bearer.toml &
326+
cargo r --features=flightsql -- serve-flightsql --config data/configs/flightsql_bearer.toml &
327327
- name: Run auth tests
328328
run: |
329329
cargo t --features=flightsql extension_cases::auth_bearer

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,9 @@ default = ["functions-parquet"]
8181
deltalake = ["datafusion-app/deltalake"]
8282
flightsql = [
8383
"datafusion-app/flightsql",
84+
"datafusion-app/observability",
8485
"dep:arrow-flight",
86+
"dep:jiff",
8587
"dep:metrics",
8688
"dep:metrics-exporter-prometheus",
8789
"dep:tonic",

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ dft -c "SELECT 1"
4646
dft -f query.sql
4747

4848
# Start FlightSQL Server (requires `flightsql` feature)
49-
dft serve-flight-sql
49+
dft serve-flightsql
5050

5151
# Start HTTP Server (requires `http` feature)
5252
dft serve-http

crates/datafusion-app/src/local.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ impl ExecutionContext {
200200
}
201201
}
202202

203-
/// Executes the provided `LogicalPlan` returning a `SendableRecordBatchStream`. Uses the [`DedicatedExecutor`] if it is available.
203+
/// Executes the provided `LogicalPlan`, returning a `SendableRecordBatchStream`. Uses the [`DedicatedExecutor`] if it is available. Useful in server implementations where planning and execution are done in separate steps and the logical plan may be stored alongside something like a request_id.
204204
pub async fn execute_logical_plan(
205205
&self,
206206
logical_plan: LogicalPlan,

crates/datafusion-app/src/observability/mod.rs

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,15 @@ use datafusion::{
2525
logical_expr::{logical_plan::dml::InsertOp, LogicalPlan, Values},
2626
physical_plan::execute_stream,
2727
prelude::{cast, lit, SessionContext},
28+
scalar::ScalarValue,
2829
sql::TableReference,
2930
};
3031
use log::error;
3132
use tokio_stream::StreamExt;
3233

3334
use crate::config::ObservabilityConfig;
3435

35-
const REQUESTS_TABLE_NAME: &'static str = "requests";
36+
const REQUESTS_TABLE_NAME: &str = "requests";
3637

3738
#[derive(Clone, Debug)]
3839
pub struct ObservabilityContext {
@@ -87,13 +88,15 @@ impl ObservabilityContext {
8788
let values = Values {
8889
schema,
8990
values: vec![vec![
90-
lit(req.sql),
91+
lit(ScalarValue::Utf8(req.request_id)),
92+
lit(req.path),
93+
lit(ScalarValue::Utf8(req.sql)),
9194
cast(
9295
lit(req.start_ms),
9396
DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
9497
),
9598
lit(req.duration_ms),
96-
lit(req.rows),
99+
lit(ScalarValue::UInt64(req.rows)),
97100
lit(req.status),
98101
]],
99102
};
@@ -108,7 +111,7 @@ impl ObservabilityContext {
108111
// Requires executing this stream to actually insert the request. The plan
109112
// returns the count of records inserted
110113
let mut stream = execute_stream(res, ctx.task_ctx())?;
111-
while let Some(_) = stream.next().await {}
114+
while (stream.next().await).is_some() {}
112115
}
113116
Err(e) => {
114117
error!("Error recording request: {}", e.to_string())
@@ -123,31 +126,34 @@ impl ObservabilityContext {
123126

124127
/// Details that will be recorded in the configured observability request table
125128
pub struct ObservabilityRequestDetails {
126-
pub sql: String,
129+
pub request_id: Option<String>,
130+
pub path: String,
131+
pub sql: Option<String>,
127132
pub start_ms: i64,
128133
pub duration_ms: i64,
129-
pub rows: u64,
134+
pub rows: Option<u64>,
130135
pub status: u16,
131136
}
132137

133138
fn req_fields() -> Vec<Field> {
134139
vec![
135-
Field::new("sql", DataType::Utf8, false),
140+
Field::new("request_id", DataType::Utf8, true),
141+
Field::new("path", DataType::Utf8, false),
142+
Field::new("sql", DataType::Utf8, true),
136143
Field::new(
137144
"timestamp",
138145
DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
139146
false,
140147
),
141148
Field::new("duration_ms", DataType::Int64, false),
142-
Field::new("rows", DataType::UInt64, false),
149+
Field::new("rows", DataType::UInt64, true),
143150
Field::new("status", DataType::UInt16, false),
144151
]
145152
}
146153

147154
fn create_req_schema() -> Schema {
148155
let fields = req_fields();
149-
let schema = Schema::new(fields);
150-
schema
156+
Schema::new(fields)
151157
}
152158

153159
#[cfg(test)]
@@ -183,10 +189,12 @@ mod test {
183189

184190
let ctx = execution.session_ctx();
185191
let req = ObservabilityRequestDetails {
186-
sql: "SELECT 1".to_string(),
192+
request_id: None,
193+
path: "/sql".to_string(),
194+
sql: Some("SELECT 1".to_string()),
187195
start_ms: 100,
188196
duration_ms: 200,
189-
rows: 1,
197+
rows: Some(1),
190198
status: 200,
191199
};
192200

@@ -206,11 +214,11 @@ mod test {
206214
.unwrap();
207215

208216
let expected = [
209-
"+----------+--------------------------+-------------+------+--------+",
210-
"| sql | timestamp | duration_ms | rows | status |",
211-
"+----------+--------------------------+-------------+------+--------+",
212-
"| SELECT 1 | 1970-01-01T00:00:00.100Z | 200 | 1 | 200 |",
213-
"+----------+--------------------------+-------------+------+--------+",
217+
"+------------+------+----------+--------------------------+-------------+------+--------+",
218+
"| request_id | path | sql | timestamp | duration_ms | rows | status |",
219+
"+------------+------+----------+--------------------------+-------------+------+--------+",
220+
"| | /sql | SELECT 1 | 1970-01-01T00:00:00.100Z | 200 | 1 | 200 |",
221+
"+------------+------+----------+--------------------------+-------------+------+--------+",
214222
];
215223

216224
assert_batches_eq!(expected, &batches);

docs/flightsql_server.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
## Starting the Server
88

99
```sh
10-
dft serve-flight-sql
10+
dft serve-flightsql
1111
```
1212

1313
## Endpoints

justfile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ serve-http:
1111
RUST_LOG=info cargo r --features=http,flightsql -- serve-http
1212

1313
# Starts a debug FlightSQL server
14-
serve-flight-sql:
15-
RUST_LOG=info cargo r --features=flightsql -- serve-flight-sql
14+
serve-flightsql:
15+
RUST_LOG=info cargo r --features=flightsql -- serve-flightsql
1616

1717
# You should already have run `cargo r --features=http -- serve-http` in another shell
1818
bench-http-basic:

src/args.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ pub enum Command {
126126
config: Option<String>,
127127
},
128128
/// Start a FlightSQL server
129+
#[command(name = "serve-flightsql")]
129130
ServeFlightSql {
130131
#[clap(short, long)]
131132
config: Option<String>,

0 commit comments

Comments
 (0)