Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ jobs:
uses: ./.github/actions/setup-rust
- name: Start FlightSQL Server
run: |
cargo r --features=flightsql -- serve-flight-sql --config data/configs/flightsql_basic.toml &
cargo r --features=flightsql -- serve-flightsql --config data/configs/flightsql_basic.toml &
- name: Run auth tests
run: |
cargo t --features=flightsql extension_cases::auth_basic
Expand Down Expand Up @@ -323,7 +323,7 @@ jobs:
uses: ./.github/actions/setup-rust
- name: Start FlightSQL Server
run: |
cargo r --features=flightsql -- serve-flight-sql --config data/configs/flightsql_bearer.toml &
cargo r --features=flightsql -- serve-flightsql --config data/configs/flightsql_bearer.toml &
- name: Run auth tests
run: |
cargo t --features=flightsql extension_cases::auth_bearer
Expand Down
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ default = ["functions-parquet"]
deltalake = ["datafusion-app/deltalake"]
flightsql = [
"datafusion-app/flightsql",
"datafusion-app/observability",
"dep:arrow-flight",
"dep:jiff",
"dep:metrics",
"dep:metrics-exporter-prometheus",
"dep:tonic",
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ dft -c "SELECT 1"
dft -f query.sql

# Start FlightSQL Server (requires `flightsql` feature)
dft serve-flight-sql
dft serve-flightsql

# Start HTTP Server (requires `http` feature)
dft serve-http
Expand Down
2 changes: 1 addition & 1 deletion crates/datafusion-app/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ impl ExecutionContext {
}
}

/// Executes the provided `LogicalPlan` returning a `SendableRecordBatchStream`. Uses the [`DedicatedExecutor`] if it is available.
/// Executes the provided `LogicalPlan` returning a `SendableRecordBatchStream`. Uses the [`DedicatedExecutor`] if it is available. Useful on server implementations when planning and execution are done in separate steps and you may be storing the logical plan with something like a request_id.
pub async fn execute_logical_plan(
&self,
logical_plan: LogicalPlan,
Expand Down
42 changes: 25 additions & 17 deletions crates/datafusion-app/src/observability/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@ use datafusion::{
logical_expr::{logical_plan::dml::InsertOp, LogicalPlan, Values},
physical_plan::execute_stream,
prelude::{cast, lit, SessionContext},
scalar::ScalarValue,
sql::TableReference,
};
use log::error;
use tokio_stream::StreamExt;

use crate::config::ObservabilityConfig;

const REQUESTS_TABLE_NAME: &'static str = "requests";
const REQUESTS_TABLE_NAME: &str = "requests";

#[derive(Clone, Debug)]
pub struct ObservabilityContext {
Expand Down Expand Up @@ -87,13 +88,15 @@ impl ObservabilityContext {
let values = Values {
schema,
values: vec![vec![
lit(req.sql),
lit(ScalarValue::Utf8(req.request_id)),
lit(req.path),
lit(ScalarValue::Utf8(req.sql)),
cast(
lit(req.start_ms),
DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
),
lit(req.duration_ms),
lit(req.rows),
lit(ScalarValue::UInt64(req.rows)),
lit(req.status),
]],
};
Expand All @@ -108,7 +111,7 @@ impl ObservabilityContext {
// Requires executing this stream to actually insert the request. The plan
// returns the count of records inserted
let mut stream = execute_stream(res, ctx.task_ctx())?;
while let Some(_) = stream.next().await {}
while (stream.next().await).is_some() {}
}
Err(e) => {
error!("Error recording request: {}", e.to_string())
Expand All @@ -123,31 +126,34 @@ impl ObservabilityContext {

/// Details that will be recorded in the configured observability request table
pub struct ObservabilityRequestDetails {
pub sql: String,
pub request_id: Option<String>,
pub path: String,
pub sql: Option<String>,
pub start_ms: i64,
pub duration_ms: i64,
pub rows: u64,
pub rows: Option<u64>,
pub status: u16,
}

fn req_fields() -> Vec<Field> {
vec![
Field::new("sql", DataType::Utf8, false),
Field::new("request_id", DataType::Utf8, true),
Field::new("path", DataType::Utf8, false),
Field::new("sql", DataType::Utf8, true),
Field::new(
"timestamp",
DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
false,
),
Field::new("duration_ms", DataType::Int64, false),
Field::new("rows", DataType::UInt64, false),
Field::new("rows", DataType::UInt64, true),
Field::new("status", DataType::UInt16, false),
]
}

fn create_req_schema() -> Schema {
let fields = req_fields();
let schema = Schema::new(fields);
schema
Schema::new(fields)
}

#[cfg(test)]
Expand Down Expand Up @@ -183,10 +189,12 @@ mod test {

let ctx = execution.session_ctx();
let req = ObservabilityRequestDetails {
sql: "SELECT 1".to_string(),
request_id: None,
path: "/sql".to_string(),
sql: Some("SELECT 1".to_string()),
start_ms: 100,
duration_ms: 200,
rows: 1,
rows: Some(1),
status: 200,
};

Expand All @@ -206,11 +214,11 @@ mod test {
.unwrap();

let expected = [
"+----------+--------------------------+-------------+------+--------+",
"| sql | timestamp | duration_ms | rows | status |",
"+----------+--------------------------+-------------+------+--------+",
"| SELECT 1 | 1970-01-01T00:00:00.100Z | 200 | 1 | 200 |",
"+----------+--------------------------+-------------+------+--------+",
"+------------+------+----------+--------------------------+-------------+------+--------+",
"| request_id | path | sql | timestamp | duration_ms | rows | status |",
"+------------+------+----------+--------------------------+-------------+------+--------+",
"| | /sql | SELECT 1 | 1970-01-01T00:00:00.100Z | 200 | 1 | 200 |",
"+------------+------+----------+--------------------------+-------------+------+--------+",
];

assert_batches_eq!(expected, &batches);
Expand Down
2 changes: 1 addition & 1 deletion docs/flightsql_server.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
## Starting the Server

```sh
dft serve-flight-sql
dft serve-flightsql
```

## Endpoints
Expand Down
4 changes: 2 additions & 2 deletions justfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ serve-http:
RUST_LOG=info cargo r --features=http,flightsql -- serve-http

# Starts a debug FlightSQL server
serve-flight-sql:
RUST_LOG=info cargo r --features=flightsql -- serve-flight-sql
serve-flightsql:
RUST_LOG=info cargo r --features=flightsql -- serve-flightsql

# You should already have run `cargo r --features=http -- serve-http` in another shell
bench-http-basic:
Expand Down
1 change: 1 addition & 0 deletions src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ pub enum Command {
config: Option<String>,
},
/// Start a FlightSQL server
#[command(name = "serve-flightsql")]
ServeFlightSql {
#[clap(short, long)]
config: Option<String>,
Expand Down
Loading