@@ -100,6 +100,13 @@ async fn get_catalog_handler(
100100 Query ( query) : Query < GetCatalogQueryParams > ,
101101) -> Response {
102102 let opts = ExecOptions :: new ( None , query. flightsql ) ;
103+ if opts. flightsql && !cfg ! ( feature = "flightsql" ) {
104+ return (
105+ StatusCode :: BAD_REQUEST ,
106+ "FlightSQL is not enabled on this server" ,
107+ )
108+ . into_response ( ) ;
109+ }
103110 let sql = "SHOW TABLES" . to_string ( ) ;
104111 execute_sql_with_opts ( state, sql, opts) . await
105112}
@@ -150,7 +157,7 @@ async fn execute_sql_with_opts(
150157 )
151158 . into_response ( ) ,
152159
153- Err ( e) => ( StatusCode :: BAD_REQUEST , format ! ( "Execution failed: {}" , e) ) . into_response ( ) ,
160+ Err ( e) => ( StatusCode :: BAD_REQUEST , format ! ( "{}" , e) ) . into_response ( ) ,
154161 }
155162}
156163
@@ -169,6 +176,7 @@ async fn batch_stream_to_response(batch_stream: SendableRecordBatchStream) -> Re
169176 }
170177 Err ( e) => {
171178 error ! ( "Error executing query: {}" , e) ;
179+ // TODO: Use more appropriate errors, like 404 for table that doesnt exist
172180 return ( StatusCode :: INTERNAL_SERVER_ERROR , "Query execution error" )
173181 . into_response ( ) ;
174182 }
@@ -190,3 +198,164 @@ async fn batch_stream_to_response(batch_stream: SendableRecordBatchStream) -> Re
190198 Err ( _) => ( StatusCode :: INTERNAL_SERVER_ERROR , "UTF-8 conversion error" ) . into_response ( ) ,
191199 }
192200}
201+
202+ #[ cfg( test) ]
203+ mod test {
204+ use axum:: body:: Body ;
205+ use datafusion_app:: {
206+ config:: ExecutionConfig , extensions:: DftSessionStateBuilder , local:: ExecutionContext ,
207+ } ;
208+ use http:: { Request , StatusCode } ;
209+ use http_body_util:: BodyExt ;
210+
211+ use crate :: {
212+ config:: HttpServerConfig , execution:: AppExecution , server:: http:: router:: create_router,
213+ } ;
214+ use tower:: ServiceExt ;
215+
216+ fn setup ( ) -> ( AppExecution , HttpServerConfig ) {
217+ let config = ExecutionConfig :: default ( ) ;
218+ let state = DftSessionStateBuilder :: try_new ( None )
219+ . unwrap ( )
220+ . build ( )
221+ . unwrap ( ) ;
222+ let local = ExecutionContext :: try_new ( & config, state) . unwrap ( ) ;
223+ let execution = AppExecution :: new ( local) ;
224+
225+ let http_config = HttpServerConfig :: default ( ) ;
226+ ( execution, http_config)
227+ }
228+
229+ #[ tokio:: test]
230+ async fn test_get_catalog ( ) {
231+ let ( execution, http_config) = setup ( ) ;
232+ let router = create_router ( execution, http_config) ;
233+
234+ let req = Request :: builder ( )
235+ . uri ( "/catalog" )
236+ . body ( Body :: empty ( ) )
237+ . unwrap ( ) ;
238+ let res = router. oneshot ( req) . await . unwrap ( ) ;
239+ assert_eq ! ( res. status( ) , StatusCode :: OK ) ;
240+ }
241+
242+ #[ tokio:: test]
243+ async fn test_get_table ( ) {
244+ let ( execution, http_config) = setup ( ) ;
245+ let router = create_router ( execution, http_config) ;
246+
247+ let req = Request :: builder ( )
248+ . uri ( "/table/datafusion/information_schema/df_settings" )
249+ . body ( Body :: empty ( ) )
250+ . unwrap ( ) ;
251+ let res = router. oneshot ( req) . await . unwrap ( ) ;
252+ assert_eq ! ( res. status( ) , StatusCode :: OK ) ;
253+ }
254+
255+ #[ tokio:: test]
256+ async fn test_get_nonexistent_table ( ) {
257+ let ( execution, http_config) = setup ( ) ;
258+ let router = create_router ( execution, http_config) ;
259+
260+ let req = Request :: builder ( )
261+ . uri ( "/table/datafusion/information_schema/df_setting" )
262+ . body ( Body :: empty ( ) )
263+ . unwrap ( ) ;
264+ let res = router. oneshot ( req) . await . unwrap ( ) ;
265+ assert_eq ! ( res. status( ) , StatusCode :: BAD_REQUEST ) ;
266+ }
267+
268+ #[ tokio:: test]
269+ async fn test_correct_when_flightsql_not_enabled ( ) {
270+ let ( execution, http_config) = setup ( ) ;
271+ let router = create_router ( execution, http_config) ;
272+
273+ let req = Request :: builder ( )
274+ . uri ( "/catalog?flightsql=true" )
275+ . body ( Body :: empty ( ) )
276+ . unwrap ( ) ;
277+ let res = router. oneshot ( req) . await . unwrap ( ) ;
278+ assert_eq ! ( res. status( ) , StatusCode :: BAD_REQUEST ) ;
279+ let body = res. into_body ( ) . collect ( ) . await . unwrap ( ) . to_bytes ( ) ;
280+ assert_eq ! ( body, "FlightSQL is not enabled on this server" . as_bytes( ) )
281+ }
282+ }
283+
284+ #[ cfg( all( test, feature = "flightsql" ) ) ]
285+ mod flightsql_test {
286+ use axum:: body:: Body ;
287+ use datafusion_app:: {
288+ config:: { ExecutionConfig , FlightSQLConfig } ,
289+ extensions:: DftSessionStateBuilder ,
290+ flightsql:: FlightSQLContext ,
291+ local:: ExecutionContext ,
292+ } ;
293+ use http:: { Request , StatusCode } ;
294+
295+ use crate :: {
296+ config:: HttpServerConfig , execution:: AppExecution , server:: http:: router:: create_router,
297+ } ;
298+ use tower:: ServiceExt ;
299+
300+ async fn setup ( ) -> ( AppExecution , HttpServerConfig ) {
301+ let config = ExecutionConfig :: default ( ) ;
302+ let state = DftSessionStateBuilder :: try_new ( None )
303+ . unwrap ( )
304+ . build ( )
305+ . unwrap ( ) ;
306+ let local = ExecutionContext :: try_new ( & config, state) . unwrap ( ) ;
307+ let mut execution = AppExecution :: new ( local) ;
308+ let flightsql_cfg = FlightSQLConfig {
309+ connection_url : "localhost:50051" . to_string ( ) ,
310+ ..Default :: default ( )
311+ } ;
312+ let flightsql_ctx = FlightSQLContext :: new ( flightsql_cfg) ;
313+ flightsql_ctx
314+ . create_client ( Some ( "http://localhost:50051" . to_string ( ) ) )
315+ . await
316+ . unwrap ( ) ;
317+ execution. with_flightsql_ctx ( flightsql_ctx) ;
318+
319+ let http_config = HttpServerConfig :: default ( ) ;
320+ ( execution, http_config)
321+ }
322+
323+ #[ tokio:: test]
324+ async fn test_get_catalog ( ) {
325+ let ( execution, http_config) = setup ( ) . await ;
326+ let router = create_router ( execution, http_config) ;
327+
328+ let req = Request :: builder ( )
329+ . uri ( "/catalog?flightsql=true" )
330+ . body ( Body :: empty ( ) )
331+ . unwrap ( ) ;
332+ let res = router. oneshot ( req) . await . unwrap ( ) ;
333+ assert_eq ! ( res. status( ) , StatusCode :: OK ) ;
334+ }
335+
336+ #[ tokio:: test]
337+ async fn test_get_table ( ) {
338+ let ( execution, http_config) = setup ( ) . await ;
339+ let router = create_router ( execution, http_config) ;
340+
341+ let req = Request :: builder ( )
342+ . uri ( "/table/datafusion/information_schema/df_settings?flightsql=true" )
343+ . body ( Body :: empty ( ) )
344+ . unwrap ( ) ;
345+ let res = router. oneshot ( req) . await . unwrap ( ) ;
346+ assert_eq ! ( res. status( ) , StatusCode :: OK ) ;
347+ }
348+
349+ #[ tokio:: test]
350+ async fn test_get_nonexistent_table ( ) {
351+ let ( execution, http_config) = setup ( ) . await ;
352+ let router = create_router ( execution, http_config) ;
353+
354+ let req = Request :: builder ( )
355+ . uri ( "/table/datafusion/information_schema/df_setting?flightsql=true" )
356+ . body ( Body :: empty ( ) )
357+ . unwrap ( ) ;
358+ let res = router. oneshot ( req) . await . unwrap ( ) ;
359+ assert_eq ! ( res. status( ) , StatusCode :: BAD_REQUEST ) ;
360+ }
361+ }
0 commit comments