@@ -19,17 +19,19 @@ use std::{io::Cursor, time::Duration};
1919
2020use axum:: {
2121 body:: Body ,
22- extract:: State ,
22+ extract:: { Path , State } ,
2323 response:: { IntoResponse , Response } ,
24- routing:: get,
24+ routing:: { get, post } ,
2525 Router ,
2626} ;
2727use datafusion:: arrow:: json:: ArrayWriter ;
2828use datafusion_app:: local:: ExecutionContext ;
2929use http:: { HeaderValue , StatusCode } ;
3030use log:: error;
31+ use serde:: Deserialize ;
3132use tokio_stream:: StreamExt ;
3233use tower_http:: { timeout:: TimeoutLayer , trace:: TraceLayer } ;
34+ use tracing:: info;
3335
3436use crate :: config:: HttpServerConfig ;
3537
@@ -39,9 +41,9 @@ pub fn create_router(execution: ExecutionContext, config: HttpServerConfig) -> R
3941 "/" ,
4042 get ( |State ( _) : State < ExecutionContext > | async { "Hello, from DFT!" } ) ,
4143 )
42- . route ( "/sql" , get ( execute_sql ) )
43- . route ( "/catalog" , get ( execute_sql ) )
44- . route ( "/{ catalog}/{ schema}/{ table} " , get ( execute_sql ) )
44+ . route ( "/sql" , post ( post_sql_handler ) )
45+ . route ( "/catalog" , get ( get_catalog_handler ) )
46+ . route ( "/table/: catalog/: schema/: table" , get ( get_table_handler ) )
4547 . layer ( (
4648 TraceLayer :: new_for_http ( ) ,
4749 // Graceful shutdown will wait for outstanding requests to complete. Add a timeout so
@@ -51,8 +53,37 @@ pub fn create_router(execution: ExecutionContext, config: HttpServerConfig) -> R
5153 . with_state ( execution)
5254}
5355
54- async fn execute_sql ( State ( state) : State < ExecutionContext > ) -> Response {
55- let results = state. execute_sql ( "SELECT 1, 2" ) . await ;
56+ async fn post_sql_handler ( state : State < ExecutionContext > , query : String ) -> Response {
57+ execute_sql ( state, query) . await
58+ }
59+
60+ async fn get_catalog_handler ( state : State < ExecutionContext > ) -> Response {
61+ execute_sql ( state, "SHOW TABLES" . to_string ( ) ) . await
62+ }
63+
64+ #[ derive( Deserialize ) ]
65+ struct GetTableParams {
66+ catalog : String ,
67+ schema : String ,
68+ table : String ,
69+ }
70+
71+ async fn get_table_handler (
72+ state : State < ExecutionContext > ,
73+ Path ( params) : Path < GetTableParams > ,
74+ ) -> Response {
75+ let GetTableParams {
76+ catalog,
77+ schema,
78+ table,
79+ } = params;
80+ let sql = format ! ( "SELECT * FROM \" {catalog}\" .\" {schema}\" .\" {table}\" " ) ;
81+ execute_sql ( state, sql) . await
82+ }
83+
84+ async fn execute_sql ( State ( state) : State < ExecutionContext > , sql : String ) -> Response {
85+ info ! ( "Executing sql: {sql}" ) ;
86+ let results = state. execute_sql ( & sql) . await ;
5687 match results {
5788 Ok ( mut batch_stream) => {
5889 let mut buf: Cursor < Vec < u8 > > = Cursor :: new ( Vec :: new ( ) ) ;
0 commit comments