@@ -50,188 +50,193 @@ impl Executor {
5050 pub fn execute ( & self , statement : Statement ) -> Result < ExecutionResult > {
5151 let start = Instant :: now ( ) ;
5252
53- let result =
54- match statement {
55- Statement :: CreateTable { name, columns } => {
56- self . catalog . create_table ( & name, columns) ?;
57- Ok ( ExecutionResult :: Created { table : name } )
53+ let result = match statement {
54+ Statement :: CreateTable { name, columns } => {
55+ self . catalog . create_table ( & name, columns) ?;
56+ Ok ( ExecutionResult :: Created { table : name } )
57+ }
58+ Statement :: Insert {
59+ table,
60+ columns : _,
61+ values,
62+ } => {
63+ let _schema = self
64+ . catalog
65+ . get_table ( & table) ?
66+ . ok_or_else ( || StorageError :: ReadError ( format ! ( "Table {} not found" , table) ) ) ?;
67+
68+ for row_values in & values {
69+ let row = Row {
70+ values : row_values. clone ( ) ,
71+ } ;
72+ let key = self . generate_row_key ( & table) ;
73+ let value = serde_json:: to_vec ( & row) ?;
74+ self . storage . set ( & key, & value) ?;
5875 }
59- Statement :: Insert {
60- table,
61- columns : _,
62- values,
63- } => {
64- let _schema = self . catalog . get_table ( & table) ?. ok_or_else ( || {
65- StorageError :: ReadError ( format ! ( "Table {} not found" , table) )
66- } ) ?;
67-
68- for row_values in & values {
69- let row = Row {
70- values : row_values. clone ( ) ,
71- } ;
72- let key = self . generate_row_key ( & table) ;
73- let value = serde_json:: to_vec ( & row) ?;
74- self . storage . set ( & key, & value) ?;
75- }
7676
77- Ok ( ExecutionResult :: Inserted {
78- table,
79- rows : values. len ( ) ,
80- } )
81- }
82- Statement :: Select {
77+ Ok ( ExecutionResult :: Inserted {
8378 table,
84- columns,
85- where_clause,
86- order_by,
87- limit,
88- } => {
89- if let Some ( cache) = & self . cache {
90- let query_str = format ! ( "SELECT {:?} FROM {}" , columns, table) ;
91-
92- if let Ok ( Some ( cached_result) ) = cache. get ( & query_str) {
93- log:: debug!( "Cache hit for query: {}" , query_str) ;
94- let rows: Vec < Row > =
95- serde_json:: from_str ( & cached_result) . unwrap_or_else ( |_| Vec :: new ( ) ) ;
96- return Ok ( ExecutionResult :: Selected { columns, rows } ) ;
97- }
79+ rows : values. len ( ) ,
80+ } )
81+ }
82+ Statement :: Select {
83+ table,
84+ columns,
85+ where_clause,
86+ order_by,
87+ limit,
88+ } => {
89+ if let Some ( cache) = & self . cache {
90+ let query_str = format ! ( "SELECT {:?} FROM {}" , columns, table) ;
91+
92+ if let Ok ( Some ( cached_result) ) = cache. get ( & query_str) {
93+ log:: debug!( "Cache hit for query: {}" , query_str) ;
94+ let rows: Vec < Row > =
95+ serde_json:: from_str ( & cached_result) . unwrap_or_else ( |_| Vec :: new ( ) ) ;
96+ return Ok ( ExecutionResult :: Selected { columns, rows } ) ;
9897 }
98+ }
9999
100- let schema = self . catalog . get_table ( & table) ?. ok_or_else ( || {
101- StorageError :: ReadError ( format ! ( "Table {} not found" , table) )
102- } ) ?;
100+ let schema = self
101+ . catalog
102+ . get_table ( & table) ?
103+ . ok_or_else ( || StorageError :: ReadError ( format ! ( "Table {} not found" , table) ) ) ?;
103104
104- let prefix = Self :: table_data_prefix ( & table) ;
105- let all_rows = self . storage . scan_prefix ( & prefix) ?;
105+ let prefix = Self :: table_data_prefix ( & table) ;
106+ let all_rows = self . storage . scan_prefix ( & prefix) ?;
106107
107- let mut rows: Vec < Row > = all_rows
108- . iter ( )
109- . filter_map ( |( _, v) | serde_json:: from_slice :: < Row > ( v) . ok ( ) )
110- . collect ( ) ;
108+ let mut rows: Vec < Row > = all_rows
109+ . iter ( )
110+ . filter_map ( |( _, v) | serde_json:: from_slice :: < Row > ( v) . ok ( ) )
111+ . collect ( ) ;
111112
112- if let Some ( where_expr) = where_clause {
113- let column_names: Vec < String > =
114- schema. columns . iter ( ) . map ( |c| c. name . clone ( ) ) . collect ( ) ;
115- let evaluator = ExpressionEvaluator :: new ( column_names) ;
113+ if let Some ( where_expr) = where_clause {
114+ let column_names: Vec < String > =
115+ schema. columns . iter ( ) . map ( |c| c. name . clone ( ) ) . collect ( ) ;
116+ let evaluator = ExpressionEvaluator :: new ( column_names) ;
116117
117- rows. retain ( |row| {
118- evaluator
119- . evaluate ( & where_expr, & row. values )
120- . unwrap_or ( false )
121- } ) ;
118+ rows. retain ( |row| {
119+ evaluator
120+ . evaluate ( & where_expr, & row. values )
121+ . unwrap_or ( false )
122+ } ) ;
122123
123- log:: debug!( "Filtered {} rows using WHERE clause" , rows. len( ) ) ;
124- }
124+ log:: debug!( "Filtered {} rows using WHERE clause" , rows. len( ) ) ;
125+ }
125126
126- if let Some ( order_clauses) = order_by {
127- let column_names: Vec < String > =
128- schema. columns . iter ( ) . map ( |c| c. name . clone ( ) ) . collect ( ) ;
129-
130- for order_clause in order_clauses. iter ( ) . rev ( ) {
131- if let Some ( col_idx) =
132- column_names. iter ( ) . position ( |c| c == & order_clause. column )
133- {
134- rows. sort_by ( |a, b| {
135- let ordering = match ( & a. values [ col_idx] , & b. values [ col_idx] ) {
136- ( Value :: Integer ( av) , Value :: Integer ( bv) ) => av. cmp ( bv) ,
137- ( Value :: Float ( av) , Value :: Float ( bv) ) => {
138- av. partial_cmp ( bv) . unwrap_or ( std:: cmp:: Ordering :: Equal )
139- }
140- ( Value :: Text ( av) , Value :: Text ( bv) ) => av. cmp ( bv) ,
141- ( Value :: Boolean ( av) , Value :: Boolean ( bv) ) => av. cmp ( bv) ,
142- _ => std:: cmp:: Ordering :: Equal ,
143- } ;
144-
145- if order_clause. ascending {
146- ordering
147- } else {
148- ordering. reverse ( )
127+ if let Some ( order_clauses) = order_by {
128+ let column_names: Vec < String > =
129+ schema. columns . iter ( ) . map ( |c| c. name . clone ( ) ) . collect ( ) ;
130+
131+ for order_clause in order_clauses. iter ( ) . rev ( ) {
132+ if let Some ( col_idx) =
133+ column_names. iter ( ) . position ( |c| c == & order_clause. column )
134+ {
135+ rows. sort_by ( |a, b| {
136+ let ordering = match ( & a. values [ col_idx] , & b. values [ col_idx] ) {
137+ ( Value :: Integer ( av) , Value :: Integer ( bv) ) => av. cmp ( bv) ,
138+ ( Value :: Float ( av) , Value :: Float ( bv) ) => {
139+ av. partial_cmp ( bv) . unwrap_or ( std:: cmp:: Ordering :: Equal )
149140 }
150- } ) ;
151- }
141+ ( Value :: Text ( av) , Value :: Text ( bv) ) => av. cmp ( bv) ,
142+ ( Value :: Boolean ( av) , Value :: Boolean ( bv) ) => av. cmp ( bv) ,
143+ _ => std:: cmp:: Ordering :: Equal ,
144+ } ;
145+
146+ if order_clause. ascending {
147+ ordering
148+ } else {
149+ ordering. reverse ( )
150+ }
151+ } ) ;
152152 }
153-
154- log:: debug!( "Sorted {} rows using ORDER BY" , rows. len( ) ) ;
155153 }
156154
157- if let Some ( limit_count) = limit {
158- rows. truncate ( limit_count) ;
159- log:: debug!( "Limited to {} rows using LIMIT" , limit_count) ;
160- }
155+ log:: debug!( "Sorted {} rows using ORDER BY" , rows. len( ) ) ;
156+ }
161157
162- if let Some ( cache) = & self . cache {
163- let query_str = format ! ( "SELECT {:?} FROM {}" , columns, table) ;
164- let cached_data = serde_json:: to_string ( & rows) . unwrap_or_default ( ) ;
165- let _ = cache. put ( & query_str, & cached_data) ;
166- }
158+ if let Some ( limit_count) = limit {
159+ rows. truncate ( limit_count) ;
160+ log:: debug!( "Limited to {} rows using LIMIT" , limit_count) ;
161+ }
167162
168- Ok ( ExecutionResult :: Selected { columns, rows } )
163+ if let Some ( cache) = & self . cache {
164+ let query_str = format ! ( "SELECT {:?} FROM {}" , columns, table) ;
165+ let cached_data = serde_json:: to_string ( & rows) . unwrap_or_default ( ) ;
166+ let _ = cache. put ( & query_str, & cached_data) ;
169167 }
170- Statement :: Delete {
171- table,
172- where_clause,
173- } => {
174- let schema = self . catalog . get_table ( & table) ?. ok_or_else ( || {
175- StorageError :: ReadError ( format ! ( "Table {} not found" , table) )
176- } ) ?;
177-
178- let prefix = Self :: table_data_prefix ( & table) ;
179-
180- if let Some ( where_expr) = where_clause {
181- let column_names: Vec < String > =
182- schema. columns . iter ( ) . map ( |c| c. name . clone ( ) ) . collect ( ) ;
183- let evaluator = ExpressionEvaluator :: new ( column_names) ;
184-
185- // Two-phase deletion: first collect keys to delete, then delete them
186- // This prevents partial deletion if WHERE evaluation fails
187- let all_rows = self . storage . scan_prefix ( & prefix) ?;
188- let mut keys_to_delete: Vec < Vec < u8 > > = Vec :: new ( ) ;
189-
190- // Phase 1: Evaluate all rows and collect matching keys
191- for ( key, value) in & all_rows {
192- if let Ok ( row) = serde_json:: from_slice :: < Row > ( value) {
193- match evaluator. evaluate ( & where_expr, & row. values ) {
194- Ok ( true ) => {
195- keys_to_delete. push ( key. clone ( ) ) ;
196- }
197- Ok ( false ) => {
198- // Row doesn't match WHERE condition, skip
199- }
200- Err ( e) => {
201- return Err ( StorageError :: ReadError ( format ! (
168+
169+ Ok ( ExecutionResult :: Selected { columns, rows } )
170+ }
171+ Statement :: Delete {
172+ table,
173+ where_clause,
174+ } => {
175+ let schema = self
176+ . catalog
177+ . get_table ( & table) ?
178+ . ok_or_else ( || StorageError :: ReadError ( format ! ( "Table {} not found" , table) ) ) ?;
179+
180+ let prefix = Self :: table_data_prefix ( & table) ;
181+
182+ if let Some ( where_expr) = where_clause {
183+ let column_names: Vec < String > =
184+ schema. columns . iter ( ) . map ( |c| c. name . clone ( ) ) . collect ( ) ;
185+ let evaluator = ExpressionEvaluator :: new ( column_names) ;
186+
187+ // Two-phase deletion: first collect keys to delete, then delete them
188+ // This prevents partial deletion if WHERE evaluation fails
189+ let all_rows = self . storage . scan_prefix ( & prefix) ?;
190+ let mut keys_to_delete: Vec < Vec < u8 > > = Vec :: new ( ) ;
191+
192+ // Phase 1: Evaluate all rows and collect matching keys
193+ for ( key, value) in & all_rows {
194+ if let Ok ( row) = serde_json:: from_slice :: < Row > ( value) {
195+ match evaluator. evaluate ( & where_expr, & row. values ) {
196+ Ok ( true ) => {
197+ keys_to_delete. push ( key. clone ( ) ) ;
198+ }
199+ Ok ( false ) => {
200+ // Row doesn't match WHERE condition, skip
201+ }
202+ Err ( e) => {
203+ return Err ( StorageError :: ReadError ( format ! (
202204 "WHERE clause evaluation failed on row: {}. No rows were deleted." , e
203205 ) ) ) ;
204- }
205206 }
206207 }
207208 }
209+ }
208210
209- // Phase 2: Delete all matching rows (only if Phase 1 succeeded)
210- let deleted_count = keys_to_delete. len ( ) ;
211- for key in keys_to_delete {
212- self . storage . delete ( & key) ?;
213- }
214-
215- Ok ( ExecutionResult :: Deleted {
216- table,
217- rows : deleted_count,
218- } )
219- } else {
220- // No WHERE clause - delete all rows
221- log:: warn!( "DELETE without WHERE clause will remove all rows from table '{}'" , table) ;
222- let all_rows = self . storage . scan_prefix ( & prefix) ?;
223- let deleted_count = all_rows. len ( ) ;
224- for ( key, _) in & all_rows {
225- self . storage . delete ( key) ?;
226- }
211+ // Phase 2: Delete all matching rows (only if Phase 1 succeeded)
212+ let deleted_count = keys_to_delete. len ( ) ;
213+ for key in keys_to_delete {
214+ self . storage . delete ( & key) ?;
215+ }
227216
228- Ok ( ExecutionResult :: Deleted {
229- table,
230- rows : deleted_count,
231- } )
217+ Ok ( ExecutionResult :: Deleted {
218+ table,
219+ rows : deleted_count,
220+ } )
221+ } else {
222+ // No WHERE clause - delete all rows
223+ log:: warn!(
224+ "DELETE without WHERE clause will remove all rows from table '{}'" ,
225+ table
226+ ) ;
227+ let all_rows = self . storage . scan_prefix ( & prefix) ?;
228+ let deleted_count = all_rows. len ( ) ;
229+ for ( key, _) in & all_rows {
230+ self . storage . delete ( key) ?;
232231 }
232+
233+ Ok ( ExecutionResult :: Deleted {
234+ table,
235+ rows : deleted_count,
236+ } )
233237 }
234- } ;
238+ }
239+ } ;
235240
236241 let duration = start. elapsed ( ) ;
237242 log:: debug!( "Query executed in {:?}" , duration) ;
@@ -464,23 +469,18 @@ mod tests {
464469 // Create table
465470 let create = Statement :: CreateTable {
466471 name : "test_delete_all" . to_string ( ) ,
467- columns : vec ! [
468- Column {
469- name: "id" . to_string( ) ,
470- data_type: DataType :: Integer ,
471- } ,
472- ] ,
472+ columns : vec ! [ Column {
473+ name: "id" . to_string( ) ,
474+ data_type: DataType :: Integer ,
475+ } ] ,
473476 } ;
474477 executor. execute ( create) . unwrap ( ) ;
475478
476479 // Insert rows
477480 let insert = Statement :: Insert {
478481 table : "test_delete_all" . to_string ( ) ,
479482 columns : vec ! [ "id" . to_string( ) ] ,
480- values : vec ! [
481- vec![ Value :: Integer ( 1 ) ] ,
482- vec![ Value :: Integer ( 2 ) ] ,
483- ] ,
483+ values : vec ! [ vec![ Value :: Integer ( 1 ) ] , vec![ Value :: Integer ( 2 ) ] ] ,
484484 } ;
485485 executor. execute ( insert) . unwrap ( ) ;
486486
0 commit comments