Skip to content

Commit ce1a2c7

Browse files
Start structuring observability table
1 parent 9edc2a7 commit ce1a2c7

3 files changed

Lines changed: 108 additions & 0 deletions

File tree

crates/datafusion-app/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ hudi = { git = "https://github.com/apache/hudi-rs", rev = "eab36e2d3ea39372c52d2
2626
], optional = true }
2727
iceberg-catalog-rest = { git = "https://github.com/apache/iceberg-rust", rev = "210134573569be0b6e49464aca076cb122e33e24", optional = true }
2828
iceberg-datafusion = { git = "https://github.com/apache/iceberg-rust", rev = "210134573569be0b6e49464aca076cb122e33e24", optional = true }
29+
indexmap = { version = "2.8.0", features = ["serde"] }
2930
itertools = "0.13.0"
3031
log = "0.4.22"
3132
num_cpus = "1.16.0"
@@ -53,3 +54,6 @@ iceberg = ["dep:iceberg-catalog-rest", "dep:iceberg-datafusion"]
5354
observability = []
5455
s3 = ["object_store/aws", "url"]
5556
udfs-wasm = ["dep:datafusion-udfs-wasm"]
57+
58+
[lints.clippy]
59+
clone_on_ref_ptr = "deny"

crates/datafusion-app/src/observability/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
pub mod table;
19+
1820
use std::{collections::HashMap, sync::Arc};
1921

2022
use datafusion::{
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::{any::Any, collections::HashMap, sync::Arc};
19+
20+
use datafusion::{
21+
arrow::datatypes::{Schema, SchemaRef},
22+
catalog::{Session, TableProvider},
23+
common::{Constraints, Result},
24+
datasource::TableType,
25+
logical_expr::dml::InsertOp,
26+
physical_plan::ExecutionPlan,
27+
prelude::Expr,
28+
scalar::ScalarValue,
29+
};
30+
use indexmap::IndexMap;
31+
use parking_lot::RwLock;
32+
33+
type ObservabilityData = Arc<RwLock<IndexMap<String, HashMap<String, ScalarValue>>>>;
34+
35+
#[derive(Debug)]
36+
pub struct ObservabilityTableConfig {
37+
primary_key: String,
38+
}
39+
40+
/// Table for tracking observability information. Data is held in a IndexMap, which maintains
41+
/// insertion order, while the app is running and is serialized on app shutdown.
42+
#[derive(Debug)]
43+
pub struct ObservabilityTable {
44+
schema: Arc<Schema>,
45+
constraints: Option<Constraints>,
46+
config: ObservabilityTableConfig,
47+
inner: ObservabilityData,
48+
}
49+
50+
impl ObservabilityTable {
51+
pub fn try_new(
52+
schema: Arc<Schema>,
53+
constraints: Option<Constraints>,
54+
config: ObservabilityTableConfig,
55+
) -> Result<Self> {
56+
let inner = Arc::new(RwLock::new(IndexMap::new()));
57+
Ok(Self {
58+
schema,
59+
constraints,
60+
config,
61+
inner,
62+
})
63+
}
64+
}
65+
66+
impl TableProvider for ObservabilityTable {
67+
fn as_any(&self) -> &dyn Any {
68+
self
69+
}
70+
71+
fn schema(&self) -> SchemaRef {
72+
Arc::clone(&self.schema)
73+
}
74+
75+
fn constraints(&self) -> Option<&Constraints> {
76+
self.constraints.as_ref()
77+
}
78+
79+
fn table_type(&self) -> TableType {
80+
TableType::Base
81+
}
82+
83+
fn scan(
84+
&self,
85+
state: &dyn Session,
86+
projection: Option<&Vec<usize>>,
87+
filters: &[Expr],
88+
limit: Option<usize>,
89+
) -> Result<Arc<dyn ExecutionPlan>> {
90+
}
91+
92+
// async fn insert_into(
93+
// &self,
94+
// _state: &dyn Session,
95+
// input: Arc<dyn ExecutionPlan>,
96+
// insert_op: InsertOp,
97+
// ) -> Result<Arc<dyn ExecutionPlan>> {
98+
// }
99+
}
100+
101+
#[cfg(test)]
102+
mod test {}

0 commit comments

Comments
 (0)