Conversation
```rust
self.cache =
    Self::compute_properties(&self.input, Arc::clone(&self.input.schema()));
```
Needs to be recomputed since the output changes.
```rust
if auto_explain {
    if duration.as_millis() >= auto_explain_min_duration as u128 {
        export_auto_explain(out, &auto_explain_output)?;
    }
    concat_batches(&inner_schema, &batches).map_err(DataFusionError::from)
} else {
    Ok(out)
}
```
The auto_explain mode will return the input's batches instead of the analyze output.
```rust
let fd: &mut dyn Write = match output {
    "stdout" => &mut io::stdout(),
    "stderr" => &mut io::stderr(),
    _ => &mut OpenOptions::new().create(true).append(true).open(output)?,
```
Does this need any kind of validation of the file location? Or is it left to the developer/admin to make sure it is a safe place?
Does this need some kind of synchronisation when a file path is used for the output? Two or more DF sessions using the same config may try to write to the same file simultaneously.
> Does this need any kind of validation of the file location? Or is it left to the developer/admin to make sure it is a safe place?

I think it's better to leave this to the user (either way, an error is returned).
> Does this need some kind of synchronisation when a file path is used for the output? Two or more DF sessions using the same config may try to write to the same file simultaneously.

I think again the responsibility for this falls on the user. Is it common to use multiple sessions over the same config?
```
# test auto_explain

statement ok
set datafusion.explain.auto_explain_output = 'test_files/scratch/auto_explain.txt';
```
Does something assert the contents of this output file? Does something remove this file at the end?

I originally tried to load the file into a table as CSV, as I think that is the only feasible way to check the contents, but since the file cannot be removed the result would always change. I mainly added these sqllogictests just to check the "set ..." commands. As for removing the file, I'm not sure it is possible. With that said, I don't think it is necessary, since it's written to the sqllogictest temporary dir.
Co-authored-by: Martin Grigorov <martin-g@users.noreply.github.com>
alamb
left a comment
Thanks @nuno-faria and @martin-g -- sorry for the delay in reviewing this PR
From my perspective, this is a valuable feature, but having the output go to stdout / stderr in the core library is not ideal.
Instead, it seems to me that auto explain belongs in the client applications themselves (e.g. datafusion-cli)
I tried to explain my thinking more here
@alamb thanks for the review. I also get the concerns about polluting the core library. I will have to look at the Observer suggestion once I get the time.

What about using

I added it. I was also thinking of adding the query to the output, similar to Postgres'

I think if you wanted to make it easier to implement downstream, maybe we would add some sort of API / callback thing (trait object?) that could get the info for auto_explains. Then the default implementation could log with

Sounds good. I'll try to tackle this soon.
@alamb I refactored the previous auto_explain mode to now use a new trait:
```rust
pub trait PlanObserver: Send + Sync + 'static + Debug {
    fn plan_created(
        &self,
        id: &str,
        logical_plan: &LogicalPlan,
        physical_plan: &Arc<dyn ExecutionPlan>,
    ) -> Result<()>;

    fn plan_executed(
        &self,
        id: &str,
        explain_result: RecordBatch,
        duration: Duration,
    ) -> Result<()>;
}
```
It can be used like this:
```rust
let plan_observer = DefaultPlanObserver::new("auto_explain.txt".to_owned(), 0);
let ctx = SessionContext::new().with_plan_observer(Arc::new(plan_observer));
ctx.sql("create table t (k int, v int)").await?.collect().await?;
// auto explain needs to be enabled
ctx.sql("set datafusion.explain.auto_explain = true").await?.collect().await?;
ctx.sql("select * from t where k = 1 or k = 2 order by v desc limit 5").await?.collect().await?;
```
The output looks like this:
```
QUERY: SELECT t.k, t.v FROM t WHERE ((t.k = 1) OR (t.k = 2)) ORDER BY t.v DESC NULLS FIRST LIMIT 5
DURATION: 0.689ms
EXPLAIN:
+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type         | plan                                                                                                                                                                                |
+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Plan with Metrics | SortExec: TopK(fetch=5), expr=[v@1 DESC], preserve_partitioning=[false], metrics=[output_rows=0, elapsed_compute=13.40µs, output_bytes=0.0 B, output_batches=0, row_replacements=0] |
|                   |   FilterExec: k@0 = 1 OR k@0 = 2, metrics=[output_rows=0, elapsed_compute=1ns, output_bytes=0.0 B, output_batches=0, selectivity=N/A (0/0)]                                         |
|                   |     DataSourceExec: partitions=1, partition_sizes=[0], metrics=[]                                                                                                                   |
|                   |                                                                                                                                                                                     |
+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
```
Let me know what you think about the API.
alamb
left a comment
Thanks @nuno-faria -- I went over this one again. I think it is getting close.
```rust
pub struct DefaultPlanObserver {
    output: String,
    min_duration_ms: usize,
    /// Stores a SQL representation of the logical plan, if the `sql` feature is enabled.
```
Codex points out that on the error path this map never gets cleaned up -- so a burst of errors could cause the map to grow without bound. Maybe something we could clean up in a follow-on PR (file a ticket to fix, etc.).

Good call, I completely missed that. I think we could use a queue that removes queries from that map once it reaches some limit.
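The queue idea above can be sketched in a few lines with std containers only. This is a minimal illustration, not the DataFusion implementation — the `BoundedMap` name and capacity policy are made up here; it evicts entries in insertion order once a limit is hit, so entries abandoned on error paths cannot accumulate forever:

```rust
use std::collections::{HashMap, VecDeque};

/// Map from query id to its SQL text, bounded by evicting the oldest entry.
struct BoundedMap {
    capacity: usize,
    order: VecDeque<String>,
    entries: HashMap<String, String>,
}

impl BoundedMap {
    fn new(capacity: usize) -> Self {
        Self { capacity, order: VecDeque::new(), entries: HashMap::new() }
    }

    fn insert(&mut self, id: String, sql: String) {
        if self.entries.insert(id.clone(), sql).is_none() {
            self.order.push_back(id);
        }
        // Evict oldest entries once the limit is exceeded; ids already
        // removed by `remove` are skipped harmlessly.
        while self.entries.len() > self.capacity {
            if let Some(oldest) = self.order.pop_front() {
                self.entries.remove(&oldest);
            }
        }
    }

    fn remove(&mut self, id: &str) -> Option<String> {
        self.entries.remove(id)
    }
}

fn main() {
    let mut map = BoundedMap::new(2);
    map.insert("q1".into(), "SELECT 1".into());
    map.insert("q2".into(), "SELECT 2".into());
    map.insert("q3".into(), "SELECT 3".into()); // evicts q1
    assert!(map.remove("q1").is_none());
    assert_eq!(map.remove("q3").as_deref(), Some("SELECT 3"));
}
```

An LRU crate would do the same job; a plain FIFO bound is enough here since entries are normally removed as soon as the query finishes.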
```rust
/// - `log::info`
/// - `log::debug`
/// - `log::trace`
/// - a file path: creates the file if it does not exist, or appends to it if it does.
```
I think it would make more sense to have an explicit enum or something here, especially as this can do file I/O. I worry that if someone accidentally passes in log:error (one :) it will write to a file named log:error.

What do you think about using something like:
```rust
enum Output {
    LogError,
    LogWarn,
    ...
    LogToFile(String),
}
```
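A sketch of how such an enum might parse the config string strictly, so the typo case above is rejected instead of silently becoming a file path. The variant set and the `log:` prefix check are assumptions for illustration, not the PR's actual code:

```rust
use std::str::FromStr;

#[derive(Debug, PartialEq)]
enum Output {
    Stdout,
    Stderr,
    LogError,
    LogWarn,
    LogInfo,
    LogToFile(String),
}

impl FromStr for Output {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "stdout" => Ok(Output::Stdout),
            "stderr" => Ok(Output::Stderr),
            "log::error" => Ok(Output::LogError),
            "log::warn" => Ok(Output::LogWarn),
            "log::info" => Ok(Output::LogInfo),
            // Anything that *looks* like a log target but is misspelled
            // becomes an error rather than a file path.
            other if other.starts_with("log:") => {
                Err(format!("invalid log target: {other}"))
            }
            other => Ok(Output::LogToFile(other.to_owned())),
        }
    }
}

fn main() {
    assert_eq!("log::warn".parse::<Output>(), Ok(Output::LogWarn));
    assert!("log:error".parse::<Output>().is_err()); // the typo case
    assert_eq!(
        "out.txt".parse::<Output>(),
        Ok(Output::LogToFile("out.txt".to_owned()))
    );
}
```

Parsing once at config time also means file-path errors surface when the option is set, not on the first query.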
```rust
function_factory,
cache_factory,
prepared_plans: HashMap::new(),
plan_observer: Some(Arc::new(DefaultPlanObserver::default())),
```
This should only be set when auto_explain is enabled, right?

Ideally yes, but then as far as I'm aware the user wouldn't be able to turn on the default auto_explain without also having to first set a plan_observer. But if it is better that way I can set it to None; let me know.
```rust
self.optimize_physical_plan(plan, session_state, |_, _| {})
let mut plan = self.optimize_physical_plan(plan, session_state, |_, _| {})?;

// setup the auto explain mode if necessary
```
I wonder if this would be cleaner to add to handle_explain_or_analyze (or put it in its own method)?

I moved this to the existing setup_auto_explain method so that everything is there. I'm not sure it would fit in the handle_explain_or_analyze method, since at that point the Analyze is still not in the plan.
```rust
return_inner: bool,
}

/// Optionally used by the `AnalyzeExec` operator to call back with the result.
```
We won't be able to easily add new things to this callback.

It might also be useful to point out somewhere that setting return_inner effectively buffers the entire query output into RAM (which could be quite large).

I wonder if it would make sense to define a trait:
```rust
trait AnalyzeObserver {
    ...
}
```
And then, instead of
```rust
/// If Some, passes the output of the analyze once it completes, as well as the duration.
callback: Option<AnalyzeCallback>,
```
do something like
```rust
/// If Some, passes the output of the analyze once it completes, as well as the duration.
callback: Option<Arc<dyn AnalyzeObserver>>,
```
That way, if we want to add more methods (like something which sees the plan metrics), it would be easier to add them without an API change.
I replaced the callback with the following trait:
```rust
pub trait AnalyzeObserver: Debug + Send + Sync {
    /// Provides the EXPLAIN ANALYZE output (annotated plan) and the total duration.
    fn analyze_result_callback(
        &self,
        result: RecordBatch,
        duration: std::time::Duration,
    ) -> Result<()>;
}
```

```rust
fn plan_executed(
    &self,
    id: &str,
    explain_result: RecordBatch,
```
Is this result the actual output batches? Or is it the annotated explain plan?

The annotated plan. I changed the variable name to annotated_plan to make it clearer.
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
Shoot -- I lost track of this one. I put it on my short list for review.
Which issue does this PR close?

Adds the `auto_explain` mode, closing #19215.

Rationale for this change

Allows users to check the execution plans without needing to change the existing application.

The `auto_explain` mode can be enabled with the `datafusion.explain.auto_explain` config. In addition, there are two other configs:
- `datafusion.explain.auto_explain_output`: sets the output location of the plans. Supports `stdout`, `stderr`, and a file path.
- `datafusion.explain.auto_explain_min_duration`: only outputs plans whose duration is greater than this value (similar to Postgres' `auto_explain.log_min_duration`).

Example in `datafusion-cli`:

What changes are included in this PR?
- Changes to the `AnalyzeExec` operator to support the `auto_explain` mode.
- Wraps the plan in the `AnalyzeExec` operator when `auto_explain` is enabled.

Are these changes tested?

Yes.

Are there any user-facing changes?

New feature, but it's completely optional.