diff --git a/Cargo.lock b/Cargo.lock index e368dcf9a..c4cd4cbd4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -50,6 +50,15 @@ dependencies = [ "core_extensions", ] +[[package]] +name = "addr2line" +version = "0.25.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b5d307320b3181d6d7954e663bd7c774a838b8220fe0593c86d9fb09f498b4b" +dependencies = [ + "gimli", +] + [[package]] name = "adler2" version = "2.0.1" @@ -956,6 +965,21 @@ dependencies = [ "tower-service", ] +[[package]] +name = "backtrace" +version = "0.3.76" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb531853791a215d7c62a30daf0dde835f381ab5de4589cfe7c649d2cbe92bd6" +dependencies = [ + "addr2line", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", + "windows-link 0.2.0", +] + [[package]] name = "base64" version = "0.21.7" @@ -1224,6 +1248,12 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "bytemuck" +version = "1.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fbdf580320f38b612e485521afda1ee26d10cc9884efaaa750d383e13e3c5f4" + [[package]] name = "byteorder" version = "1.5.0" @@ -2139,15 +2169,18 @@ dependencies = [ "datafusion-proto", "env_logger", "futures", + "jemalloc_pprof", "log", "mimalloc", "nix", "object_store", + "parquet", "prost 0.13.5", "rand 0.9.2", "serde_json", "tempfile", "test-utils", + "tikv-jemallocator", "tokio", "tonic", "tracing", @@ -3215,6 +3248,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "gimli" +version = "0.32.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e629b9b98ef3dd8afe6ca2bd0f89306cec16d43d907889945bc5d6687f2f13c7" + [[package]] name = "glob" version = "0.3.3" @@ -3706,6 +3745,28 @@ version = "2.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f4c7245a08504955605670dbf141fceab975f15ca21570696aebe9d2e71576bd" +[[package]] +name = "inferno" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e96d2465363ed2d81857759fc864cf6bb7997f79327aec028d65bd7989393685" +dependencies = [ + "ahash 0.8.12", + "clap 4.5.48", + "crossbeam-channel", + "crossbeam-utils", + "dashmap", + "env_logger", + "indexmap 2.12.0", + "itoa", + "log", + "num-format", + "once_cell", + "quick-xml 0.37.5", + "rgb", + "str_stack", +] + [[package]] name = "insta" version = "1.43.2" @@ -3804,6 +3865,23 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" +[[package]] +name = "jemalloc_pprof" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74ff642505c7ce8d31c0d43ec0e235c6fd4585d9b8172d8f9dd04d36590200b5" +dependencies = [ + "anyhow", + "libc", + "mappings", + "once_cell", + "pprof_util", + "tempfile", + "tikv-jemalloc-ctl", + "tokio", + "tracing", +] + [[package]] name = "jiff" version = "0.2.15" @@ -4046,6 +4124,19 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "mappings" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db4d277bb50d4508057e7bddd7fcd19ef4a4cc38051b6a5a36868d75ae2cbeb9" +dependencies = [ + "anyhow", + "libc", + "once_cell", + "pprof_util", + "tracing", +] + [[package]] name = "matchit" version = "0.8.4" @@ -4224,6 +4315,16 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" +[[package]] +name = "num-format" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a652d9771a63711fd3c3deb670acfbe5c30a4072e664d7a3bf5a9e1056ac72c3" +dependencies = [ + "arrayvec", + "itoa", +] + [[package]] name = "num-integer" version = "0.1.46" @@ -4284,6 +4385,15 @@ dependencies = [ "objc2-core-foundation", ] +[[package]] +name = "object" +version = "0.37.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff76201f031d8863c38aa7f905eca4f53abbfa15f609db4277d44cd8938f33fe" +dependencies = [ + "memchr", +] + [[package]] name = "object_store" version = "0.12.4" @@ -4304,7 +4414,7 @@ dependencies = [ "md-5", "parking_lot", "percent-encoding", - "quick-xml", + "quick-xml 0.38.3", "rand 0.9.2", "reqwest", "ring", @@ -4716,6 +4826,21 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" +[[package]] +name = "pprof_util" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9aba4251d95ac86f14c33e688d57a9344bfcff29e9b0c5a063fc66b5facc8a1" +dependencies = [ + "anyhow", + "backtrace", + "flate2", + "inferno", + "num", + "paste", + "prost 0.13.5", +] + [[package]] name = "ppv-lite86" version = "0.2.21" @@ -4997,6 +5122,15 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a651516ddc9168ebd67b24afd085a718be02f8858fe406591b013d101ce2f40" +[[package]] +name = "quick-xml" +version = "0.37.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "331e97a1af0bf59823e6eadffe373d7b27f485be8748f71471c662c1f269b7fb" +dependencies = [ + "memchr", +] + [[package]] name = "quick-xml" version = "0.38.3" @@ -5362,6 +5496,15 @@ dependencies = [ "web-sys", ] +[[package]] +name = "rgb" +version = "0.8.52" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c6a884d2998352bb4daf0183589aec883f16a6da1f4dde84d8e2e9a5409a1ce" +dependencies = [ + "bytemuck", +] + [[package]] name = "ring" version = "0.17.14" @@ -5463,6 +5606,12 @@ dependencies = [ "serde_json", ] +[[package]] +name = "rustc-demangle" +version = "0.1.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56f7d92ca342cea22a06f2121d944b4fd82af56988c270852495420f961d4ace" + [[package]] name = "rustc-hash" version = "2.1.1" @@ -6036,6 +6185,12 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "str_stack" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9091b6114800a5f2141aee1d1b9d6ca3592ac062dc5decb3764ec5895a47b4eb" + [[package]] name = "stringprep" version = "0.1.5" @@ -6358,6 +6513,37 @@ dependencies = [ "ordered-float", ] +[[package]] +name = "tikv-jemalloc-ctl" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "661f1f6a57b3a36dc9174a2c10f19513b4866816e13425d3e418b11cc37bc24c" +dependencies = [ + "libc", + "paste", + "tikv-jemalloc-sys", +] + +[[package]] +name = "tikv-jemalloc-sys" +version = "0.6.1+5.3.0-1-ge13ca993e8ccb9ba9847cc330696e02839f328f7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd8aa5b2ab86a2cefa406d889139c162cbb230092f7d1d7cbc1716405d852a3b" +dependencies = [ + "cc", + "libc", +] + +[[package]] +name = "tikv-jemallocator" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0359b4327f954e0567e69fb191cf1436617748813819c94b8cd4a431422d053a" +dependencies = [ + "libc", + "tikv-jemalloc-sys", +] + [[package]] name = "time" version = "0.3.44" diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml index 68bb5376a..0419c7e5c 100644 --- a/datafusion-examples/Cargo.toml +++ b/datafusion-examples/Cargo.toml @@ -56,6 +56,10 @@ path = "examples/external_dependency/query-aws-s3.rs" name = "custom_file_casts" path = "examples/custom_file_casts.rs" +[[example]] +name = "proto_memory" +path = "examples/proto_memory.rs" + [dev-dependencies] arrow = { workspace = true } # arrow_schema is required for record_batch! macro :sad: @@ -75,6 +79,7 @@ futures = { workspace = true } log = { workspace = true } mimalloc = { version = "0.1", default-features = false } object_store = { workspace = true, features = ["aws", "http"] } +parquet = { workspace = true } prost = { workspace = true } rand = { workspace = true } serde_json = { workspace = true } @@ -89,3 +94,7 @@ uuid = "1.18" [target.'cfg(not(target_os = "windows"))'.dev-dependencies] nix = { version = "0.30.1", features = ["fs"] } + +[target.'cfg(not(target_env = "msvc"))'.dependencies] +tikv-jemallocator = { version = "0.6.0", features = ["profiling", "unprefixed_malloc_on_supported_platforms"] } +jemalloc_pprof = { version = "0.8.1", features = ["symbolize", "flamegraph"] } \ No newline at end of file diff --git a/datafusion-examples/examples/proto_memory.rs b/datafusion-examples/examples/proto_memory.rs new file mode 100644 index 000000000..e5e30f7ef --- /dev/null +++ b/datafusion-examples/examples/proto_memory.rs @@ -0,0 +1,44 @@ +use datafusion::{common::Result, prelude::*}; +use datafusion_proto::bytes::{physical_plan_from_bytes, physical_plan_to_bytes}; +use parquet::{arrow::ArrowWriter, file::properties::WriterProperties}; + + +#[cfg(not(target_env = "msvc"))] +#[global_allocator] +static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; + +#[allow(non_upper_case_globals)] +#[export_name = "malloc_conf"] +pub static malloc_conf: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:19\0"; + + +#[tokio::main] +async fn main() -> Result<()> { + let mut prof_ctl = jemalloc_pprof::PROF_CTL.as_ref().unwrap().lock().await; + prof_ctl.activate().unwrap(); + + let _plan = { + let ctx = SessionContext::new(); + let batches = ctx.sql("SELECT c FROM generate_series(1, 1000000) t(c)").await?.collect().await?; + let file = std::fs::File::create("test.parquet")?; + let props = WriterProperties::builder() + // limit batch sizes so that we have useful statistics + .set_max_row_group_size(4096) + .build(); + let mut writer = ArrowWriter::try_new(file, batches[0].schema(), Some(props))?; + for batch in &batches { + writer.write(batch)?; + } + writer.close()?; + + let mut df = ctx.read_parquet("test.parquet", ParquetReadOptions::default()).await?; + df = df.filter(col("c").in_list((1_000..10_000).map(|v| lit(v)).collect(), false))?; + let plan = df.create_physical_plan().await?; + physical_plan_from_bytes(&physical_plan_to_bytes(plan)?, &ctx.task_ctx())? + }; + + let pprof = prof_ctl.dump_pprof().unwrap(); + std::fs::write("proto_memory.pprof", pprof).unwrap(); + + Ok(()) +}