@@ -21,11 +21,13 @@ use arrow_flight::error::FlightError;
2121use arrow_flight:: flight_service_server:: { FlightService , FlightServiceServer } ;
2222use arrow_flight:: sql:: server:: FlightSqlService ;
2323use arrow_flight:: sql:: {
24- Any , CommandGetCatalogs , CommandGetDbSchemas , CommandGetTables , CommandStatementQuery , SqlInfo ,
24+ Any , CommandGetCatalogs , CommandGetDbSchemas , CommandGetSqlInfo , CommandGetTableTypes ,
25+ CommandGetTables , CommandGetXdbcTypeInfo , CommandStatementQuery , SqlInfo ,
2526 TicketStatementQuery ,
2627} ;
2728use arrow_flight:: { FlightDescriptor , FlightEndpoint , FlightInfo , Ticket } ;
2829use color_eyre:: Result ;
30+ use datafusion:: arrow:: datatypes:: Schema ;
2931use datafusion:: logical_expr:: LogicalPlan ;
3032use datafusion:: prelude:: { col, lit} ;
3133use datafusion:: sql:: parser:: DFParser ;
@@ -42,18 +44,30 @@ use std::sync::{Arc, Mutex};
4244use tonic:: { Code , Request , Response , Status } ;
4345use uuid:: Uuid ;
4446
47+ /// Prepared statement handle containing the logical plan and metadata
48+ #[ derive( Clone ) ]
49+ pub struct PreparedStatementHandle {
50+ pub plan : LogicalPlan ,
51+ pub parameter_schema : Option < Arc < Schema > > ,
52+ pub dataset_schema : Arc < Schema > ,
53+ pub created_at : Timestamp ,
54+ }
55+
4556#[ derive( Clone ) ]
4657pub struct FlightSqlServiceImpl {
4758 requests : Arc < Mutex < HashMap < Uuid , LogicalPlan > > > ,
59+ prepared_statements : Arc < Mutex < HashMap < Uuid , PreparedStatementHandle > > > ,
4860 execution : ExecutionContext ,
4961}
5062
5163impl FlightSqlServiceImpl {
5264 pub fn new ( execution : AppExecution ) -> Self {
5365 let requests = HashMap :: new ( ) ;
66+ let prepared_statements = HashMap :: new ( ) ;
5467 Self {
5568 execution : execution. execution_ctx ( ) . clone ( ) ,
5669 requests : Arc :: new ( Mutex :: new ( requests) ) ,
70+ prepared_statements : Arc :: new ( Mutex :: new ( prepared_statements) ) ,
5771 }
5872 }
5973
@@ -353,6 +367,117 @@ impl FlightSqlService for FlightSqlServiceImpl {
353367 res
354368 }
355369
370+ async fn get_flight_info_table_types (
371+ & self ,
372+ _query : CommandGetTableTypes ,
373+ request : Request < FlightDescriptor > ,
374+ ) -> Result < Response < FlightInfo > , Status > {
375+ counter ! ( "requests" , "endpoint" => "get_flight_info_table_types" ) . increment ( 1 ) ;
376+ let start = Timestamp :: now ( ) ;
377+ let request_id = uuid:: Uuid :: new_v4 ( ) ;
378+ let query =
379+ "SELECT DISTINCT table_type FROM information_schema.tables ORDER BY table_type"
380+ . to_string ( ) ;
381+ let res = self . create_flight_info ( query, request_id, request) . await ;
382+
383+ // TODO: Move recording to after response is sent to not impact response latency
384+ self . record_request (
385+ start,
386+ Some ( request_id. to_string ( ) ) ,
387+ res. as_ref ( ) . err ( ) ,
388+ "/get_flight_info_table_types" . to_string ( ) ,
389+ "get_flight_info_table_types_latency_ms" ,
390+ )
391+ . await ;
392+ res
393+ }
394+
395+ async fn get_flight_info_sql_info (
396+ & self ,
397+ _query : CommandGetSqlInfo ,
398+ request : Request < FlightDescriptor > ,
399+ ) -> Result < Response < FlightInfo > , Status > {
400+ counter ! ( "requests" , "endpoint" => "get_flight_info_sql_info" ) . increment ( 1 ) ;
401+ let start = Timestamp :: now ( ) ;
402+ let request_id = uuid:: Uuid :: new_v4 ( ) ;
403+
404+ // For now, return basic server info via a simple SQL query
405+ // TODO: Implement full SqlInfo support with DenseUnion schema
406+ let query = format ! (
407+ "SELECT '{}' as server_name, '{}' as server_version, '57.0' as arrow_version, false as read_only" ,
408+ "datafusion-dft" ,
409+ env!( "CARGO_PKG_VERSION" )
410+ ) ;
411+
412+ let res = self . create_flight_info ( query, request_id, request) . await ;
413+
414+ // TODO: Move recording to after response is sent to not impact response latency
415+ self . record_request (
416+ start,
417+ Some ( request_id. to_string ( ) ) ,
418+ res. as_ref ( ) . err ( ) ,
419+ "/get_flight_info_sql_info" . to_string ( ) ,
420+ "get_flight_info_sql_info_latency_ms" ,
421+ )
422+ . await ;
423+ res
424+ }
425+
426+ async fn get_flight_info_xdbc_type_info (
427+ & self ,
428+ query : CommandGetXdbcTypeInfo ,
429+ request : Request < FlightDescriptor > ,
430+ ) -> Result < Response < FlightInfo > , Status > {
431+ counter ! ( "requests" , "endpoint" => "get_flight_info_xdbc_type_info" ) . increment ( 1 ) ;
432+ let start = Timestamp :: now ( ) ;
433+ let request_id = uuid:: Uuid :: new_v4 ( ) ;
434+
435+ // Build query to return XDBC type info for DataFusion-supported types
436+ // If data_type filter is provided, we would filter to that type
437+ // For now, return all supported types
438+ let type_filter = if let Some ( data_type) = query. data_type {
439+ format ! ( " WHERE type_num = {}" , data_type)
440+ } else {
441+ String :: new ( )
442+ } ;
443+
444+ // Return basic type information for common Arrow/DataFusion types
445+ // This is a simplified version - a full implementation would include all XDBC type metadata
446+ let query = format ! (
447+ "SELECT * FROM (VALUES \
448+ (-5, 'BIGINT', 19, NULL, NULL, NULL, 1, 0, 3, 0, 0, 0, 'BIGINT', -5, 0, 10, 0), \
449+ (4, 'INTEGER', 10, NULL, NULL, NULL, 1, 0, 3, 0, 0, 0, 'INTEGER', 4, 0, 10, 0), \
450+ (5, 'SMALLINT', 5, NULL, NULL, NULL, 1, 0, 3, 0, 0, 0, 'SMALLINT', 5, 0, 10, 0), \
451+ (-6, 'TINYINT', 3, NULL, NULL, NULL, 1, 0, 3, 0, 0, 0, 'TINYINT', -6, 0, 10, 0), \
452+ (8, 'DOUBLE', 15, NULL, NULL, NULL, 1, 0, 3, 0, 0, 0, 'DOUBLE PRECISION', 8, 0, 2, 0), \
453+ (7, 'REAL', 7, NULL, NULL, NULL, 1, 0, 3, 0, 0, 0, 'REAL', 7, 0, 2, 0), \
454+ (12, 'VARCHAR', 2147483647, '''', '''', 'length', 1, 1, 3, 0, 0, 0, 'VARCHAR', 12, 0, 0, 0), \
455+ (91, 'DATE', 10, '''', '''', NULL, 1, 0, 3, 0, 0, 0, 'DATE', 91, 0, 0, 0), \
456+ (93, 'TIMESTAMP', 23, '''', '''', NULL, 1, 0, 3, 0, 0, 0, 'TIMESTAMP', 93, 3, 0, 0), \
457+ (-7, 'BOOLEAN', 1, NULL, NULL, NULL, 1, 0, 3, 0, 0, 0, 'BOOLEAN', -7, 0, 0, 0), \
458+ (-2, 'BINARY', 2147483647, '''', '''', 'length', 1, 0, 3, 0, 0, 0, 'BINARY', -2, 0, 0, 0), \
459+ (2, 'DECIMAL', 38, NULL, NULL, 'precision,scale', 1, 0, 3, 0, 0, 0, 'DECIMAL', 2, 0, 10, 0) \
460+ ) AS types(\
461+ type_name_num, type_name_str, column_size, literal_prefix, literal_suffix, create_params, \
462+ nullable, case_sensitive, searchable, unsigned_attribute, fixed_prec_scale, auto_increment, \
463+ local_type_name, data_type, minimum_scale, maximum_scale, sql_datetime_sub\
464+ ){type_filter}"
465+ ) ;
466+
467+ let res = self . create_flight_info ( query, request_id, request) . await ;
468+
469+ // TODO: Move recording to after response is sent to not impact response latency
470+ self . record_request (
471+ start,
472+ Some ( request_id. to_string ( ) ) ,
473+ res. as_ref ( ) . err ( ) ,
474+ "/get_flight_info_xdbc_type_info" . to_string ( ) ,
475+ "get_flight_info_xdbc_type_info_latency_ms" ,
476+ )
477+ . await ;
478+ res
479+ }
480+
356481 async fn get_flight_info_statement (
357482 & self ,
358483 query : CommandStatementQuery ,
0 commit comments