Skip to content

Commit 1805adb

Browse files
Add Observability (#300)
1 parent be00513 commit 1805adb

15 files changed

Lines changed: 456 additions & 110 deletions

File tree

.github/workflows/test.yml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,31 @@ jobs:
221221
- name: Run UDFs WASM tests
222222
run: |
223223
cargo test --manifest-path crates/datafusion-functions-parquet/Cargo.toml
224+
test-crate-datafusion-app:
225+
name: Crate / DataFusion-App
226+
runs-on: ubuntu-latest
227+
strategy:
228+
matrix:
229+
arch: [amd64]
230+
steps:
231+
- uses: actions/checkout@v2
232+
with:
233+
submodules: true
234+
- name: Cache Cargo
235+
uses: actions/cache@v4
236+
with:
237+
path: /home/runner/.cargo
238+
key: cargo-dft-cache-
239+
- name: Cache Rust dependencies
240+
uses: actions/cache@v4
241+
with:
242+
path: target
243+
key: ${{ runner.os }}-cargo-target-${{ hashFiles('Cargo.lock') }}
244+
- name: Setup Rust Toolchain
245+
uses: ./.github/actions/setup-rust
246+
- name: Run UDFs WASM tests
247+
run: |
248+
cargo test --all-features --manifest-path crates/datafusion-app/Cargo.toml
224249
test-crate-udfs-wasm:
225250
name: Crate / UDFs-WASM
226251
runs-on: ubuntu-latest

Cargo.lock

Lines changed: 51 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ futures = "0.3.30"
2929
http = "1"
3030
http-body = "1"
3131
itertools = "0.13.0"
32+
jiff = { version = "0.2.5", optional = true }
3233
lazy_static = "1.4.0"
3334
log = "0.4.22"
3435
metrics = { version = "0.24.0", optional = true }
@@ -59,7 +60,7 @@ tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
5960
tui-logger = { version = "0.12", features = ["tracing-support"] }
6061
tui-textarea = { version = "0.6.1", features = ["search"] }
6162
url = { version = "2.5.2", optional = true }
62-
uuid = { version = "1.10.0" }
63+
uuid = { version = "1.10.0", optional = true }
6364

6465
[dev-dependencies]
6566
assert_cmd = "2.0.16"
@@ -85,15 +86,18 @@ flightsql = [
8586
"dep:metrics-exporter-prometheus",
8687
"dep:tonic",
8788
"dep:tower-http",
89+
"dep:uuid",
8890
]
8991
functions-json = ["datafusion-app/functions-json"]
9092
functions-parquet = ["datafusion-app/functions-parquet"]
9193
http = [
9294
"axum",
9395
"datafusion-app/observability",
96+
"dep:jiff",
9497
"dep:metrics",
9598
"dep:metrics-exporter-prometheus",
9699
"dep:tower-http",
100+
"dep:uuid",
97101
]
98102
hudi = ["datafusion-app/hudi"]
99103
huggingface = ["datafusion-app/huggingface"]

crates/datafusion-app/src/config.rs

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -91,9 +91,9 @@ pub struct ExecutionConfig {
9191
pub wasm_udf: WasmUdfConfig,
9292
#[serde(default = "default_catalog")]
9393
pub catalog: CatalogConfig,
94-
// #[cfg(feature = "observability")]
95-
// #[serde(default)]
96-
// pub observability: ObservabilityConfig,
94+
#[cfg(feature = "observability")]
95+
#[serde(default)]
96+
pub observability: ObservabilityConfig,
9797
}
9898

9999
impl Default for ExecutionConfig {
@@ -109,8 +109,8 @@ impl Default for ExecutionConfig {
109109
#[cfg(feature = "udfs-wasm")]
110110
wasm_udf: default_wasm_udf(),
111111
catalog: default_catalog(),
112-
// #[cfg(feature = "observability")]
113-
// observability: default_observability(),
112+
#[cfg(feature = "observability")]
113+
observability: default_observability(),
114114
}
115115
}
116116
}
@@ -302,20 +302,25 @@ fn default_catalog_name() -> String {
302302
#[cfg(feature = "observability")]
303303
#[derive(Clone, Debug, Deserialize)]
304304
pub struct ObservabilityConfig {
305-
#[serde(default = "default_observability_catalog_name")]
306-
pub catalog_name: String,
305+
#[serde(default = "default_observability_schema_name")]
306+
pub schema_name: String,
307307
}
308308

309309
#[cfg(feature = "observability")]
310310
impl Default for ObservabilityConfig {
311311
fn default() -> Self {
312312
Self {
313-
catalog_name: default_observability_catalog_name(),
313+
schema_name: default_observability_schema_name(),
314314
}
315315
}
316316
}
317317

318318
#[cfg(feature = "observability")]
319-
fn default_observability_catalog_name() -> String {
319+
fn default_observability() -> ObservabilityConfig {
320+
ObservabilityConfig::default()
321+
}
322+
323+
#[cfg(feature = "observability")]
324+
fn default_observability_schema_name() -> String {
320325
"observability".to_string()
321326
}

crates/datafusion-app/src/local.rs

Lines changed: 37 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -97,18 +97,8 @@ impl ExecutionContext {
9797
executor = Some(dedicated_executor)
9898
}
9999

100-
#[cfg(any(
101-
feature = "udfs-wasm",
102-
feature = "observability",
103-
feature = "functions-json"
104-
))]
100+
#[allow(unused_mut)]
105101
let mut session_ctx = SessionContext::new_with_state(session_state);
106-
#[cfg(all(
107-
not(feature = "udfs-wasm"),
108-
not(feature = "observability"),
109-
not(feature = "functions-json")
110-
))]
111-
let session_ctx = SessionContext::new_with_state(session_state);
112102

113103
#[cfg(feature = "functions-json")]
114104
datafusion_functions_json::register_all(&mut session_ctx)?;
@@ -132,26 +122,42 @@ impl ExecutionContext {
132122
let catalog = create_app_catalog(config, app_name, app_version)?;
133123
session_ctx.register_catalog(&config.catalog.name, catalog);
134124

135-
// #[cfg(feature = "observability")]
136-
// {
137-
// let obs = ObservabilityContext::new(config.observability.clone());
138-
// }
139-
140-
#[cfg(feature = "observability")]
141-
let ctx = Self {
142-
config: config.clone(),
143-
session_ctx,
144-
ddl_path: config.ddl_path.as_ref().map(PathBuf::from),
145-
executor,
146-
observability: ObservabilityContext::default(),
147-
};
148-
149-
#[cfg(not(feature = "observability"))]
150-
let ctx = Self {
151-
config: config.clone(),
152-
session_ctx,
153-
ddl_path: config.ddl_path.as_ref().map(PathBuf::from),
154-
executor,
125+
let ctx = {
126+
#[cfg(feature = "observability")]
127+
{
128+
let observability =
129+
ObservabilityContext::try_new(config.observability.clone(), app_name)?;
130+
if let Some(cat) = session_ctx.catalog(&config.catalog.name) {
131+
match cat
132+
.register_schema(&config.observability.schema_name, observability.schema())
133+
{
134+
Ok(_) => {
135+
info!("Registered observability schema")
136+
}
137+
Err(e) => {
138+
error!("Error registering observability schema: {}", e.to_string())
139+
}
140+
}
141+
} else {
142+
error!("Missing catalog to register observability schema")
143+
}
144+
Self {
145+
config: config.clone(),
146+
session_ctx,
147+
ddl_path: config.ddl_path.as_ref().map(PathBuf::from),
148+
executor,
149+
observability,
150+
}
151+
}
152+
#[cfg(not(feature = "observability"))]
153+
{
154+
Self {
155+
config: config.clone(),
156+
session_ctx,
157+
ddl_path: config.ddl_path.as_ref().map(PathBuf::from),
158+
executor,
159+
}
160+
}
155161
};
156162

157163
Ok(ctx)

0 commit comments

Comments
 (0)