Skip to content

Commit be00513

Browse files
Feat/start observability (#299)
1 parent 513ecfe commit be00513

15 files changed

Lines changed: 301 additions & 21 deletions

File tree

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ functions-json = ["datafusion-app/functions-json"]
9090
functions-parquet = ["datafusion-app/functions-parquet"]
9191
http = [
9292
"axum",
93+
"datafusion-app/observability",
9394
"dep:metrics",
9495
"dep:metrics-exporter-prometheus",
9596
"dep:tower-http",

crates/datafusion-app/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,5 +50,6 @@ functions-parquet = ["dep:datafusion-functions-parquet"]
5050
hudi = ["dep:hudi"]
5151
huggingface = ["object_store_opendal", "opendal", "url"]
5252
iceberg = ["dep:iceberg-catalog-rest", "dep:iceberg-datafusion"]
53+
observability = []
5354
s3 = ["object_store/aws", "url"]
5455
udfs-wasm = ["dep:datafusion-udfs-wasm"]
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::sync::Arc;
19+
20+
use datafusion::{
21+
arrow::{
22+
array::StringArray,
23+
datatypes::{DataType, Field, Schema},
24+
record_batch::RecordBatch,
25+
},
26+
catalog::{CatalogProvider, MemoryCatalogProvider, MemorySchemaProvider, SchemaProvider},
27+
common::Result,
28+
datasource::MemTable,
29+
DATAFUSION_VERSION,
30+
};
31+
32+
use crate::config::ExecutionConfig;
33+
34+
pub fn create_app_catalog(
35+
_config: &ExecutionConfig,
36+
app_name: &str,
37+
app_version: &str,
38+
) -> Result<Arc<dyn CatalogProvider>> {
39+
let catalog = MemoryCatalogProvider::new();
40+
let meta_schema = Arc::new(MemorySchemaProvider::new());
41+
catalog.register_schema("meta", meta_schema.clone())?;
42+
let versions_table = try_create_meta_versions_table(app_name, app_version)?;
43+
meta_schema.register_table("versions".to_string(), versions_table)?;
44+
Ok(Arc::new(catalog))
45+
}
46+
47+
fn try_create_meta_versions_table(app_name: &str, app_version: &str) -> Result<Arc<MemTable>> {
48+
let fields = vec![
49+
Field::new(app_name, DataType::Utf8, false),
50+
Field::new("datafusion", DataType::Utf8, false),
51+
Field::new("datafusion-app", DataType::Utf8, false),
52+
];
53+
let schema = Arc::new(Schema::new(fields));
54+
55+
let app_version_arr = StringArray::from(vec![app_version]);
56+
let datafusion_version_arr = StringArray::from(vec![DATAFUSION_VERSION]);
57+
let datafusion_app_version_arr = StringArray::from(vec![env!("CARGO_PKG_VERSION")]);
58+
let batches = RecordBatch::try_new(
59+
schema.clone(),
60+
vec![
61+
Arc::new(app_version_arr),
62+
Arc::new(datafusion_version_arr),
63+
Arc::new(datafusion_app_version_arr),
64+
],
65+
)?;
66+
67+
Ok(Arc::new(MemTable::try_new(schema, vec![vec![batches]])?))
68+
}

crates/datafusion-app/src/config.rs

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,11 @@ pub struct ExecutionConfig {
8989
#[cfg(feature = "udfs-wasm")]
9090
#[serde(default = "default_wasm_udf")]
9191
pub wasm_udf: WasmUdfConfig,
92+
#[serde(default = "default_catalog")]
93+
pub catalog: CatalogConfig,
94+
// #[cfg(feature = "observability")]
95+
// #[serde(default)]
96+
// pub observability: ObservabilityConfig,
9297
}
9398

9499
impl Default for ExecutionConfig {
@@ -103,6 +108,9 @@ impl Default for ExecutionConfig {
103108
iceberg: default_iceberg_config(),
104109
#[cfg(feature = "udfs-wasm")]
105110
wasm_udf: default_wasm_udf(),
111+
catalog: default_catalog(),
112+
// #[cfg(feature = "observability")]
113+
// observability: default_observability(),
106114
}
107115
}
108116
}
@@ -268,3 +276,46 @@ pub struct BasicAuth {
268276
pub username: String,
269277
pub password: String,
270278
}
279+
280+
#[derive(Clone, Debug, Deserialize)]
281+
pub struct CatalogConfig {
282+
#[serde(default = "default_catalog_name")]
283+
pub name: String,
284+
}
285+
286+
impl Default for CatalogConfig {
287+
fn default() -> Self {
288+
Self {
289+
name: default_catalog_name(),
290+
}
291+
}
292+
}
293+
294+
fn default_catalog() -> CatalogConfig {
295+
CatalogConfig::default()
296+
}
297+
298+
fn default_catalog_name() -> String {
299+
"dft".to_string()
300+
}
301+
302+
#[cfg(feature = "observability")]
303+
#[derive(Clone, Debug, Deserialize)]
304+
pub struct ObservabilityConfig {
305+
#[serde(default = "default_observability_catalog_name")]
306+
pub catalog_name: String,
307+
}
308+
309+
#[cfg(feature = "observability")]
310+
impl Default for ObservabilityConfig {
311+
fn default() -> Self {
312+
Self {
313+
catalog_name: default_observability_catalog_name(),
314+
}
315+
}
316+
}
317+
318+
#[cfg(feature = "observability")]
319+
fn default_observability_catalog_name() -> String {
320+
"observability".to_string()
321+
}

crates/datafusion-app/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
pub mod catalog;
1819
pub mod config;
1920
pub mod executor;
2021
pub mod extensions;
@@ -24,6 +25,8 @@ pub mod flightsql;
2425
pub mod flightsql_benchmarks;
2526
pub mod local;
2627
pub mod local_benchmarks;
28+
#[cfg(feature = "observability")]
29+
pub mod observability;
2730
pub mod sql_utils;
2831
pub mod stats;
2932
#[cfg(feature = "udfs-wasm")]

crates/datafusion-app/src/local.rs

Lines changed: 51 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use datafusion::logical_expr::LogicalPlan;
2626
use futures::TryFutureExt;
2727
use log::{debug, error, info};
2828

29+
use crate::catalog::create_app_catalog;
2930
use crate::config::ExecutionConfig;
3031
use crate::{ExecOptions, ExecResult};
3132
use color_eyre::eyre::{self, Result};
@@ -41,6 +42,8 @@ use super::local_benchmarks::LocalBenchmarkStats;
4142
use super::stats::{ExecutionDurationStats, ExecutionStats};
4243
#[cfg(feature = "udfs-wasm")]
4344
use super::wasm::create_wasm_udfs;
45+
#[cfg(feature = "observability")]
46+
use crate::observability::ObservabilityContext;
4447

4548
/// Structure for executing queries locally
4649
///
@@ -64,6 +67,9 @@ pub struct ExecutionContext {
6467
ddl_path: Option<PathBuf>,
6568
/// Dedicated executor for running CPU intensive work
6669
executor: Option<DedicatedExecutor>,
70+
/// Observability handlers
71+
#[cfg(feature = "observability")]
72+
observability: ObservabilityContext,
6773
}
6874

6975
impl std::fmt::Debug for ExecutionContext {
@@ -74,21 +80,35 @@ impl std::fmt::Debug for ExecutionContext {
7480

7581
impl ExecutionContext {
7682
/// Construct a new `ExecutionContext` with the specified configuration
77-
pub fn try_new(config: &ExecutionConfig, session_state: SessionState) -> Result<Self> {
83+
pub fn try_new(
84+
config: &ExecutionConfig,
85+
session_state: SessionState,
86+
app_name: &str,
87+
app_version: &str,
88+
) -> Result<Self> {
7889
let mut executor = None;
7990
if config.dedicated_executor_enabled {
8091
// Ideally we would only use `enable_time` but we are still doing
8192
// some network requests as part of planning / execution which require network
8293
// functionality.
83-
8494
let runtime_builder = tokio::runtime::Builder::new_multi_thread();
8595
let dedicated_executor =
8696
DedicatedExecutor::new("cpu_runtime", config.clone(), runtime_builder);
8797
executor = Some(dedicated_executor)
8898
}
8999

90-
#[allow(unused_mut)]
100+
#[cfg(any(
101+
feature = "udfs-wasm",
102+
feature = "observability",
103+
feature = "functions-json"
104+
))]
91105
let mut session_ctx = SessionContext::new_with_state(session_state);
106+
#[cfg(all(
107+
not(feature = "udfs-wasm"),
108+
not(feature = "observability"),
109+
not(feature = "functions-json")
110+
))]
111+
let session_ctx = SessionContext::new_with_state(session_state);
92112

93113
#[cfg(feature = "functions-json")]
94114
datafusion_functions_json::register_all(&mut session_ctx)?;
@@ -109,12 +129,32 @@ impl ExecutionContext {
109129
Arc::new(datafusion_functions_parquet::ParquetMetadataFunc {}),
110130
);
111131

112-
Ok(Self {
132+
let catalog = create_app_catalog(config, app_name, app_version)?;
133+
session_ctx.register_catalog(&config.catalog.name, catalog);
134+
135+
// #[cfg(feature = "observability")]
136+
// {
137+
// let obs = ObservabilityContext::new(config.observability.clone());
138+
// }
139+
140+
#[cfg(feature = "observability")]
141+
let ctx = Self {
113142
config: config.clone(),
114143
session_ctx,
115144
ddl_path: config.ddl_path.as_ref().map(PathBuf::from),
116145
executor,
117-
})
146+
observability: ObservabilityContext::default(),
147+
};
148+
149+
#[cfg(not(feature = "observability"))]
150+
let ctx = Self {
151+
config: config.clone(),
152+
session_ctx,
153+
ddl_path: config.ddl_path.as_ref().map(PathBuf::from),
154+
executor,
155+
};
156+
157+
Ok(ctx)
118158
}
119159

120160
pub fn config(&self) -> &ExecutionConfig {
@@ -135,6 +175,12 @@ impl ExecutionContext {
135175
&self.executor
136176
}
137177

178+
/// Return the `ObservabilityCtx`
179+
#[cfg(feature = "observability")]
180+
pub fn observability(&self) -> &ObservabilityContext {
181+
&self.observability
182+
}
183+
138184
/// Convert the statement to a `LogicalPlan`. Uses the [`DedicatedExecutor`] if it is available.
139185
pub async fn statement_to_logical_plan(&self, statement: Statement) -> Result<LogicalPlan> {
140186
let ctx = self.session_ctx.clone();
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::{sync::Arc, time::Duration};
19+
20+
use datafusion::catalog::{MemorySchemaProvider, SchemaProvider};
21+
22+
use crate::config::ObservabilityConfig;
23+
24+
#[derive(Clone, Debug)]
25+
pub struct ObservabilityContext {
26+
schema: Arc<dyn SchemaProvider>,
27+
_config: ObservabilityConfig,
28+
}
29+
30+
impl ObservabilityContext {
31+
pub fn new(config: ObservabilityConfig) -> Self {
32+
let schema = MemorySchemaProvider::new();
33+
Self {
34+
schema: Arc::new(schema),
35+
_config: config,
36+
}
37+
}
38+
39+
pub fn catalog(&self) -> Arc<dyn SchemaProvider> {
40+
self.schema.clone()
41+
}
42+
43+
pub fn record_request(&self, _sql: &str, _duration: Duration) {}
44+
}
45+
46+
impl Default for ObservabilityContext {
47+
fn default() -> Self {
48+
Self {
49+
schema: Arc::new(MemorySchemaProvider::new()),
50+
_config: ObservabilityConfig::default(),
51+
}
52+
}
53+
}

src/cli/mod.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -585,7 +585,12 @@ pub async fn try_run(cli: DftArgs, config: AppConfig) -> Result<()> {
585585

586586
// CLI mode: executing commands from files or CLI arguments
587587
let session_state = session_state_builder.build()?;
588-
let execution_ctx = ExecutionContext::try_new(&merged_exec_config, session_state)?;
588+
let execution_ctx = ExecutionContext::try_new(
589+
&merged_exec_config,
590+
session_state,
591+
env!("CARGO_PKG_NAME"),
592+
env!("CARGO_PKG_VERSION"),
593+
)?;
589594
#[allow(unused_mut)]
590595
let mut app_execution = AppExecution::new(execution_ctx);
591596
#[cfg(feature = "flightsql")]

src/server/flightsql/mod.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,12 @@ pub async fn try_run(cli: DftArgs, config: AppConfig) -> Result<()> {
178178
.await?;
179179
let session_state = session_state_builder.build()?;
180180
// FlightSQL Server mode: start a FlightSQL server
181-
let execution_ctx = ExecutionContext::try_new(&merged_exec_config, session_state)?;
181+
let execution_ctx = ExecutionContext::try_new(
182+
&merged_exec_config,
183+
session_state,
184+
env!("CARGO_PKG_NAME"),
185+
env!("CARGO_PKG_VERSION"),
186+
)?;
182187
if cli.run_ddl {
183188
execution_ctx.execute_ddl().await;
184189
}

src/server/http/mod.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,12 @@ pub async fn try_run(cli: DftArgs, config: AppConfig) -> Result<()> {
113113
.with_extensions()
114114
.await?;
115115
let session_state = session_state_builder.build()?;
116-
let execution_ctx = ExecutionContext::try_new(&merged_exec_config, session_state)?;
116+
let execution_ctx = ExecutionContext::try_new(
117+
&merged_exec_config,
118+
session_state,
119+
env!("CARGO_PKG_NAME"),
120+
env!("CARGO_PKG_VERSION"),
121+
)?;
117122
if cli.run_ddl {
118123
execution_ctx.execute_ddl().await;
119124
}

0 commit comments

Comments
 (0)