Skip to content

Commit fb3e38f

Browse files
Add path to observability reqs
1 parent c421755 commit fb3e38f

5 files changed

Lines changed: 36 additions & 20 deletions

File tree

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ default = ["functions-parquet"]
8181
deltalake = ["datafusion-app/deltalake"]
8282
flightsql = [
8383
"datafusion-app/flightsql",
84+
"datafusion-app/observability",
8485
"dep:arrow-flight",
8586
"dep:metrics",
8687
"dep:metrics-exporter-prometheus",

crates/datafusion-app/src/observability/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ impl ObservabilityContext {
8787
let values = Values {
8888
schema,
8989
values: vec![vec![
90+
lit(req.path),
9091
lit(req.sql),
9192
cast(
9293
lit(req.start_ms),
@@ -123,6 +124,7 @@ impl ObservabilityContext {
123124

124125
/// Details that will be recorded in the configured observability request table
125126
pub struct ObservabilityRequestDetails {
127+
pub path: String,
126128
pub sql: String,
127129
pub start_ms: i64,
128130
pub duration_ms: i64,
@@ -132,6 +134,7 @@ pub struct ObservabilityRequestDetails {
132134

133135
fn req_fields() -> Vec<Field> {
134136
vec![
137+
Field::new("path", DataType::Utf8, false),
135138
Field::new("sql", DataType::Utf8, false),
136139
Field::new(
137140
"timestamp",
@@ -183,6 +186,7 @@ mod test {
183186

184187
let ctx = execution.session_ctx();
185188
let req = ObservabilityRequestDetails {
189+
path: "/sql".to_string(),
186190
sql: "SELECT 1".to_string(),
187191
start_ms: 100,
188192
duration_ms: 200,

src/cli/mod.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use datafusion_app::extensions::DftSessionStateBuilder;
3030
use datafusion_app::local::ExecutionContext;
3131
use datafusion_app::local_benchmarks::LocalBenchmarkStats;
3232
use futures::{Stream, StreamExt};
33-
use log::{error, info};
33+
use log::info;
3434
use parquet::{arrow::ArrowWriter, file::properties::WriterProperties};
3535
use std::error::Error;
3636
use std::fs::File;
@@ -43,6 +43,7 @@ use {
4343
flightsql::FlightSQLContext,
4444
flightsql_benchmarks::FlightSQLBenchmarkStats,
4545
},
46+
log::error,
4647
tonic::IntoRequest,
4748
};
4849

@@ -606,11 +607,8 @@ pub async fn try_run(cli: DftArgs, config: AppConfig) -> Result<()> {
606607
auth,
607608
);
608609
let flightsql_ctx = FlightSQLContext::new(flightsql_cfg);
609-
if let Err(e) = flightsql_ctx.create_client(cli.host.clone()).await {
610-
error!("{}", e.to_string())
611-
} else {
612-
app_execution.with_flightsql_ctx(flightsql_ctx);
613-
}
610+
flightsql_ctx.create_client(cli.host.clone()).await?;
611+
app_execution.with_flightsql_ctx(flightsql_ctx);
614612
}
615613
}
616614
let app = CliApp::new(app_execution, cli.clone());

src/server/http/mod.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -123,9 +123,7 @@ pub async fn try_run(cli: DftArgs, config: AppConfig) -> Result<()> {
123123
execution_ctx.execute_ddl().await;
124124
}
125125

126-
#[cfg(not(feature = "flightsql"))]
127-
let app_execution = AppExecution::new(execution_ctx);
128-
#[cfg(feature = "flightsql")]
126+
#[allow(unused_mut)]
129127
let mut app_execution = AppExecution::new(execution_ctx);
130128
#[cfg(feature = "flightsql")]
131129
{

src/server/http/router.rs

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use std::{io::Cursor, time::Duration};
1919

2020
use axum::{
2121
body::Body,
22-
extract::{Json, Path, Query, State},
22+
extract::{Json, OriginalUri, Path, Query, State},
2323
response::{IntoResponse, Response},
2424
routing::{get, post},
2525
Router,
@@ -29,7 +29,7 @@ use datafusion_app::{observability::ObservabilityRequestDetails, ExecOptions, Ex
2929
use http::{HeaderValue, StatusCode};
3030
use jiff::Timestamp;
3131
use log::error;
32-
use serde::Deserialize;
32+
use serde::{Deserialize, Serialize};
3333
use tokio_stream::StreamExt;
3434
use tower_http::{timeout::TimeoutLayer, trace::TraceLayer};
3535
use tracing::info;
@@ -38,6 +38,7 @@ use crate::{config::HttpServerConfig, execution::AppExecution};
3838

3939
#[derive(Debug)]
4040
struct ExecRequest {
41+
path: String,
4142
sql: String,
4243
}
4344

@@ -83,17 +84,22 @@ struct PostSqlBody {
8384
flightsql: bool,
8485
}
8586

86-
async fn post_sql_handler(state: State<ExecutionState>, Json(body): Json<PostSqlBody>) -> Response {
87+
async fn post_sql_handler(
88+
state: State<ExecutionState>,
89+
OriginalUri(uri): OriginalUri,
90+
Json(body): Json<PostSqlBody>,
91+
) -> Response {
8792
if body.flightsql && !cfg!(feature = "flightsql") {
8893
return (
8994
StatusCode::BAD_REQUEST,
9095
"FlightSQL is not enabled on this server",
9196
)
9297
.into_response();
9398
}
94-
let rt = state.execution.session_ctx().runtime_env();
95-
println!("Runtime {rt:?}");
96-
let req = ExecRequest { sql: body.sql };
99+
let req = ExecRequest {
100+
path: uri.path().to_string(),
101+
sql: body.sql.to_string(),
102+
};
97103
let opts = ExecOptions::new(Some(state.config.result_limit), body.flightsql);
98104
create_response(&state, req, opts).await
99105
}
@@ -106,6 +112,7 @@ struct GetCatalogQueryParams {
106112

107113
async fn get_catalog_handler(
108114
state: State<ExecutionState>,
115+
OriginalUri(uri): OriginalUri,
109116
Query(query): Query<GetCatalogQueryParams>,
110117
) -> Response {
111118
let opts = ExecOptions::new(None, query.flightsql);
@@ -117,11 +124,14 @@ async fn get_catalog_handler(
117124
.into_response();
118125
}
119126
let sql = "SHOW TABLES".to_string();
120-
let req = ExecRequest { sql };
127+
let req = ExecRequest {
128+
path: uri.path().to_string(),
129+
sql,
130+
};
121131
create_response(&state, req, opts).await
122132
}
123133

124-
#[derive(Deserialize)]
134+
#[derive(Deserialize, Serialize)]
125135
struct GetTablePathParams {
126136
catalog: String,
127137
schema: String,
@@ -136,19 +146,23 @@ struct GetTableQueryParams {
136146

137147
async fn get_table_handler(
138148
state: State<ExecutionState>,
139-
Path(params): Path<GetTablePathParams>,
149+
Path(path): Path<GetTablePathParams>,
140150
Query(query): Query<GetTableQueryParams>,
151+
OriginalUri(uri): OriginalUri,
141152
) -> Response {
142153
let GetTablePathParams {
143154
catalog,
144155
schema,
145156
table,
146-
} = params;
157+
} = path;
147158
let sql = format!(
148159
"SELECT * FROM \"{catalog}\".\"{schema}\".\"{table}\" LIMIT {}",
149160
state.config.result_limit
150161
);
151-
let req = ExecRequest { sql };
162+
let req = ExecRequest {
163+
path: uri.path().to_string(),
164+
sql,
165+
};
152166
let opts = ExecOptions::new(Some(state.config.result_limit), query.flightsql);
153167
create_response(&state, req, opts).await
154168
}
@@ -247,6 +261,7 @@ async fn create_response(
247261
let (res, details) = response_for_sql(state, req.sql.clone(), opts).await;
248262
let elapsed = Timestamp::now() - start;
249263
let req = ObservabilityRequestDetails {
264+
path: req.path,
250265
sql: req.sql,
251266
start_ms: start.as_millisecond(),
252267
duration_ms: elapsed.get_milliseconds(),

0 commit comments

Comments
 (0)