Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ functions-json = ["datafusion-app/functions-json"]
functions-parquet = ["datafusion-app/functions-parquet"]
http = [
"axum",
"datafusion-app/observability",
"dep:metrics",
"dep:metrics-exporter-prometheus",
"dep:tower-http",
Expand Down
1 change: 1 addition & 0 deletions crates/datafusion-app/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,6 @@ functions-parquet = ["dep:datafusion-functions-parquet"]
hudi = ["dep:hudi"]
huggingface = ["object_store_opendal", "opendal", "url"]
iceberg = ["dep:iceberg-catalog-rest", "dep:iceberg-datafusion"]
observability = []
s3 = ["object_store/aws", "url"]
udfs-wasm = ["dep:datafusion-udfs-wasm"]
68 changes: 68 additions & 0 deletions crates/datafusion-app/src/catalog/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use datafusion::{
arrow::{
array::StringArray,
datatypes::{DataType, Field, Schema},
record_batch::RecordBatch,
},
catalog::{CatalogProvider, MemoryCatalogProvider, MemorySchemaProvider, SchemaProvider},
common::Result,
datasource::MemTable,
DATAFUSION_VERSION,
};

use crate::config::ExecutionConfig;

pub fn create_app_catalog(
_config: &ExecutionConfig,
app_name: &str,
app_version: &str,
) -> Result<Arc<dyn CatalogProvider>> {
let catalog = MemoryCatalogProvider::new();
let meta_schema = Arc::new(MemorySchemaProvider::new());
catalog.register_schema("meta", meta_schema.clone())?;
let versions_table = try_create_meta_versions_table(app_name, app_version)?;
meta_schema.register_table("versions".to_string(), versions_table)?;
Ok(Arc::new(catalog))
}

fn try_create_meta_versions_table(app_name: &str, app_version: &str) -> Result<Arc<MemTable>> {
let fields = vec![
Field::new(app_name, DataType::Utf8, false),
Field::new("datafusion", DataType::Utf8, false),
Field::new("datafusion-app", DataType::Utf8, false),
];
let schema = Arc::new(Schema::new(fields));

let app_version_arr = StringArray::from(vec![app_version]);
let datafusion_version_arr = StringArray::from(vec![DATAFUSION_VERSION]);
let datafusion_app_version_arr = StringArray::from(vec![env!("CARGO_PKG_VERSION")]);
let batches = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(app_version_arr),
Arc::new(datafusion_version_arr),
Arc::new(datafusion_app_version_arr),
],
)?;

Ok(Arc::new(MemTable::try_new(schema, vec![vec![batches]])?))
}
51 changes: 51 additions & 0 deletions crates/datafusion-app/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ pub struct ExecutionConfig {
#[cfg(feature = "udfs-wasm")]
#[serde(default = "default_wasm_udf")]
pub wasm_udf: WasmUdfConfig,
#[serde(default = "default_catalog")]
pub catalog: CatalogConfig,
// #[cfg(feature = "observability")]
// #[serde(default)]
// pub observability: ObservabilityConfig,
}

impl Default for ExecutionConfig {
Expand All @@ -103,6 +108,9 @@ impl Default for ExecutionConfig {
iceberg: default_iceberg_config(),
#[cfg(feature = "udfs-wasm")]
wasm_udf: default_wasm_udf(),
catalog: default_catalog(),
// #[cfg(feature = "observability")]
// observability: default_observability(),
}
}
}
Expand Down Expand Up @@ -268,3 +276,46 @@ pub struct BasicAuth {
pub username: String,
pub password: String,
}

#[derive(Clone, Debug, Deserialize)]
pub struct CatalogConfig {
#[serde(default = "default_catalog_name")]
pub name: String,
}

impl Default for CatalogConfig {
fn default() -> Self {
Self {
name: default_catalog_name(),
}
}
}

fn default_catalog() -> CatalogConfig {
CatalogConfig::default()
}

fn default_catalog_name() -> String {
"dft".to_string()
}

#[cfg(feature = "observability")]
#[derive(Clone, Debug, Deserialize)]
pub struct ObservabilityConfig {
#[serde(default = "default_observability_catalog_name")]
pub catalog_name: String,
}

#[cfg(feature = "observability")]
impl Default for ObservabilityConfig {
fn default() -> Self {
Self {
catalog_name: default_observability_catalog_name(),
}
}
}

#[cfg(feature = "observability")]
fn default_observability_catalog_name() -> String {
"observability".to_string()
}
3 changes: 3 additions & 0 deletions crates/datafusion-app/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

pub mod catalog;
pub mod config;
pub mod executor;
pub mod extensions;
Expand All @@ -24,6 +25,8 @@ pub mod flightsql;
pub mod flightsql_benchmarks;
pub mod local;
pub mod local_benchmarks;
#[cfg(feature = "observability")]
pub mod observability;
pub mod sql_utils;
pub mod stats;
#[cfg(feature = "udfs-wasm")]
Expand Down
56 changes: 51 additions & 5 deletions crates/datafusion-app/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use datafusion::logical_expr::LogicalPlan;
use futures::TryFutureExt;
use log::{debug, error, info};

use crate::catalog::create_app_catalog;
use crate::config::ExecutionConfig;
use crate::{ExecOptions, ExecResult};
use color_eyre::eyre::{self, Result};
Expand All @@ -41,6 +42,8 @@ use super::local_benchmarks::LocalBenchmarkStats;
use super::stats::{ExecutionDurationStats, ExecutionStats};
#[cfg(feature = "udfs-wasm")]
use super::wasm::create_wasm_udfs;
#[cfg(feature = "observability")]
use crate::observability::ObservabilityContext;

/// Structure for executing queries locally
///
Expand All @@ -64,6 +67,9 @@ pub struct ExecutionContext {
ddl_path: Option<PathBuf>,
/// Dedicated executor for running CPU intensive work
executor: Option<DedicatedExecutor>,
/// Observability handlers
#[cfg(feature = "observability")]
observability: ObservabilityContext,
}

impl std::fmt::Debug for ExecutionContext {
Expand All @@ -74,21 +80,35 @@ impl std::fmt::Debug for ExecutionContext {

impl ExecutionContext {
/// Construct a new `ExecutionContext` with the specified configuration
pub fn try_new(config: &ExecutionConfig, session_state: SessionState) -> Result<Self> {
pub fn try_new(
config: &ExecutionConfig,
session_state: SessionState,
app_name: &str,
app_version: &str,
) -> Result<Self> {
let mut executor = None;
if config.dedicated_executor_enabled {
// Ideally we would only use `enable_time` but we are still doing
// some network requests as part of planning / execution which require network
// functionality.

let runtime_builder = tokio::runtime::Builder::new_multi_thread();
let dedicated_executor =
DedicatedExecutor::new("cpu_runtime", config.clone(), runtime_builder);
executor = Some(dedicated_executor)
}

#[allow(unused_mut)]
#[cfg(any(
feature = "udfs-wasm",
feature = "observability",
feature = "functions-json"
))]
let mut session_ctx = SessionContext::new_with_state(session_state);
#[cfg(all(
not(feature = "udfs-wasm"),
not(feature = "observability"),
not(feature = "functions-json")
))]
let session_ctx = SessionContext::new_with_state(session_state);

#[cfg(feature = "functions-json")]
datafusion_functions_json::register_all(&mut session_ctx)?;
Expand All @@ -109,12 +129,32 @@ impl ExecutionContext {
Arc::new(datafusion_functions_parquet::ParquetMetadataFunc {}),
);

Ok(Self {
let catalog = create_app_catalog(config, app_name, app_version)?;
session_ctx.register_catalog(&config.catalog.name, catalog);

// #[cfg(feature = "observability")]
// {
// let obs = ObservabilityContext::new(config.observability.clone());
// }

#[cfg(feature = "observability")]
let ctx = Self {
config: config.clone(),
session_ctx,
ddl_path: config.ddl_path.as_ref().map(PathBuf::from),
executor,
})
observability: ObservabilityContext::default(),
};

#[cfg(not(feature = "observability"))]
let ctx = Self {
config: config.clone(),
session_ctx,
ddl_path: config.ddl_path.as_ref().map(PathBuf::from),
executor,
};

Ok(ctx)
}

pub fn config(&self) -> &ExecutionConfig {
Expand All @@ -135,6 +175,12 @@ impl ExecutionContext {
&self.executor
}

/// Return the `ObservabilityCtx`
#[cfg(feature = "observability")]
pub fn observability(&self) -> &ObservabilityContext {
&self.observability
}

/// Convert the statement to a `LogicalPlan`. Uses the [`DedicatedExecutor`] if it is available.
pub async fn statement_to_logical_plan(&self, statement: Statement) -> Result<LogicalPlan> {
let ctx = self.session_ctx.clone();
Expand Down
53 changes: 53 additions & 0 deletions crates/datafusion-app/src/observability/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::{sync::Arc, time::Duration};

use datafusion::catalog::{MemorySchemaProvider, SchemaProvider};

use crate::config::ObservabilityConfig;

#[derive(Clone, Debug)]
pub struct ObservabilityContext {
schema: Arc<dyn SchemaProvider>,
_config: ObservabilityConfig,
}

impl ObservabilityContext {
pub fn new(config: ObservabilityConfig) -> Self {
let schema = MemorySchemaProvider::new();
Self {
schema: Arc::new(schema),
_config: config,
}
}

pub fn catalog(&self) -> Arc<dyn SchemaProvider> {
self.schema.clone()
}

pub fn record_request(&self, _sql: &str, _duration: Duration) {}
}

impl Default for ObservabilityContext {
fn default() -> Self {
Self {
schema: Arc::new(MemorySchemaProvider::new()),
_config: ObservabilityConfig::default(),
}
}
}
7 changes: 6 additions & 1 deletion src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,12 @@ pub async fn try_run(cli: DftArgs, config: AppConfig) -> Result<()> {

// CLI mode: executing commands from files or CLI arguments
let session_state = session_state_builder.build()?;
let execution_ctx = ExecutionContext::try_new(&merged_exec_config, session_state)?;
let execution_ctx = ExecutionContext::try_new(
&merged_exec_config,
session_state,
env!("CARGO_PKG_NAME"),
env!("CARGO_PKG_VERSION"),
)?;
#[allow(unused_mut)]
let mut app_execution = AppExecution::new(execution_ctx);
#[cfg(feature = "flightsql")]
Expand Down
7 changes: 6 additions & 1 deletion src/server/flightsql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,12 @@ pub async fn try_run(cli: DftArgs, config: AppConfig) -> Result<()> {
.await?;
let session_state = session_state_builder.build()?;
// FlightSQL Server mode: start a FlightSQL server
let execution_ctx = ExecutionContext::try_new(&merged_exec_config, session_state)?;
let execution_ctx = ExecutionContext::try_new(
&merged_exec_config,
session_state,
env!("CARGO_PKG_NAME"),
env!("CARGO_PKG_VERSION"),
)?;
if cli.run_ddl {
execution_ctx.execute_ddl().await;
}
Expand Down
7 changes: 6 additions & 1 deletion src/server/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,12 @@ pub async fn try_run(cli: DftArgs, config: AppConfig) -> Result<()> {
.with_extensions()
.await?;
let session_state = session_state_builder.build()?;
let execution_ctx = ExecutionContext::try_new(&merged_exec_config, session_state)?;
let execution_ctx = ExecutionContext::try_new(
&merged_exec_config,
session_state,
env!("CARGO_PKG_NAME"),
env!("CARGO_PKG_VERSION"),
)?;
if cli.run_ddl {
execution_ctx.execute_ddl().await;
}
Expand Down
Loading