Skip to content

Commit 5fc8a84

Browse files
adriangb and claude committed
feat: add pgjson format support for EXPLAIN ANALYZE
Extend the existing `FORMAT pgjson` option so that it also renders `EXPLAIN ANALYZE` output as PostgreSQL-style JSON, suitable for visualizers such as Dalibo and PEV2.

Each physical operator becomes a JSON object carrying:

- `Node Type` — `ExecutionPlan::name()`
- `Details` — the one-line `DisplayAs::Default` rendering
- `Actual Rows` / `Actual Total Time` — PG-canonical metric keys populated from `output_rows` / `elapsed_compute`
- `Extras` — remaining DataFusion metrics keyed by their native name
- `Plans` — child nodes

The existing logical-plan pgjson path, metric filtering config (`analyze_categories`), and indent-format behavior are unchanged. `Tree` and `Graphviz` with `ANALYZE` remain unsupported with a clear error.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent cfafce4 commit 5fc8a84

10 files changed

Lines changed: 684 additions & 62 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/core/src/physical_planner.rs

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2725,14 +2725,17 @@ impl DefaultPhysicalPlanner {
27252725
ExplainAnalyzeCategories::All => None,
27262726
ExplainAnalyzeCategories::Only(cats) => Some(cats),
27272727
};
2728-
Ok(Arc::new(AnalyzeExec::new(
2729-
a.verbose,
2730-
show_statistics,
2731-
metric_types,
2732-
metric_categories,
2733-
input,
2734-
schema,
2735-
)))
2728+
Ok(Arc::new(
2729+
AnalyzeExec::new(
2730+
a.verbose,
2731+
show_statistics,
2732+
metric_types,
2733+
metric_categories,
2734+
input,
2735+
schema,
2736+
)
2737+
.with_format(a.format.clone()),
2738+
))
27362739
}
27372740

27382741
/// Optimize a physical plan by applying each physical optimizer,

datafusion/expr/src/logical_plan/builder.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1327,6 +1327,7 @@ impl LogicalPlanBuilder {
13271327
if explain_option.analyze {
13281328
Ok(Self::new(LogicalPlan::Analyze(Analyze {
13291329
verbose: explain_option.verbose,
1330+
format: explain_option.format,
13301331
input: self.plan,
13311332
schema,
13321333
})))

datafusion/expr/src/logical_plan/plan.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1092,6 +1092,7 @@ impl LogicalPlan {
10921092
let input = self.only_input(inputs)?;
10931093
Ok(LogicalPlan::Analyze(Analyze {
10941094
verbose: a.verbose,
1095+
format: a.format.clone(),
10951096
schema: Arc::clone(&a.schema),
10961097
input: Arc::new(input),
10971098
}))
@@ -3299,13 +3300,17 @@ impl PartialOrd for Explain {
32993300
pub struct Analyze {
33003301
/// Should extra detail be included?
33013302
pub verbose: bool,
3303+
/// Output syntax/format for the rendered physical plan + metrics.
3304+
pub format: ExplainFormat,
33023305
/// The logical plan that is being EXPLAIN ANALYZE'd
33033306
pub input: Arc<LogicalPlan>,
33043307
/// The output schema of the explain (2 columns of text)
33053308
pub schema: DFSchemaRef,
33063309
}
33073310

3308-
// Manual implementation needed because of `schema` field. Comparison excludes this field.
3311+
// Manual implementation needed because of `schema` field, and because
3312+
// `ExplainFormat` does not implement `PartialOrd`. Comparison excludes both
3313+
// fields.
33093314
impl PartialOrd for Analyze {
33103315
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
33113316
match self.verbose.partial_cmp(&other.verbose) {

datafusion/expr/src/logical_plan/tree_node.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,11 +215,13 @@ impl TreeNode for LogicalPlan {
215215
}),
216216
LogicalPlan::Analyze(Analyze {
217217
verbose,
218+
format,
218219
input,
219220
schema,
220221
}) => input.map_elements(f)?.update_data(|input| {
221222
LogicalPlan::Analyze(Analyze {
222223
verbose,
224+
format,
223225
input,
224226
schema,
225227
})

datafusion/physical-plan/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ log = { workspace = true }
7070
num-traits = { workspace = true }
7171
parking_lot = { workspace = true }
7272
pin-project-lite = "^0.2.7"
73+
serde_json = { workspace = true }
7374
tokio = { workspace = true }
7475

7576
[dev-dependencies]
@@ -81,6 +82,9 @@ insta = { workspace = true }
8182
rand = { workspace = true }
8283
rstest = { workspace = true }
8384
rstest_reuse = "0.7.0"
85+
# Ensure `pgjson_snapshot_of_sample_plan` sees insertion-order JSON output
86+
# regardless of feature unification with upstream consumers.
87+
serde_json = { workspace = true, features = ["preserve_order"] }
8488
tokio = { workspace = true, features = [
8589
"rt-multi-thread",
8690
"fs",

datafusion/physical-plan/src/analyze.rs

Lines changed: 91 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,12 @@ use crate::metrics::{MetricCategory, MetricType};
2929
use crate::{DisplayFormatType, ExecutionPlan, Partitioning};
3030

3131
use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch};
32+
use datafusion_common::format::ExplainFormat;
3233
use datafusion_common::instant::Instant;
3334
use datafusion_common::tree_node::TreeNodeRecursion;
34-
use datafusion_common::{DataFusionError, Result, assert_eq_or_internal_err};
35+
use datafusion_common::{
36+
DataFusionError, Result, assert_eq_or_internal_err, internal_err,
37+
};
3538
use datafusion_execution::TaskContext;
3639
use datafusion_physical_expr::EquivalenceProperties;
3740
use datafusion_physical_expr::PhysicalExpr;
@@ -50,6 +53,8 @@ pub struct AnalyzeExec {
5053
metric_types: Vec<MetricType>,
5154
/// Optional filter by semantic category (rows / bytes / timing).
5255
metric_categories: Option<Vec<MetricCategory>>,
56+
/// Output format for the rendered plan + metrics.
57+
format: ExplainFormat,
5358
/// The input plan (the plan being analyzed)
5459
pub(crate) input: Arc<dyn ExecutionPlan>,
5560
/// The output schema for RecordBatches of this exec node
@@ -58,7 +63,7 @@ pub struct AnalyzeExec {
5863
}
5964

6065
impl AnalyzeExec {
61-
/// Create a new AnalyzeExec
66+
/// Create a new AnalyzeExec with the default output format (indent).
6267
pub fn new(
6368
verbose: bool,
6469
show_statistics: bool,
@@ -73,12 +78,19 @@ impl AnalyzeExec {
7378
show_statistics,
7479
metric_types,
7580
metric_categories,
81+
format: ExplainFormat::Indent,
7682
input,
7783
schema,
7884
cache: Arc::new(cache),
7985
}
8086
}
8187

88+
/// Builder: set the output format (indent or pgjson).
89+
pub fn with_format(mut self, format: ExplainFormat) -> Self {
90+
self.format = format;
91+
self
92+
}
93+
8294
/// Access to verbose
8395
pub fn verbose(&self) -> bool {
8496
self.verbose
@@ -94,6 +106,11 @@ impl AnalyzeExec {
94106
self.metric_categories.as_deref()
95107
}
96108

109+
/// Access to format
110+
pub fn format(&self) -> &ExplainFormat {
111+
&self.format
112+
}
113+
97114
/// The input plan
98115
pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
99116
&self.input
@@ -160,14 +177,17 @@ impl ExecutionPlan for AnalyzeExec {
160177
self: Arc<Self>,
161178
mut children: Vec<Arc<dyn ExecutionPlan>>,
162179
) -> Result<Arc<dyn ExecutionPlan>> {
163-
Ok(Arc::new(Self::new(
164-
self.verbose,
165-
self.show_statistics,
166-
self.metric_types.clone(),
167-
self.metric_categories.clone(),
168-
children.pop().unwrap(),
169-
Arc::clone(&self.schema),
170-
)))
180+
Ok(Arc::new(
181+
Self::new(
182+
self.verbose,
183+
self.show_statistics,
184+
self.metric_types.clone(),
185+
self.metric_categories.clone(),
186+
children.pop().unwrap(),
187+
Arc::clone(&self.schema),
188+
)
189+
.with_format(self.format.clone()),
190+
))
171191
}
172192

173193
fn execute(
@@ -204,6 +224,7 @@ impl ExecutionPlan for AnalyzeExec {
204224
let show_statistics = self.show_statistics;
205225
let metric_types = self.metric_types.clone();
206226
let metric_categories = self.metric_categories.clone();
227+
let format = self.format.clone();
207228

208229
// future that gathers the results from all the tasks in the
209230
// JoinSet that computes the overall row count and final
@@ -225,6 +246,7 @@ impl ExecutionPlan for AnalyzeExec {
225246
&captured_schema,
226247
&metric_types,
227248
metric_categories.as_deref(),
249+
&format,
228250
)
229251
};
230252

@@ -246,39 +268,69 @@ fn create_output_batch(
246268
schema: &SchemaRef,
247269
metric_types: &[MetricType],
248270
metric_categories: Option<&[MetricCategory]>,
271+
format: &ExplainFormat,
249272
) -> Result<RecordBatch> {
250273
let mut type_builder = StringBuilder::with_capacity(1, 1024);
251274
let mut plan_builder = StringBuilder::with_capacity(1, 1024);
252275

253-
// TODO use some sort of enum rather than strings?
254-
type_builder.append_value("Plan with Metrics");
255-
256-
let annotated_plan = DisplayableExecutionPlan::with_metrics(input.as_ref())
257-
.set_metric_types(metric_types.to_vec())
258-
.set_metric_categories(metric_categories.map(|c| c.to_vec()))
259-
.set_show_statistics(show_statistics)
260-
.indent(verbose)
261-
.to_string();
262-
plan_builder.append_value(annotated_plan);
263-
264-
// Verbose output
265-
// TODO make this more sophisticated
266-
if verbose {
267-
type_builder.append_value("Plan with Full Metrics");
268-
269-
let annotated_plan = DisplayableExecutionPlan::with_full_metrics(input.as_ref())
270-
.set_metric_types(metric_types.to_vec())
271-
.set_metric_categories(metric_categories.map(|c| c.to_vec()))
272-
.set_show_statistics(show_statistics)
273-
.indent(verbose)
274-
.to_string();
275-
plan_builder.append_value(annotated_plan);
276-
277-
type_builder.append_value("Output Rows");
278-
plan_builder.append_value(total_rows.to_string());
279-
280-
type_builder.append_value("Duration");
281-
plan_builder.append_value(format!("{duration:?}"));
276+
match format {
277+
ExplainFormat::Indent => {
278+
// TODO use some sort of enum rather than strings?
279+
type_builder.append_value("Plan with Metrics");
280+
281+
let annotated_plan = DisplayableExecutionPlan::with_metrics(input.as_ref())
282+
.set_metric_types(metric_types.to_vec())
283+
.set_metric_categories(metric_categories.map(|c| c.to_vec()))
284+
.set_show_statistics(show_statistics)
285+
.indent(verbose)
286+
.to_string();
287+
plan_builder.append_value(annotated_plan);
288+
289+
// Verbose output
290+
// TODO make this more sophisticated
291+
if verbose {
292+
type_builder.append_value("Plan with Full Metrics");
293+
294+
let annotated_plan =
295+
DisplayableExecutionPlan::with_full_metrics(input.as_ref())
296+
.set_metric_types(metric_types.to_vec())
297+
.set_metric_categories(metric_categories.map(|c| c.to_vec()))
298+
.set_show_statistics(show_statistics)
299+
.indent(verbose)
300+
.to_string();
301+
plan_builder.append_value(annotated_plan);
302+
303+
type_builder.append_value("Output Rows");
304+
plan_builder.append_value(total_rows.to_string());
305+
306+
type_builder.append_value("Duration");
307+
plan_builder.append_value(format!("{duration:?}"));
308+
}
309+
}
310+
ExplainFormat::PostgresJSON => {
311+
// For pgjson we emit a single self-contained JSON array so the
312+
// result remains parseable. Summary values (total rows / duration)
313+
// are attached to the root object in verbose mode rather than
314+
// emitted as separate rows.
315+
type_builder.append_value("Plan with Metrics");
316+
317+
let mut displayable = if verbose {
318+
DisplayableExecutionPlan::with_full_metrics(input.as_ref())
319+
} else {
320+
DisplayableExecutionPlan::with_metrics(input.as_ref())
321+
};
322+
displayable = displayable
323+
.set_metric_types(metric_types.to_vec())
324+
.set_metric_categories(metric_categories.map(|c| c.to_vec()))
325+
.set_show_statistics(show_statistics);
326+
if verbose {
327+
displayable = displayable.set_summary(Some(total_rows), Some(duration));
328+
}
329+
plan_builder.append_value(displayable.pgjson(verbose).to_string());
330+
}
331+
ExplainFormat::Tree | ExplainFormat::Graphviz => {
332+
return internal_err!("AnalyzeExec does not support {format} output format");
333+
}
282334
}
283335

284336
RecordBatch::try_new(

0 commit comments

Comments
 (0)