diff --git a/.circleci/config.yml b/.circleci/config.yml index a1d83adb923..661cc58babb 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -271,10 +271,8 @@ jobs: command: cargo install cargo-deny --locked - run: name: cargo-deny Checks - # `--warn unsound` downgrades RustSec `informational = "unsound"` - # advisories to warnings, they flag API soundness concerns, not - # exploitable vulnerabilities, so we don't want them to fail CI. command: cargo deny check -s --warn unsound + doc: docker: - image: quay.io/influxdb/rust:ci diff --git a/.circleci/packages/influxdb3/fs/usr/share/influxdb3/influxdb3-core.conf b/.circleci/packages/influxdb3/fs/usr/share/influxdb3/influxdb3-core.conf index 834a68c96ad..d7e025c58fb 100644 --- a/.circleci/packages/influxdb3/fs/usr/share/influxdb3/influxdb3-core.conf +++ b/.circleci/packages/influxdb3/fs/usr/share/influxdb3/influxdb3-core.conf @@ -228,6 +228,10 @@ # URL of plugin repository (env: INFLUXDB3_PLUGIN_REPO) #plugin-repo="" +# Restrict plugin triggers to the provided trigger type(s). Comma-separated +# list of: wal, schedule, request (env: INFLUXDB3_RESTRICT_PLUGIN_TRIGGERS_TO) +#restrict-plugin-triggers-to="" + # ============================================================================= # Data Lifecycle & Retention # ============================================================================= diff --git a/Cargo.lock b/Cargo.lock index f6cf643c09f..7510357e2f4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -168,7 +168,7 @@ version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.60.2", ] [[package]] @@ -179,7 +179,7 @@ checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d" dependencies = [ "anstyle", "once_cell_polyfill", - "windows-sys 0.61.2", + "windows-sys 0.60.2", ] [[package]] @@ -2775,7 +2775,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -3578,7 +3578,7 @@ dependencies = [ "libc", "percent-encoding", "pin-project-lite", - "socket2 0.6.3", + "socket2 0.5.10", "tokio", "tower-service", "tracing", @@ -4261,6 +4261,7 @@ dependencies = [ "arrow-flight", "arrow-json", "arrow-schema", + "async-trait", "authz", "base64 0.21.7", "bytes", @@ -4308,6 +4309,7 @@ dependencies = [ "metric_exporters", "mime", "object_store", + "object_store_utils", "observability_deps", "parquet", "parquet_file", @@ -4883,7 +4885,7 @@ checksum = "3640c1c38b8e4e43584d8df18be5fc6b0aa314ce6ebf51b53313d4306cca8e46" dependencies = [ "hermit-abi", "libc", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -5455,7 +5457,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -5730,6 +5732,7 @@ dependencies = [ "backon", "bytes", "futures", + "iox_time", "object_store", "observability_deps", "tokio", @@ -6600,7 +6603,7 @@ dependencies = [ "quinn-udp", "rustc-hash", "rustls 0.23.37", - "socket2 0.6.3", + "socket2 0.5.10", "thiserror 2.0.18", "tokio", "tracing", @@ -6637,7 +6640,7 @@ dependencies = [ "cfg_aliases", "libc", "once_cell", - "socket2 0.6.3", + "socket2 0.5.10", "tracing", "windows-sys 0.60.2", ] @@ -7101,7 +7104,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys 0.12.1", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -7138,7 +7141,7 @@ dependencies = [ "once_cell", "ring", "rustls-pki-types", - "rustls-webpki 0.103.12", + "rustls-webpki 0.103.13", "subtle", "zeroize", ] @@ -7187,9 +7190,9 @@ dependencies = [ [[package]] name = "rustls-webpki" -version = "0.103.12" +version = "0.103.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8279bb85272c9f10811ae6a6c547ff594d6a7f3c6c6b02ee9726d1d0dcfcdd06" +checksum = "61c429a8649f110dddef65e2a5ad240f747e85f7758a6bccc7e5777bd33f756e" dependencies = [ "ring", "rustls-pki-types", @@ -7655,7 +7658,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3a766e1110788c36f4fa1c2b71b387a7815aa65f88ce0229841826633d93723e" dependencies = [ "libc", - "windows-sys 0.61.2", + "windows-sys 0.60.2", ] [[package]] @@ -8079,7 +8082,7 @@ dependencies = [ "getrandom 0.4.2", "once_cell", "rustix 1.1.4", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -8328,9 +8331,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.52.0" +version = "1.52.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a91135f59b1cbf38c91e73cf3386fca9bb77915c45ce2771460c9d92f0f3d776" +checksum = "b67dee974fe86fd92cc45b7a95fdd2f99a36a6d7b0d431a231178d3d670bbcc6" dependencies = [ "bytes", "libc", @@ -9572,7 +9575,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.48.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index aa68485e467..8a36be1fd1e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -212,7 +212,7 @@ regex = "1.11.1" reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "stream", "json"] } rstest = "0.26" rustls = { version = "0.23", default-features = false, features = ["logging", "ring", "std", "tls12"] } -rustls-webpki = { version = "0.103.12", default-features = false, features = ["ring", "std"] } +rustls-webpki = { version = "0.103.13", default-features = false, features = ["ring", "std"] } secrecy = "0.8.0" serde = { version = "1.0", features = ["derive"] } # serde_json is set to 1.0.127 to prevent a conflict with core, if that gets updated upstream, this diff --git a/core/README.md b/core/README.md new file mode 100644 index 00000000000..f5297def4e8 --- /dev/null +++ b/core/README.md @@ -0,0 +1,21 @@ +# `oss/core` + +`oss/core` contains the lower-level shared crates shared across: + +- InfluxDB 3 Core +- InfluxDB 3 Enterprise +- InfluxDB 3 IOx + +These crates provide common building blocks such as query execution, schema and data +types, HTTP and gRPC utilities, observability helpers, and supporting Arrow and Parquet +infrastructure. + + +Examples of crates in this directory include: + +- query and planner crates such as `iox_query`, `iox_query_influxql`, and `query_functions` +- shared data model crates such as `data_types`, `schema`, and `mutable_batch` +- service and protocol crates such as `generated_types`, `service_grpc_flight`, and `iox_http` +- common runtime and observability crates such as `executor`, `metric`, `trace`, and `trogging` + +As a rule of thumb, code belongs in `core` when it is product-agnostic infrastructure. diff --git a/core/data_types/src/columns.rs b/core/data_types/src/columns.rs index be877b182bc..e689bba89fd 100644 --- a/core/data_types/src/columns.rs +++ b/core/data_types/src/columns.rs @@ -411,6 +411,25 @@ impl std::fmt::Display for ColumnType { } } +impl std::str::FromStr for ColumnType { + type Err = String; + + fn from_str(s: &str) -> Result { + match s { + "i64" => Ok(Self::I64), + "u64" => Ok(Self::U64), + "f64" => Ok(Self::F64), + "bool" => Ok(Self::Bool), + "string" => Ok(Self::String), + "time" => Ok(Self::Time), + "tag" => Ok(Self::Tag), + _ => Err(format!( + "invalid column type '{s}': valid types are i64, u64, f64, bool, string, time, tag" + )), + } + } +} + impl TryFrom<&ArrowDataType> for ColumnType { type Error = &'static str; diff --git a/core/iox_v1_query_api/src/handler.rs b/core/iox_v1_query_api/src/handler.rs index bb4945d5efd..f1da1cce626 100644 --- a/core/iox_v1_query_api/src/handler.rs +++ b/core/iox_v1_query_api/src/handler.rs @@ -61,6 +61,48 @@ struct QueryPlan { context: IOxSessionContext, } +/// A V1 `/query` request parsed but not yet executed. +/// +/// Returned by [`V1HttpHandler::extract_query_request`] and consumed by +/// [`V1HttpHandler::execute_query`]. Callers can read +/// [`Self::requested_database`] between the two phases to wire up +/// observability that needs the database name from the request. +/// +/// The inner fields hold the auth token bytes and the raw query text; +/// the manual `Debug` impl redacts both so accidental `{:?}` formatting +/// can't leak them into logs. +pub struct ExtractedV1Request { + span_ctx: Option, + token: Option>, + params: QueryParams, + format: QueryFormat, +} + +impl std::fmt::Debug for ExtractedV1Request { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ExtractedV1Request") + .field("requested_database", &self.requested_database()) + .field("token", &self.token.as_ref().map(|_| "")) + .field("query", &self.params.query.as_ref().map(|_| "")) + .field("format", &self.format) + .finish() + } +} + +impl ExtractedV1Request { + /// The database name as supplied by the request — URL query string, + /// `application/x-www-form-urlencoded` body, or multipart `db` field. + /// Empty strings collapse to `None`, matching `execute_query`'s own + /// handling. + /// + /// This is the request-level value only. Per-statement DB/RP overrides + /// embedded in InfluxQL (e.g. `SELECT … FROM "otherdb"."rp"."m"`) are + /// resolved during execution and are not reflected here. + pub fn requested_database(&self) -> Option<&str> { + self.params.database.as_deref().filter(|s| !s.is_empty()) + } +} + #[derive(Debug, Clone)] pub struct V1HttpHandler { database: Arc, @@ -132,7 +174,19 @@ impl V1HttpHandler { .map_err(|e| HttpError::InternalError(e.to_string())) } - async fn handle_parameterized_query(&self, mut req: Request) -> Result { + async fn handle_parameterized_query(&self, req: Request) -> Result { + let extracted = self.extract_query_request(req).await?; + self.execute_query(extracted).await + } + + /// Parse the request: trace context, auth token, and body params/format. + /// + /// Splitting this out from execution lets callers inspect the resolved + /// `database` (e.g. for service-level logging) before the request runs. + pub async fn extract_query_request( + &self, + mut req: Request, + ) -> Result { let span_ctx = Some(SpanContext::new_with_optional_collector( self.trace_collector.as_ref().map(Arc::clone), )); @@ -143,6 +197,26 @@ impl V1HttpHandler { let (params, format) = extract_request(req).await?; + Ok(ExtractedV1Request { + span_ctx, + token, + params, + format, + }) + } + + /// Execute a previously parsed V1 request. + pub async fn execute_query( + &self, + extracted: ExtractedV1Request, + ) -> Result { + let ExtractedV1Request { + span_ctx, + token, + params, + format, + } = extracted; + let QueryParams { chunk_size, chunked, @@ -975,4 +1049,114 @@ mod tests { insta::assert_snapshot!(res); }); } + + /// Helper: build a fresh handler for `extract_query_request` tests. + /// These tests do not exercise execution, so the database / authz + /// implementations are inert. + fn handler_for_extract_tests() -> V1HttpHandler { + let db: Arc = Arc::new(TestDatabaseStore::default()); + V1HttpHandler::new(db, None, None, "test".to_string()) + } + + #[tokio::test] + async fn extract_query_request_db_in_url() { + let req = RequestBuilder::new() + .method("GET") + .uri("http://h/query?db=mydb&q=SELECT%201") + .body(empty_request_body()) + .unwrap(); + let extracted = handler_for_extract_tests() + .extract_query_request(req) + .await + .expect("extract should succeed"); + assert_eq!(extracted.requested_database(), Some("mydb")); + } + + #[tokio::test] + async fn extract_query_request_db_in_form_body() { + let req = RequestBuilder::new() + .method("POST") + .uri("http://h/query") + .header("Content-Type", "application/x-www-form-urlencoded") + .body(iox_http_util::bytes_to_request_body("db=mydb&q=SELECT+1")) + .unwrap(); + let extracted = handler_for_extract_tests() + .extract_query_request(req) + .await + .expect("extract should succeed"); + assert_eq!(extracted.requested_database(), Some("mydb")); + } + + #[tokio::test] + async fn extract_query_request_db_in_multipart_body() { + let boundary = "----v1tests-boundary-9f8d"; + let body = format!( + "--{boundary}\r\n\ + Content-Disposition: form-data; name=\"db\"\r\n\r\n\ + mydb\r\n\ + --{boundary}\r\n\ + Content-Disposition: form-data; name=\"q\"\r\n\r\n\ + SELECT 1\r\n\ + --{boundary}--\r\n" + ); + let req = RequestBuilder::new() + .method("POST") + .uri("http://h/query") + .header( + "Content-Type", + format!("multipart/form-data; boundary={boundary}"), + ) + .body(iox_http_util::bytes_to_request_body(body)) + .unwrap(); + let extracted = handler_for_extract_tests() + .extract_query_request(req) + .await + .expect("extract should succeed"); + assert_eq!(extracted.requested_database(), Some("mydb")); + } + + #[tokio::test] + async fn extract_query_request_debug_redacts_token_and_query() { + // Token comes from the `p=` URL parameter on the V1 path — that's + // one of the two real ingress points for V1 auth (the other is + // `AuthorizationHeaderExtension` in `req.extensions()`, which is + // populated by upstream middleware not present in this unit test). + let req = RequestBuilder::new() + .method("POST") + .uri("http://h/query?p=supersecret-token-bytes") + .header("Content-Type", "application/x-www-form-urlencoded") + .body(iox_http_util::bytes_to_request_body( + "db=mydb&q=SELECT+secret_field+FROM+t", + )) + .unwrap(); + let extracted = handler_for_extract_tests() + .extract_query_request(req) + .await + .expect("extract should succeed"); + // Sanity: the token actually got captured, otherwise we'd be + // proving redaction of a None field. + assert!( + extracted.token.is_some(), + "test fixture should populate a token via ?p=" + ); + let dbg = format!("{extracted:?}"); + assert!(!dbg.contains("supersecret-token-bytes"), "{dbg}"); + assert!(!dbg.contains("secret_field"), "{dbg}"); + // Database name is fine to display. + assert!(dbg.contains("mydb"), "{dbg}"); + } + + #[tokio::test] + async fn extract_query_request_empty_db_collapses_to_none() { + let req = RequestBuilder::new() + .method("GET") + .uri("http://h/query?db=&q=SELECT%201") + .body(empty_request_body()) + .unwrap(); + let extracted = handler_for_extract_tests() + .extract_query_request(req) + .await + .expect("extract should succeed"); + assert_eq!(extracted.requested_database(), None); + } } diff --git a/core/iox_v1_query_api/src/lib.rs b/core/iox_v1_query_api/src/lib.rs index e35a40838e7..402a5214dfc 100644 --- a/core/iox_v1_query_api/src/lib.rs +++ b/core/iox_v1_query_api/src/lib.rs @@ -12,7 +12,7 @@ use types::Statement; mod error; pub use error::HttpError; mod handler; -pub use handler::V1HttpHandler; +pub use handler::{ExtractedV1Request, V1HttpHandler}; mod response; mod types; mod value; diff --git a/core/service_grpc_flight/src/lib.rs b/core/service_grpc_flight/src/lib.rs index 36f728bc497..7fa62c02b84 100644 --- a/core/service_grpc_flight/src/lib.rs +++ b/core/service_grpc_flight/src/lib.rs @@ -116,7 +116,7 @@ const IOX_FLIGHT_PARTITIONS_RESPONSE_TRAILER: &str = "x-influxdata-partitions"; const IOX_FLIGHT_PARQUET_FILES_RESPONSE_TRAILER: &str = "x-influxdata-parquet-files"; /// Trailer that describes the peak memory usage of a query. -const IOX_FLIGHT_MAX_MEMORY_RESPONSE_TRAILER: &str = "x-influxdata-max-memory-bytes"; +pub const IOX_FLIGHT_MAX_MEMORY_RESPONSE_TRAILER: &str = "x-influxdata-max-memory-bytes"; /// Trailer that describes the latency to planning from ingester response. const IOX_FLIGHT_INGESTER_LATENCY_PLAN_RESPONSE_TRAILER: &str = diff --git a/deny.toml b/deny.toml index b9c79555427..e0643f09e57 100644 --- a/deny.toml +++ b/deny.toml @@ -29,6 +29,13 @@ ignore = [ # via datafusion-udf-wasm/wasmtime/rustls 0.22.x, with the same current # incompatibility on the available upstream fix path. "RUSTSEC-2026-0098", + # rustls-webpki CRL parsing reachable panic; only 0.103.13+ and 0.104.0-alpha.7+ + # ship the fix. The 0.103.x transitive is patched via the Cargo.toml pin bump, + # but the 0.102.x transitive is still stuck via datafusion-udf-wasm/wasmtime/ + # rustls 0.22.x with no patched 0.102.x release available. We don't configure + # CRLs anywhere in this repo, so the panic is unreachable in practice on that + # remaining 0.102.x path. + "RUSTSEC-2026-0104", # wasmtime 41.0.4 advisories (transitive dep via datafusion-udf-wasm) # @@ -55,6 +62,9 @@ ignore = [ # Guest realloc validation (single-component host-guest interaction; low risk given UDFs are disabled): "RUSTSEC-2026-0091", # GHSA-394w-hwhg-8vgm: OOB write from unvalidated guest realloc # + # Panic when allocating a table that is larger than host address space. + # The UDF/WASM parts are currently unused in production. + "RUSTSEC-2026-0114", # TODO: update wasmtime via datafusion-udf-wasm to a patched version (>=42.0.2) ] git-fetch-with-cli = true diff --git a/influxdb3/src/commands/serve.rs b/influxdb3/src/commands/serve.rs index 9cdbb31ac23..6bd4f113dd5 100644 --- a/influxdb3/src/commands/serve.rs +++ b/influxdb3/src/commands/serve.rs @@ -10,8 +10,8 @@ use influxdb3_cache::{ last_cache::{self, LastCacheProvider}, parquet_cache::create_cached_obj_store_and_oracle, }; -use influxdb3_catalog::{CatalogError, catalog::Catalog}; -use influxdb3_clap_blocks::plugins::{PackageManager, ProcessingEngineConfig}; +use influxdb3_catalog::{CatalogError, catalog::Catalog, log::PluginType}; +use influxdb3_clap_blocks::plugins::{PackageManager, PluginTriggerType, ProcessingEngineConfig}; use influxdb3_clap_blocks::{ datafusion::IoxQueryDatafusionConfig, memory_size::MemorySizeMb, object_store::ObjectStoreConfig, socket_addr::SocketAddr, tokio::TokioDatafusionConfig, @@ -1197,6 +1197,7 @@ pub async fn command(config: Config, user_params: HashMap) -> Re Arc::clone(&write_buffer), Arc::clone(&query_executor) as _, Arc::clone(&processing_engine), + restricted_plugin_trigger_types(&config.processing_engine_config), config.max_http_request_size, Arc::clone(&authorizer), )); @@ -1398,6 +1399,18 @@ pub(crate) fn setup_processing_engine_env_manager( } } +fn restricted_plugin_trigger_types(config: &ProcessingEngineConfig) -> Vec { + config + .restrict_plugin_triggers_to + .iter() + .map(|trigger_type| match trigger_type { + PluginTriggerType::Wal => PluginType::WalRows, + PluginTriggerType::Schedule => PluginType::Schedule, + PluginTriggerType::Request => PluginType::Request, + }) + .collect() +} + fn determine_package_manager() -> Arc { // Check for pip (highest preference) let python_exe = find_python(); diff --git a/influxdb3/src/commands/serve/cli_params.rs b/influxdb3/src/commands/serve/cli_params.rs index 95edd1bf60e..ea916f74a15 100644 --- a/influxdb3/src/commands/serve/cli_params.rs +++ b/influxdb3/src/commands/serve/cli_params.rs @@ -119,6 +119,7 @@ const NON_SENSITIVE_PARAMS: &[&str] = &[ "virtual-env-location", "package-manager", "plugin-repo", + "restrict-plugin-triggers-to", // Other parameters "tcp-listener-file-path", // Tokio console parameters diff --git a/influxdb3/src/help/serve.txt b/influxdb3/src/help/serve.txt index 5965ee77fc0..5917359115c 100644 --- a/influxdb3/src/help/serve.txt +++ b/influxdb3/src/help/serve.txt @@ -73,6 +73,10 @@ Examples --package-manager Python package manager [possible values: discover, pip, uv, disabled] [default: discover] [env: INFLUXDB3_PACKAGE_MANAGER=] + --restrict-plugin-triggers-to ... + Restrict plugin triggers to the provided trigger type(s) + [possible values: wal, schedule, request] + [env: INFLUXDB3_RESTRICT_PLUGIN_TRIGGERS_TO=] {} -h, --help Print help information diff --git a/influxdb3/src/help/serve_all.txt b/influxdb3/src/help/serve_all.txt index 42e46f0e056..2be7dd67c7f 100644 --- a/influxdb3/src/help/serve_all.txt +++ b/influxdb3/src/help/serve_all.txt @@ -153,6 +153,9 @@ Examples: [env: INFLUXDB3_PACKAGE_MANAGER=] --plugin-repo URL of plugin repository [env: INFLUXDB3_PLUGIN_REPO=] + --restrict-plugin-triggers-to ... Restrict plugin triggers to the provided trigger type(s) + [possible values: wal, schedule, request] + [env: INFLUXDB3_RESTRICT_PLUGIN_TRIGGERS_TO=] {} --gen1-duration Duration for Parquet file arrangement [default: 10m] diff --git a/influxdb3/tests/server/mod.rs b/influxdb3/tests/server/mod.rs index bab0b728624..40e13fccacc 100644 --- a/influxdb3/tests/server/mod.rs +++ b/influxdb3/tests/server/mod.rs @@ -39,6 +39,7 @@ mod limits; mod logs; mod packages; mod ping; +mod plugin_restriction; mod query; mod system_tables; mod write; @@ -95,6 +96,7 @@ pub struct TestConfig { plugin_dir: Option, virtual_env_dir: Option, package_manager: Option, + restrict_plugin_triggers_to: Vec, // If None, use memory object store. object_store_dir: Option, disable_authz: Vec, @@ -164,6 +166,15 @@ impl TestConfig { self } + pub fn with_restrict_plugin_triggers_to(mut self, types: I) -> Self + where + I: IntoIterator, + S: Into, + { + self.restrict_plugin_triggers_to = types.into_iter().map(Into::into).collect(); + self + } + // Set the object store dir for this [`TestServer`] pub fn with_object_store_dir>(mut self, object_store_dir: S) -> Self { self.object_store_dir = Some(object_store_dir.into()); @@ -273,6 +284,12 @@ impl ConfigProvider for TestConfig { package_manager.to_owned(), ]); } + if !self.restrict_plugin_triggers_to.is_empty() { + args.push("--restrict-plugin-triggers-to".to_string()); + for t in &self.restrict_plugin_triggers_to { + args.push(t.to_owned()); + } + } args.push("--node-id".to_string()); if let Some(host) = &self.node_id { args.push(host.to_owned()); diff --git a/influxdb3/tests/server/plugin_restriction.rs b/influxdb3/tests/server/plugin_restriction.rs new file mode 100644 index 00000000000..f5c56f025c4 --- /dev/null +++ b/influxdb3/tests/server/plugin_restriction.rs @@ -0,0 +1,206 @@ +use crate::server::{ConfigProvider, TestServer}; +use anyhow::Result; +use tempfile::TempDir; + +struct PluginTest { + plugin_dir: TempDir, + plugin_filename: &'static str, +} + +impl PluginTest { + fn new() -> Result { + let plugin_dir = TempDir::new()?; + let plugin_path = plugin_dir.path().join("test_plugin.py"); + std::fs::write( + &plugin_path, + b"def process_scheduled_call(influxdb3_local, schedule_time, args=None): pass\n", + )?; + Ok(Self { + plugin_dir, + plugin_filename: "test_plugin.py", + }) + } + + fn plugin_dir_str(&self) -> String { + self.plugin_dir.path().to_string_lossy().into_owned() + } +} + +#[test_log::test(tokio::test)] +async fn test_restrict_plugin_triggers_to_schedule_rejects_other_types() -> Result<()> { + let setup = PluginTest::new()?; + + let server = TestServer::configure() + .with_plugin_dir(setup.plugin_dir_str()) + .with_restrict_plugin_triggers_to(["schedule"]) + .spawn() + .await; + + server.create_database("testdb").run()?; + + // WAL trigger (all_tables) should be rejected + let err = server + .create_trigger("testdb", "wal_trigger", setup.plugin_filename, "all_tables") + .run() + .expect_err("WAL trigger should be rejected when only schedule is allowed"); + assert!( + err.to_string() + .contains("server is configured to reject WalRows"), + "Expected restriction error, got: {err}" + ); + + // WAL trigger (table:X) should also be rejected + let err = server + .create_trigger( + "testdb", + "wal_table_trigger", + setup.plugin_filename, + "table:cpu", + ) + .run() + .expect_err("Table WAL trigger should be rejected when only schedule is allowed"); + assert!( + err.to_string() + .contains("server is configured to reject WalRows"), + "Expected restriction error, got: {err}" + ); + + // Request trigger should be rejected + let err = server + .create_trigger( + "testdb", + "request_trigger", + setup.plugin_filename, + "request:mypath", + ) + .run() + .expect_err("Request trigger should be rejected when only schedule is allowed"); + assert!( + err.to_string() + .contains("server is configured to reject Request"), + "Expected restriction error, got: {err}" + ); + + // Schedule trigger (every:) should be allowed + server + .create_trigger( + "testdb", + "every_trigger", + setup.plugin_filename, + "every:10s", + ) + .disabled(true) + .run() + .expect("every:10s schedule trigger should be allowed"); + + // Schedule trigger (cron:) should also be allowed + server + .create_trigger( + "testdb", + "cron_trigger", + setup.plugin_filename, + "cron:* * * * * *", + ) + .disabled(true) + .run() + .expect("cron schedule trigger should be allowed"); + + Ok(()) +} + +#[test_log::test(tokio::test)] +async fn test_restrict_plugin_triggers_to_wal_rejects_other_types() -> Result<()> { + let setup = PluginTest::new()?; + + let server = TestServer::configure() + .with_plugin_dir(setup.plugin_dir_str()) + .with_restrict_plugin_triggers_to(["wal"]) + .spawn() + .await; + + server.create_database("testdb").run()?; + + // Schedule trigger should be rejected + let err = server + .create_trigger( + "testdb", + "schedule_trigger", + setup.plugin_filename, + "every:10s", + ) + .run() + .expect_err("Schedule trigger should be rejected when only WAL is allowed"); + assert!( + err.to_string() + .contains("server is configured to reject Schedule"), + "Expected restriction error, got: {err}" + ); + + // Request trigger should be rejected + let err = server + .create_trigger( + "testdb", + "request_trigger", + setup.plugin_filename, + "request:mypath", + ) + .run() + .expect_err("Request trigger should be rejected when only WAL is allowed"); + assert!( + err.to_string() + .contains("server is configured to reject Request"), + "Expected restriction error, got: {err}" + ); + + // WAL trigger should be allowed + server + .create_trigger("testdb", "wal_trigger", setup.plugin_filename, "all_tables") + .disabled(true) + .run() + .expect("WAL trigger should be allowed"); + + Ok(()) +} + +#[test_log::test(tokio::test)] +async fn test_no_plugin_trigger_restriction_allows_all_types() -> Result<()> { + let setup = PluginTest::new()?; + + let server = TestServer::configure() + .with_plugin_dir(setup.plugin_dir_str()) + .spawn() + .await; + + server.create_database("testdb").run()?; + + // All trigger types should be allowed with no restriction + server + .create_trigger("testdb", "wal_trigger", setup.plugin_filename, "all_tables") + .disabled(true) + .run() + .expect("WAL trigger should be allowed with no restriction"); + + server + .create_trigger( + "testdb", + "schedule_trigger", + setup.plugin_filename, + "every:10s", + ) + .disabled(true) + .run() + .expect("Schedule trigger should be allowed with no restriction"); + + server + .create_trigger( + "testdb", + "request_trigger", + setup.plugin_filename, + "request:mypath", + ) + .disabled(true) + .run() + .expect("Request trigger should be allowed with no restriction"); + + Ok(()) +} diff --git a/influxdb3_clap_blocks/src/plugins.rs b/influxdb3_clap_blocks/src/plugins.rs index 2af07d17e6a..08b51db8093 100644 --- a/influxdb3_clap_blocks/src/plugins.rs +++ b/influxdb3_clap_blocks/src/plugins.rs @@ -18,6 +18,23 @@ pub struct ProcessingEngineConfig { pub package_manager: PackageManager, #[clap(long = "plugin-repo", env = "INFLUXDB3_PLUGIN_REPO")] pub plugin_repo: Option, + /// Restrict plugin triggers to the provided trigger type(s). + #[clap( + long = "restrict-plugin-triggers-to", + env = "INFLUXDB3_RESTRICT_PLUGIN_TRIGGERS_TO", + value_enum, + value_delimiter = ',', + num_args = 1.. + )] + pub restrict_plugin_triggers_to: Vec, +} + +#[derive(Debug, Clone, Copy, Eq, PartialEq, clap::ValueEnum)] +pub enum PluginTriggerType { + #[value(alias = "wal_flush")] + Wal, + Schedule, + Request, } #[derive(Debug, Clone, Copy, Default, clap::ValueEnum)] diff --git a/influxdb3_commands/src/helpers.rs b/influxdb3_commands/src/helpers.rs index bbf8bcaaded..60379096173 100644 --- a/influxdb3_commands/src/helpers.rs +++ b/influxdb3_commands/src/helpers.rs @@ -5,9 +5,8 @@ use std::{str::FromStr, sync::OnceLock}; use influxdb3_server::all_paths; use observability_deps::tracing::trace; -const DISABLED_AUTHZ_TOO_MANY_VALUES_ERR: &str = "--disable-authz cannot take more than 3 items"; -const DISABLED_AUTHZ_INVALID_VALUE_ERR: &str = - "invalid value passed in for --disable-authz, allowed values are health, ping, and metrics"; +const DISABLED_AUTHZ_TOO_MANY_VALUES_ERR: &str = "--disable-authz cannot take more than 4 items"; +const DISABLED_AUTHZ_INVALID_VALUE_ERR: &str = "invalid value passed in for --disable-authz, allowed values are health, ping, metrics, and ready"; static AUTHZ_DISABLED_RESOURCES: OnceLock> = OnceLock::new(); @@ -41,12 +40,12 @@ impl FromStr for DisableAuthzList { fn from_str(s: &str) -> std::result::Result { let resources: Vec = s.split(',').map(|s| s.trim().to_string()).collect(); - if resources.len() > 3 { + if resources.len() > 4 { return Err(DISABLED_AUTHZ_TOO_MANY_VALUES_ERR); } for r in &resources { - if !["health", "ping", "metrics"] + if !["health", "ping", "metrics", "ready"] .iter() .any(|resource| resource == r) { @@ -68,6 +67,10 @@ impl FromStr for DisableAuthzList { if path == "metrics" { return vec![all_paths::API_METRICS]; } + + if path == "ready" { + return vec![all_paths::API_V3_READY]; + } vec![] }) .collect(); diff --git a/influxdb3_commands/src/helpers/tests.rs b/influxdb3_commands/src/helpers/tests.rs index 039cc1db3b0..52ccdf8d8a8 100644 --- a/influxdb3_commands/src/helpers/tests.rs +++ b/influxdb3_commands/src/helpers/tests.rs @@ -4,15 +4,19 @@ use crate::helpers::{DISABLED_AUTHZ_INVALID_VALUE_ERR, DISABLED_AUTHZ_TOO_MANY_V use super::DisableAuthzList; +// AUTHZ_DISABLED_RESOURCES is a OnceLock whose state persists for the lifetime of +// the test process. Successful-parse tests must run in a deterministic order -- +// we combine the coverage into a single test to avoid racing on the global state. #[test] fn test_parses_disabled_authz() { - let list: DisableAuthzList = "health,ping,metrics".parse().expect("parseable"); + let list: DisableAuthzList = "health,ping,metrics,ready".parse().expect("parseable"); let all_mapped = list.get_mapped_endpoints(); - assert_eq!(4, all_mapped.len()); + assert_eq!(5, all_mapped.len()); assert_eq!(*all_mapped.first().unwrap(), all_paths::API_V3_HEALTH); assert_eq!(*all_mapped.get(1).unwrap(), all_paths::API_V1_HEALTH); assert_eq!(*all_mapped.get(2).unwrap(), all_paths::API_PING); assert_eq!(*all_mapped.get(3).unwrap(), all_paths::API_METRICS); + assert_eq!(*all_mapped.get(4).unwrap(), all_paths::API_V3_READY); } #[test] @@ -33,7 +37,7 @@ fn test_fails_to_parse_disabled_authz_list_too_many_values_err() { #[test] fn test_fails_to_parse_disabled_authz_list_too_many_allowed_values_err() { - let list = "health,metrics,ping,health" + let list = "health,metrics,ping,health,ready" .parse::() .unwrap_err(); assert_eq!(list, DISABLED_AUTHZ_TOO_MANY_VALUES_ERR); diff --git a/influxdb3_server/Cargo.toml b/influxdb3_server/Cargo.toml index 2f51aa96e3e..378a8b5e0a2 100644 --- a/influxdb3_server/Cargo.toml +++ b/influxdb3_server/Cargo.toml @@ -26,6 +26,7 @@ iox_time = { path = "../core/iox_time" } iox_v1_query_api = { path = "../core/iox_v1_query_api" } metric = { path = "../core/metric" } metric_exporters = { path = "../core/metric_exporters" } +object_store_utils = { path = "../object_store_utils" } observability_deps = { path = "../core/observability_deps" } schema = { path = "../core/schema" } service_grpc_flight = { path = "../core/service_grpc_flight" } @@ -50,6 +51,7 @@ influxdb3_write = { path = "../influxdb3_write" } # crates.io Dependencies anyhow.workspace = true arrow.workspace = true +async-trait.workspace = true arrow-csv.workspace = true arrow-flight.workspace = true arrow-json.workspace = true diff --git a/influxdb3_server/src/all_paths.rs b/influxdb3_server/src/all_paths.rs index a0e0ff3fbc3..b526f901508 100644 --- a/influxdb3_server/src/all_paths.rs +++ b/influxdb3_server/src/all_paths.rs @@ -6,6 +6,7 @@ pub(crate) const API_V3_QUERY_INFLUXQL: &str = "/api/v3/query_influxql"; pub(crate) const API_V1_QUERY: &str = "/query"; pub const API_V3_HEALTH: &str = "/health"; pub const API_V1_HEALTH: &str = "/api/v1/health"; +pub const API_V3_READY: &str = "/ready"; pub(crate) const API_V3_ENGINE: &str = "/api/v3/engine/"; pub(crate) const API_V3_CONFIGURE_DISTINCT_CACHE: &str = "/api/v3/configure/distinct_cache"; pub(crate) const API_V3_CONFIGURE_LAST_CACHE: &str = "/api/v3/configure/last_cache"; diff --git a/influxdb3_server/src/http.rs b/influxdb3_server/src/http.rs index 0fd3b568751..0a8ae7a9168 100644 --- a/influxdb3_server/src/http.rs +++ b/influxdb3_server/src/http.rs @@ -33,7 +33,7 @@ use influxdb3_cache::distinct_cache; use influxdb3_cache::last_cache; use influxdb3_catalog::CatalogError; use influxdb3_catalog::catalog::HardDeletionTime; -use influxdb3_catalog::log::FieldDataType; +use influxdb3_catalog::log::{FieldDataType, PluginType, TriggerSpecificationDefinition}; use influxdb3_id::TokenId; use influxdb3_internal_api::query_executor::{ QueryExecutor, QueryExecutorError, ShowDatabases, ShowRetentionPolicies, @@ -382,6 +382,9 @@ pub enum Error { #[error("Current node mode does not use the processing engine")] NoProcessingEngine, + #[error("invalid request: {0}")] + InvalidRequest(String), + #[error(transparent)] LegacyWriteParse(#[from] WriteParseError), } @@ -926,6 +929,10 @@ impl IntoResponse for Error { .status(StatusCode::METHOD_NOT_ALLOWED) .body(bytes_to_response_body(self.to_string())) .unwrap(), + Self::InvalidRequest(_) => ResponseBuilder::new() + .status(StatusCode::BAD_REQUEST) + .body(bytes_to_response_body(self.to_string())) + .unwrap(), Self::MissingDb(_) | Self::MissingTable(_) | Self::NoHandler => ResponseBuilder::new() .status(StatusCode::NOT_FOUND) .body(bytes_to_response_body(self.to_string())) @@ -985,6 +992,7 @@ pub struct HttpApi { common_state: CommonServerState, write_buffer: Arc, processing_engine: Arc, + allowed_plugin_trigger_types: Vec, time_provider: Arc, pub(crate) query_executor: Arc, max_request_bytes: usize, @@ -993,12 +1001,14 @@ pub struct HttpApi { } impl HttpApi { + #[allow(clippy::too_many_arguments)] pub fn new( common_state: CommonServerState, time_provider: Arc, write_buffer: Arc, query_executor: Arc, processing_engine: Arc, + allowed_plugin_trigger_types: Vec, max_request_bytes: usize, authorizer: Arc, ) -> Self { @@ -1015,6 +1025,7 @@ impl HttpApi { authorizer, legacy_write_param_unifier, processing_engine, + allowed_plugin_trigger_types, } } } @@ -1545,6 +1556,10 @@ impl HttpApi { self.read_body_json(req).await? }; debug!(%db, %plugin_filename, %trigger_name, %trigger_specification, %disabled, "configure_processing_engine_trigger"); + validate_restricted_plugin_trigger_specification( + &trigger_specification, + &self.allowed_plugin_trigger_types, + )?; let plugin_filename = self .processing_engine .validate_plugin_filename(&plugin_filename) @@ -2867,5 +2882,24 @@ pub(crate) async fn route_admin_token_recovery_request( Ok(response) } +fn validate_restricted_plugin_trigger_specification( + trigger_specification: &str, + allowed_plugin_trigger_types: &[PluginType], +) -> Result<()> { + if allowed_plugin_trigger_types.is_empty() { + return Ok(()); + } + + let trigger = TriggerSpecificationDefinition::from_string_rep(trigger_specification)?; + let plugin_type = trigger.plugin_type(); + if allowed_plugin_trigger_types.contains(&plugin_type) { + Ok(()) + } else { + Err(Error::InvalidRequest(format!( + "server is configured to reject {plugin_type} plugin triggers" + ))) + } +} + #[cfg(test)] mod tests; diff --git a/influxdb3_server/src/http/tests.rs b/influxdb3_server/src/http/tests.rs index 6bbd42e9b29..00e04c75659 100644 --- a/influxdb3_server/src/http/tests.rs +++ b/influxdb3_server/src/http/tests.rs @@ -2,8 +2,8 @@ use http::{HeaderMap, HeaderValue, StatusCode, header::ACCEPT}; use http::{Request, Uri}; use super::{ - MAXIMUM_DATABASE_NAME_LENGTH, extract_client_ip, extract_db_from_query_param, - truncate_for_logging, + MAXIMUM_DATABASE_NAME_LENGTH, PluginType, extract_client_ip, extract_db_from_query_param, + truncate_for_logging, validate_restricted_plugin_trigger_specification, }; use crate::http::{AuthenticationError, Error}; @@ -71,6 +71,43 @@ fn test_validate_db_name() { assert_validate_db_name!("", Err(ValidateDbNameError::Empty)); } +#[test] +fn test_validate_restricted_plugin_trigger_specification() { + assert!(validate_restricted_plugin_trigger_specification("all_tables", &[]).is_ok()); + + let schedule_only = [PluginType::Schedule]; + assert!( + validate_restricted_plugin_trigger_specification("cron:* * * * * *", &schedule_only) + .is_ok() + ); + assert!(validate_restricted_plugin_trigger_specification("every:10s", &schedule_only).is_ok()); + + assert!(matches!( + validate_restricted_plugin_trigger_specification("all_tables", &schedule_only), + Err(Error::InvalidRequest(_)) + )); + assert!(matches!( + validate_restricted_plugin_trigger_specification("table:cpu", &schedule_only), + Err(Error::InvalidRequest(_)) + )); + assert!(matches!( + validate_restricted_plugin_trigger_specification("request:foo", &schedule_only), + Err(Error::InvalidRequest(_)) + )); + + let wal_and_request = [PluginType::WalRows, PluginType::Request]; + assert!( + validate_restricted_plugin_trigger_specification("table:cpu", &wal_and_request).is_ok() + ); + assert!( + validate_restricted_plugin_trigger_specification("request:foo", &wal_and_request).is_ok() + ); + assert!(matches!( + validate_restricted_plugin_trigger_specification("every:10s", &wal_and_request), + Err(Error::InvalidRequest(_)) + )); +} + #[tokio::test] async fn test_json_output_empty() { // Turn RecordBatches into a Body and then collect into Bytes to assert diff --git a/influxdb3_server/src/tests.rs b/influxdb3_server/src/tests.rs index b89d91b81be..7911f3f3b43 100644 --- a/influxdb3_server/src/tests.rs +++ b/influxdb3_server/src/tests.rs @@ -1445,6 +1445,7 @@ async fn setup_server(start_time: i64) -> (String, CancellationToken, Arc(&self, serializer: S) -> std::result::Result + where + S: Serializer, + { + use serde::ser::SerializeStruct; + let mut state = serializer.serialize_struct("WriteLineError", 3)?; + const TRUNCATE_LIMIT: usize = 20; + let truncated_line = + &self.original_line[..self.original_line.floor_char_boundary(TRUNCATE_LIMIT)]; + state.serialize_field("error_message", &self.error_message)?; + state.serialize_field("line_number", &self.line_number)?; + state.serialize_field("original_line", truncated_line)?; + state.end() + } +} + #[cfg(test)] mod tests; diff --git a/influxdb3_types/src/write/tests.rs b/influxdb3_types/src/write/tests.rs index 37bf93f6b9e..d7145211fc3 100644 --- a/influxdb3_types/src/write/tests.rs +++ b/influxdb3_types/src/write/tests.rs @@ -261,6 +261,45 @@ fn test_to_nanos_explicit_all_precisions() { ); } +#[test] +fn test_write_line_error_serialize_truncates_at_utf8_boundary() { + // Serialize caps `original_line` at this many bytes (mirrors the constant in the impl). + const TRUNCATE_LIMIT: usize = 20; + + // 19 ASCII bytes followed by a 2-byte 'é' starting at byte 19. A naive `[..TRUNCATE_LIMIT]` + // slice lands in the middle of 'é' and would panic; floor_char_boundary backs up to byte 19. + let long_line = format!("{}érest of the line", "a".repeat(19)); + assert!( + !long_line.is_char_boundary(TRUNCATE_LIMIT), + "test premise: byte {TRUNCATE_LIMIT} must fall inside a multi-byte char", + ); + + let err = WriteLineError { + original_line: long_line, + line_number: 1, + error_message: "bad line".to_string(), + }; + let json = serde_json::to_string(&err).expect("serialize must not panic"); + let parsed: serde_json::Value = serde_json::from_str(&json).unwrap(); + let emitted = parsed["original_line"] + .as_str() + .expect("original_line must serialize as a JSON string"); + + assert_eq!(emitted, "a".repeat(19)); + + // Short input passes through unchanged. + let short = "short line"; + assert!(short.len() < TRUNCATE_LIMIT); + let err = WriteLineError { + original_line: short.to_string(), + line_number: 2, + error_message: "bad".to_string(), + }; + let json = serde_json::to_string(&err).unwrap(); + let parsed: serde_json::Value = serde_json::from_str(&json).unwrap(); + assert_eq!(parsed["original_line"].as_str().unwrap(), short); +} + #[test] fn test_to_nanos_default_truncation() { assert_eq!( diff --git a/influxdb3_write/src/lib.rs b/influxdb3_write/src/lib.rs index 0c08f47e2cd..ceb4afd2781 100644 --- a/influxdb3_write/src/lib.rs +++ b/influxdb3_write/src/lib.rs @@ -37,7 +37,7 @@ use iox_query::QueryChunk; use iox_time::Time; use observability_deps::tracing::debug; use schema::TIME_COLUMN_NAME; -use serde::{Deserialize, Serialize, Serializer}; +use serde::{Deserialize, Serialize}; use std::{fmt::Debug, sync::Arc}; use thiserror::Error; @@ -81,7 +81,7 @@ pub trait Bufferer: Debug + Send + Sync + 'static { /// Returns the database schema provider fn catalog(&self) -> Arc; - /// Reutrns the WAL this bufferer is using + /// Returns the WAL this bufferer is using fn wal(&self) -> Arc; /// Returns the parquet files for a given database and table @@ -130,34 +130,7 @@ pub trait LastCacheManager: Debug + Send + Sync + 'static { fn last_cache_provider(&self) -> Arc; } -/// A single write request can have many lines in it. A writer can request to accept all lines that are valid, while -/// returning an error for any invalid lines. This is the error information for a single invalid line. -#[derive(Debug)] -pub struct WriteLineError { - pub original_line: String, - pub line_number: usize, - pub error_message: String, -} - -impl Serialize for WriteLineError { - fn serialize(&self, serializer: S) -> std::result::Result - where - S: Serializer, - { - use serde::ser::SerializeStruct; - let mut state = serializer.serialize_struct("WriteLineError", 3)?; - const TRUNCATE_LIMIT: usize = 20; - let truncated_line = if self.original_line.len() > TRUNCATE_LIMIT { - &self.original_line[..TRUNCATE_LIMIT] - } else { - &self.original_line - }; - state.serialize_field("error_message", &self.error_message)?; - state.serialize_field("line_number", &self.line_number)?; - state.serialize_field("original_line", truncated_line)?; - state.end() - } -} +pub use influxdb3_types::write::WriteLineError; /// A write that has been validated against the catalog schema, written to the WAL (if configured), and buffered in /// memory. This is the summary information for the write along with any errors that were encountered. diff --git a/install_influxdb.sh b/install_influxdb.sh index 3739ed9f85d..ef12c2ea8cd 100755 --- a/install_influxdb.sh +++ b/install_influxdb.sh @@ -1437,8 +1437,11 @@ else STARTUP_CHOICE=${STARTUP_CHOICE:-1} case "$STARTUP_CHOICE" in - 1) + 1|*) # Quick Start - use defaults and check for existing license + if [ "$STARTUP_CHOICE" != "1" ]; then + printf "Invalid choice. Using Quick Start (option 1).\n" + fi setup_quick_start_defaults enterprise setup_license_for_quick_start START_SERVICE="y" @@ -1451,12 +1454,6 @@ else # Skip startup START_SERVICE="n" ;; - *) - printf "Invalid choice. Using Quick Start (option 1).\n" - setup_quick_start_defaults enterprise - setup_license_for_quick_start - START_SERVICE="y" - ;; esac if [ "$START_SERVICE" = "y" ] && [ "$STARTUP_CHOICE" = "1" ]; then diff --git a/object_store_utils/Cargo.toml b/object_store_utils/Cargo.toml index 519411b97f0..9079a62cfa0 100644 --- a/object_store_utils/Cargo.toml +++ b/object_store_utils/Cargo.toml @@ -10,6 +10,7 @@ test-helpers = ["tokio"] [dependencies] # core deps +iox_time = { path = "../core/iox_time" } observability_deps = { path = "../core/observability_deps" } # crates.io deps diff --git a/object_store_utils/src/lib.rs b/object_store_utils/src/lib.rs index b1e05956af5..54ee3798719 100644 --- a/object_store_utils/src/lib.rs +++ b/object_store_utils/src/lib.rs @@ -7,6 +7,12 @@ pub use retryable_object_store::set_default_retry_params; mod adaptive_put; pub use adaptive_put::{AdaptivePutExt, DEFAULT_MULTIPART_CHUNK_SIZE}; +mod object_store_health; +pub use object_store_health::{ErrorCategory, ObjectStoreHealth}; + +mod observed_object_store; +pub use observed_object_store::ObservedObjectStore; + #[cfg(any(feature = "test-helpers", test))] mod test_object_store; #[cfg(any(feature = "test-helpers", test))] diff --git a/object_store_utils/src/object_store_health.rs b/object_store_utils/src/object_store_health.rs new file mode 100644 index 00000000000..7c1db639462 --- /dev/null +++ b/object_store_utils/src/object_store_health.rs @@ -0,0 +1,198 @@ +//! Shared state for tracking object store health, used by the `/ready` endpoint. +//! +//! [`ObjectStoreHealth`] is intended to be shared between writers (an +//! `ObservedObjectStore` wrapper and a startup probe) and a reader (the +//! `/ready` HTTP handler). Reads must be lock-free since the handler may be +//! polled frequently. + +use iox_time::Time; +use std::sync::Arc; +use std::sync::atomic::{AtomicI64, AtomicU64, Ordering}; + +/// Operator-friendly classification of an [`object_store::Error`]. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[repr(u8)] +pub enum ErrorCategory { + AuthFailure = 1, + NotFound = 2, + Timeout = 3, + ConfigError = 4, + Unknown = 5, +} + +impl ErrorCategory { + pub fn as_str(self) -> &'static str { + match self { + Self::AuthFailure => "auth_failure", + Self::NotFound => "not_found", + Self::Timeout => "timeout", + Self::ConfigError => "config_error", + Self::Unknown => "unknown", + } + } + + pub fn hint(self) -> &'static str { + match self { + Self::AuthFailure => { + "object store returned access denied; verify credentials and IAM policy" + } + Self::NotFound => { + "object store target path was not found; verify bucket and prefix configuration" + } + Self::Timeout => { + "object store request timed out; verify network connectivity to the endpoint" + } + Self::ConfigError => { + "object store is misconfigured; check --object-store and related flags" + } + Self::Unknown => "object store request failed; check logs for details", + } + } + + /// Classify an [`object_store::Error`] into an operator-friendly category. + /// + /// Note: this does **not** return [`ErrorCategory::Timeout`]. The + /// `object_store` crate has no dedicated timeout variant; timeouts surface + /// as `Error::Generic` with the timeout wrapped inside an opaque source, + /// and walking that chain to reliably detect timeouts is fragile across + /// crate versions. Runtime operation timeouts are therefore categorized as + /// [`ErrorCategory::Unknown`] by this function. + /// + /// [`ErrorCategory::Timeout`] is reserved for callers that know a timeout + /// occurred because they applied an explicit `tokio::time::timeout` + /// wrapper themselves (e.g. the startup probe in `ready_probe.rs`). Such + /// callers should set the category directly via + /// [`ObjectStoreHealth::record_error`] rather than relying on this + /// function. + pub fn categorize(err: &object_store::Error) -> Self { + use object_store::Error; + match err { + Error::PermissionDenied { .. } | Error::Unauthenticated { .. } => Self::AuthFailure, + Error::NotFound { .. } => Self::NotFound, + Error::NotSupported { .. } | Error::NotImplemented | Error::InvalidPath { .. } => { + Self::ConfigError + } + _ => Self::Unknown, + } + } +} + +/// Lock-free shared state recording the last successful and last failed object +/// store operation. +/// +/// `last_success_at` is nanoseconds since the Unix epoch in an [`AtomicI64`], +/// using `0` as the unset sentinel. +/// +/// The last-error state (timestamp + category) is packed into a single +/// [`AtomicU64`] rather than two separate atomics. Packing ensures readers +/// always see a consistent `(timestamp, category)` pair: with two atomics and +/// `Relaxed` ordering, a reader could observe a new timestamp alongside an old +/// or zero category (or vice versa), which would render as a timestamp without +/// a category in `/ready` responses. Using a single atomic eliminates that +/// window without needing `Acquire`/`Release` ordering or a lock. +/// +/// Packing layout (64 bits): +/// - bits 63..56: [`ErrorCategory`] discriminant (0 = unset) +/// - bits 55..0: nanoseconds since the Unix epoch (truncated to 56 bits) +/// +/// 56 bits of nanos covers through roughly year 2285, which is plenty for +/// practical server runtimes. We assume timestamps are non-negative; a +/// negative timestamp would alias into the category byte when masked. In +/// practice `TimeProvider::now()` returns non-negative values, so this does +/// not arise. +/// +/// The packed value `0` is reserved for "no error recorded" (category 0 is +/// not a valid discriminant). +#[derive(Debug, Default)] +pub struct ObjectStoreHealth { + last_success_at: AtomicI64, + last_error: AtomicU64, +} + +const ERROR_TIMESTAMP_MASK: u64 = 0x00FF_FFFF_FFFF_FFFF; + +impl ObjectStoreHealth { + /// Construct a fresh, shared [`ObjectStoreHealth`]. All fields start + /// unset; [`is_ok`](Self::is_ok) returns `false` until the first success + /// is recorded. + pub fn new() -> Arc { + Arc::new(Self::default()) + } + + /// Record that an object store operation succeeded at `at`. Overwrites + /// any previously recorded success timestamp. Does not clear prior error + /// state; [`last_error_at`](Self::last_error_at) keeps its value so + /// operators retain historical context after recovery. Use + /// [`is_ok`](Self::is_ok) or compare timestamps to determine current + /// health. + pub fn record_success(&self, at: Time) { + self.last_success_at + .store(at.timestamp_nanos(), Ordering::Relaxed); + } + + /// Record that an object store operation failed at `at` with the given + /// category. Writes atomically -- readers never observe a new timestamp + /// with an old category (or vice versa). Overwrites any previously + /// recorded error. + pub fn record_error(&self, at: Time, category: ErrorCategory) { + let nanos = at.timestamp_nanos() as u64 & ERROR_TIMESTAMP_MASK; + let packed = ((category as u8 as u64) << 56) | nanos; + self.last_error.store(packed, Ordering::Relaxed); + } + + /// Return the timestamp of the most recent recorded success, or `None` + /// if no success has ever been recorded. + pub fn last_success_at(&self) -> Option