Skip to content

Commit 449b216

Browse files
committed
Merge branch 'main' into simplify-regex
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2 parents 173f626 + b488b94 commit 449b216

File tree

19 files changed

+620
-286
lines changed

19 files changed

+620
-286
lines changed

.github/workflows/extended.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ jobs:
8181
- name: Run tests (excluding doctests)
8282
env:
8383
RUST_BACKTRACE: 1
84-
run: cargo test --profile ci --exclude datafusion-examples --exclude datafusion-benchmarks --workspace --lib --tests --bins --features avro,json,backtrace,extended_tests
84+
run: cargo test --profile ci --exclude datafusion-examples --exclude datafusion-benchmarks --workspace --lib --tests --bins --features avro,json,backtrace,extended_tests,recursive_protection
8585
- name: Verify Working Directory Clean
8686
run: git diff --exit-code
8787
- name: Cleanup

Cargo.lock

Lines changed: 16 additions & 14 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ datafusion-sql = { path = "datafusion/sql", version = "46.0.1" }
140140
doc-comment = "0.3"
141141
env_logger = "0.11"
142142
futures = "0.3"
143-
half = { version = "2.2.1", default-features = false }
143+
half = { version = "2.5.0", default-features = false }
144144
hashbrown = { version = "0.14.5", features = ["raw"] }
145145
indexmap = "2.7.1"
146146
itertools = "0.14"

datafusion/catalog/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,17 +35,17 @@ arrow = { workspace = true }
3535
async-trait = { workspace = true }
3636
dashmap = { workspace = true }
3737
datafusion-common = { workspace = true }
38+
datafusion-common-runtime = { workspace = true }
3839
datafusion-execution = { workspace = true }
3940
datafusion-expr = { workspace = true }
41+
datafusion-physical-expr = { workspace = true }
4042
datafusion-physical-plan = { workspace = true }
4143
datafusion-sql = { workspace = true }
4244
futures = { workspace = true }
4345
itertools = { workspace = true }
4446
log = { workspace = true }
4547
object_store = { workspace = true }
4648
parking_lot = { workspace = true }
47-
48-
[dev-dependencies]
4949
tokio = { workspace = true }
5050

5151
[lints]

datafusion/catalog/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,4 +49,6 @@ pub use r#async::*;
4949
pub use schema::*;
5050
pub use session::*;
5151
pub use table::*;
52+
pub mod stream;
5253
pub mod streaming;
54+
pub mod view;
Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,23 +25,21 @@ use std::path::PathBuf;
2525
use std::str::FromStr;
2626
use std::sync::Arc;
2727

28-
use crate::catalog::{TableProvider, TableProviderFactory};
29-
use crate::datasource::create_ordering;
30-
28+
use crate::{Session, TableProvider, TableProviderFactory};
3129
use arrow::array::{RecordBatch, RecordBatchReader, RecordBatchWriter};
3230
use arrow::datatypes::SchemaRef;
3331
use datafusion_common::{config_err, plan_err, Constraints, DataFusionError, Result};
3432
use datafusion_common_runtime::SpawnedTask;
3533
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
3634
use datafusion_expr::dml::InsertOp;
3735
use datafusion_expr::{CreateExternalTable, Expr, SortExpr, TableType};
36+
use datafusion_physical_expr::create_ordering;
3837
use datafusion_physical_plan::insert::{DataSink, DataSinkExec};
3938
use datafusion_physical_plan::stream::RecordBatchReceiverStreamBuilder;
4039
use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
4140
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
4241

4342
use async_trait::async_trait;
44-
use datafusion_catalog::Session;
4543
use futures::StreamExt;
4644

4745
/// A [`TableProviderFactory`] for [`StreamTable`]
@@ -292,7 +290,7 @@ impl StreamConfig {
292290
/// data stored in object storage, should instead consider [`ListingTable`].
293291
///
294292
/// [Hadoop]: https://hadoop.apache.org/
295-
/// [`ListingTable`]: crate::datasource::listing::ListingTable
293+
/// [`ListingTable`]: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTable.html
296294
#[derive(Debug)]
297295
pub struct StreamTable(Arc<StreamConfig>);
298296

datafusion/catalog/src/view.rs

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! View data source which uses a LogicalPlan as it's input.
19+
20+
use std::{any::Any, borrow::Cow, sync::Arc};
21+
22+
use crate::Session;
23+
use crate::TableProvider;
24+
25+
use arrow::datatypes::SchemaRef;
26+
use async_trait::async_trait;
27+
use datafusion_common::error::Result;
28+
use datafusion_common::Column;
29+
use datafusion_expr::TableType;
30+
use datafusion_expr::{Expr, LogicalPlan};
31+
use datafusion_expr::{LogicalPlanBuilder, TableProviderFilterPushDown};
32+
use datafusion_physical_plan::ExecutionPlan;
33+
34+
/// An implementation of `TableProvider` that uses another logical plan.
35+
#[derive(Debug)]
36+
pub struct ViewTable {
37+
/// LogicalPlan of the view
38+
logical_plan: LogicalPlan,
39+
/// File fields + partition columns
40+
table_schema: SchemaRef,
41+
/// SQL used to create the view, if available
42+
definition: Option<String>,
43+
}
44+
45+
impl ViewTable {
46+
/// Create new view that is executed at query runtime.
47+
///
48+
/// Takes a `LogicalPlan` and optionally the SQL text of the `CREATE`
49+
/// statement.
50+
///
51+
/// Notes: the `LogicalPlan` is not validated or type coerced. If this is
52+
/// needed it should be done after calling this function.
53+
pub fn new(logical_plan: LogicalPlan, definition: Option<String>) -> Self {
54+
let table_schema = logical_plan.schema().as_ref().to_owned().into();
55+
Self {
56+
logical_plan,
57+
table_schema,
58+
definition,
59+
}
60+
}
61+
62+
#[deprecated(
63+
since = "47.0.0",
64+
note = "Use `ViewTable::new` instead and apply TypeCoercion to the logical plan if needed"
65+
)]
66+
pub fn try_new(
67+
logical_plan: LogicalPlan,
68+
definition: Option<String>,
69+
) -> Result<Self> {
70+
Ok(Self::new(logical_plan, definition))
71+
}
72+
73+
/// Get definition ref
74+
pub fn definition(&self) -> Option<&String> {
75+
self.definition.as_ref()
76+
}
77+
78+
/// Get logical_plan ref
79+
pub fn logical_plan(&self) -> &LogicalPlan {
80+
&self.logical_plan
81+
}
82+
}
83+
84+
#[async_trait]
85+
impl TableProvider for ViewTable {
86+
fn as_any(&self) -> &dyn Any {
87+
self
88+
}
89+
90+
fn get_logical_plan(&self) -> Option<Cow<LogicalPlan>> {
91+
Some(Cow::Borrowed(&self.logical_plan))
92+
}
93+
94+
fn schema(&self) -> SchemaRef {
95+
Arc::clone(&self.table_schema)
96+
}
97+
98+
fn table_type(&self) -> TableType {
99+
TableType::View
100+
}
101+
102+
fn get_table_definition(&self) -> Option<&str> {
103+
self.definition.as_deref()
104+
}
105+
fn supports_filters_pushdown(
106+
&self,
107+
filters: &[&Expr],
108+
) -> Result<Vec<TableProviderFilterPushDown>> {
109+
// A filter is added on the View when given
110+
Ok(vec![TableProviderFilterPushDown::Exact; filters.len()])
111+
}
112+
113+
async fn scan(
114+
&self,
115+
state: &dyn Session,
116+
projection: Option<&Vec<usize>>,
117+
filters: &[Expr],
118+
limit: Option<usize>,
119+
) -> Result<Arc<dyn ExecutionPlan>> {
120+
let filter = filters.iter().cloned().reduce(|acc, new| acc.and(new));
121+
let plan = self.logical_plan().clone();
122+
let mut plan = LogicalPlanBuilder::from(plan);
123+
124+
if let Some(filter) = filter {
125+
plan = plan.filter(filter)?;
126+
}
127+
128+
let mut plan = if let Some(projection) = projection {
129+
// avoiding adding a redundant projection (e.g. SELECT * FROM view)
130+
let current_projection =
131+
(0..plan.schema().fields().len()).collect::<Vec<usize>>();
132+
if projection == &current_projection {
133+
plan
134+
} else {
135+
let fields: Vec<Expr> = projection
136+
.iter()
137+
.map(|i| {
138+
Expr::Column(Column::from(
139+
self.logical_plan.schema().qualified_field(*i),
140+
))
141+
})
142+
.collect();
143+
plan.project(fields)?
144+
}
145+
} else {
146+
plan
147+
};
148+
149+
if let Some(limit) = limit {
150+
plan = plan.limit(0, Some(limit))?;
151+
}
152+
153+
state.create_physical_plan(&plan.build()?).await
154+
}
155+
}

0 commit comments

Comments
 (0)