Skip to content

Commit 83aae52

Browse files
A little more work
1 parent 2228247 commit 83aae52

4 files changed

Lines changed: 29 additions & 11 deletions

File tree

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ flightsql = [
8383
"datafusion-app/flightsql",
8484
"datafusion-app/observability",
8585
"dep:arrow-flight",
86+
"dep:jiff",
8687
"dep:metrics",
8788
"dep:metrics-exporter-prometheus",
8889
"dep:tonic",

crates/datafusion-app/src/observability/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use datafusion::{
2525
logical_expr::{logical_plan::dml::InsertOp, LogicalPlan, Values},
2626
physical_plan::execute_stream,
2727
prelude::{cast, lit, SessionContext},
28+
scalar::ScalarValue,
2829
sql::TableReference,
2930
};
3031
use log::error;
@@ -94,7 +95,7 @@ impl ObservabilityContext {
9495
DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
9596
),
9697
lit(req.duration_ms),
97-
lit(req.rows),
98+
lit(ScalarValue::UInt64(req.rows)),
9899
lit(req.status),
99100
]],
100101
};
@@ -128,7 +129,7 @@ pub struct ObservabilityRequestDetails {
128129
pub sql: String,
129130
pub start_ms: i64,
130131
pub duration_ms: i64,
131-
pub rows: u64,
132+
pub rows: Option<u64>,
132133
pub status: u16,
133134
}
134135

@@ -142,7 +143,7 @@ fn req_fields() -> Vec<Field> {
142143
false,
143144
),
144145
Field::new("duration_ms", DataType::Int64, false),
145-
Field::new("rows", DataType::UInt64, false),
146+
Field::new("rows", DataType::UInt64, true),
146147
Field::new("status", DataType::UInt16, false),
147148
]
148149
}

src/server/flightsql/service.rs

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ use arrow_flight::{FlightDescriptor, FlightEndpoint, FlightInfo, Ticket};
2525
use datafusion::logical_expr::LogicalPlan;
2626
use datafusion::sql::parser::DFParser;
2727
use datafusion_app::local::ExecutionContext;
28+
use datafusion_app::observability::ObservabilityRequestDetails;
2829
use futures::{StreamExt, TryStreamExt};
30+
use jiff::Timestamp;
2931
use log::{debug, error, info};
3032
use metrics::{counter, histogram};
3133
use prost::Message;
@@ -60,12 +62,11 @@ impl FlightSqlServiceImpl {
6062

6163
async fn get_flight_info_statement_handler(
6264
&self,
63-
query: CommandStatementQuery,
65+
query: String,
6466
request: Request<FlightDescriptor>,
6567
) -> Result<Response<FlightInfo>, Status> {
6668
info!("get_flight_info_statement query: {:?}", query);
6769
debug!("get_flight_info_statement request: {:?}", request);
68-
let CommandStatementQuery { query, .. } = query;
6970
let dialect = datafusion::sql::sqlparser::dialect::GenericDialect {};
7071
match DFParser::parse_sql_with_dialect(&query, &dialect) {
7172
Ok(statements) => {
@@ -192,11 +193,26 @@ impl FlightSqlService for FlightSqlServiceImpl {
192193
request: Request<FlightDescriptor>,
193194
) -> Result<Response<FlightInfo>, Status> {
194195
counter!("requests", "endpoint" => "get_flight_info").increment(1);
195-
let start = Instant::now();
196-
let res = self.get_flight_info_statement_handler(query, request).await;
197-
let duration = start.elapsed();
198-
histogram!("get_flight_info_latency_ms").record(duration.as_millis() as f64);
199-
res
196+
let start = Timestamp::now();
197+
let CommandStatementQuery { query, .. } = query;
198+
let res = self
199+
.get_flight_info_statement_handler(query.clone(), request)
200+
.await?;
201+
let duration = Timestamp::now() - start;
202+
let headers = res.metadata();
203+
println!("HEADAERS: {headers:?}");
204+
205+
let ctx = self.execution.session_ctx();
206+
// let req = ObservabilityRequestDetails {
207+
// path: "GetFlightInfo".to_string(),
208+
// sql: query,
209+
// rows: None,
210+
// start_ms: start.as_millisecond(),
211+
// duration_ms: duration.get_milliseconds(),
212+
// };
213+
// self.execution.observability().try_record_request(ctx, req);
214+
histogram!("get_flight_info_latency_ms").record(duration.get_milliseconds() as f64);
215+
Ok(res)
200216
}
201217

202218
async fn do_get_statement(

src/server/http/router.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ async fn create_response(
265265
sql: req.sql,
266266
start_ms: start.as_millisecond(),
267267
duration_ms: elapsed.get_milliseconds(),
268-
rows: details.rows,
268+
rows: Some(details.rows),
269269
status: res.status().as_u16(),
270270
};
271271
let obs = state.execution.execution_ctx().observability();

0 commit comments

Comments
 (0)