@@ -40,31 +40,37 @@ use datafusion::{
4040use indexmap:: IndexMap ;
4141use parking_lot:: RwLock ;
4242
43+ // The first String key is meant to hold primary key and provide O(1) lookup. The inner HashMap is
44+ // for holding arbitrary column and value pairs - the key is the column name and we use DataFusions
45+ // scalar value to provide dynamic typing for the column values.
4346type IndexMapData = Arc < RwLock < IndexMap < String , HashMap < String , ScalarValue > > > > ;
4447
4548#[ derive( Debug ) ]
46- pub struct IndexMapTableConfig {
49+ pub struct MapTableConfig {
50+ table_name : String ,
4751 primary_key : String ,
4852}
4953
5054/// Table for tracking observability information. Data is held in a IndexMap, which maintains
5155/// insertion order, while the app is running and is serialized on app shutdown.
5256///
5357/// TODO: Add filter pushdown on the primary key and use `get` on that for O(1)
54- /// TODO: Add filter pushdown on non primary key and use `binary_search_by` to search values
58+ /// TODO: Add filter pushdown on non primary key and use `binary_search_by` / `range` (whatever
59+ /// method the underlying map provides) to search values
5560#[ derive( Debug ) ]
56- pub struct IndexMapTable {
61+ pub struct MapTable {
5762 schema : Arc < Schema > ,
5863 constraints : Option < Constraints > ,
59- config : IndexMapTableConfig ,
64+ config : MapTableConfig ,
65+ // TODO: This will be based on a Trait so you can use IndexMap, DashMap, BTreeMap, etc...
6066 inner : IndexMapData ,
6167}
6268
63- impl IndexMapTable {
69+ impl MapTable {
6470 pub fn try_new (
6571 schema : Arc < Schema > ,
6672 constraints : Option < Constraints > ,
67- config : IndexMapTableConfig ,
73+ config : MapTableConfig ,
6874 ) -> Result < Self > {
6975 let inner = Arc :: new ( RwLock :: new ( IndexMap :: new ( ) ) ) ;
7076 Ok ( Self {
@@ -76,6 +82,19 @@ impl IndexMapTable {
7682 }
7783
7884 fn hashmap_to_row ( & self , values : & HashMap < String , ScalarValue > ) -> Result < ( ) > {
85+ for ( col, val) in values {
86+ // Check that the column is in the tables schema
87+ if let Some ( _) = self . schema . fields . find ( col) {
88+ } else {
89+ return Err ( datafusion:: error:: DataFusionError :: External (
90+ format ! (
91+ "Column {} for table {} is not in the provided schema" ,
92+ col, self . config. table_name
93+ )
94+ . into ( ) ,
95+ ) ) ;
96+ }
97+ }
7998 Ok ( ( ) )
8099 }
81100
@@ -91,7 +110,7 @@ impl IndexMapTable {
91110}
92111
93112#[ async_trait]
94- impl TableProvider for IndexMapTable {
113+ impl TableProvider for MapTable {
95114 fn as_any ( & self ) -> & dyn Any {
96115 self
97116 }
@@ -117,7 +136,7 @@ impl TableProvider for IndexMapTable {
117136 ) -> Result < Arc < dyn ExecutionPlan > > {
118137 let partitions = self . partitions ( ) ;
119138 let exec =
120- IndexMapExec :: try_new ( partitions, Arc :: clone ( & self . schema ) , projection. cloned ( ) ) ?;
139+ IndexMapExec :: try_new ( & partitions, Arc :: clone ( & self . schema ) , projection. cloned ( ) ) ?;
121140 Ok ( Arc :: new ( exec) )
122141 }
123142
0 commit comments