Skip to content

Commit 1cdec0d

Browse files
authored
feat: add multiple constraints at once (delta-io#3879)
# Description This PR allows users to add multiple constraints at once using a HashMap. # Related Issue(s) <!--- For example: - closes delta-io#106 ---> - closes delta-io#1986 # Documentation <!--- Share links to useful documentation ---> --------- Signed-off-by: JustinRush80 <69156844+JustinRush80@users.noreply.github.com>
1 parent 52ada46 commit 1cdec0d

9 files changed

Lines changed: 232 additions & 75 deletions

File tree

crates/core/src/operations/constraints.rs

Lines changed: 188 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,14 @@ use crate::operations::datafusion_utils::Expression;
2323
use crate::protocol::DeltaOperation;
2424
use crate::table::Constraint;
2525
use crate::{DeltaResult, DeltaTable, DeltaTableError};
26+
use std::collections::HashMap;
2627

2728
/// Build a constraint to add to a table
2829
pub struct ConstraintBuilder {
2930
/// A snapshot of the table's state
3031
snapshot: Option<EagerSnapshot>,
31-
/// Name of the constraint
32-
name: Option<String>,
33-
/// Constraint expression
34-
expr: Option<Expression>,
32+
/// HashMap from each constraint's name to its expression
33+
check_constraints: HashMap<String, Expression>,
3534
/// Delta object store for handling data files
3635
log_store: LogStoreRef,
3736
/// Datafusion session state relevant for executing the input plan
@@ -54,8 +53,7 @@ impl ConstraintBuilder {
5453
/// Create a new builder
5554
pub(crate) fn new(log_store: LogStoreRef, snapshot: Option<EagerSnapshot>) -> Self {
5655
Self {
57-
name: None,
58-
expr: None,
56+
check_constraints: Default::default(),
5957
snapshot,
6058
log_store,
6159
session: None,
@@ -70,8 +68,21 @@ impl ConstraintBuilder {
7068
name: S,
7169
expression: E,
7270
) -> Self {
73-
self.name = Some(name.into());
74-
self.expr = Some(expression.into());
71+
self.check_constraints
72+
.insert(name.into(), expression.into());
73+
self
74+
}
75+
76+
/// Specify multiple constraints to be added
77+
pub fn with_constraints<S: Into<String>, E: Into<Expression>>(
78+
mut self,
79+
constraints: HashMap<S, E>,
80+
) -> Self {
81+
self.check_constraints.extend(
82+
constraints
83+
.into_iter()
84+
.map(|(name, expr)| (name.into(), expr.into())),
85+
);
7586
self
7687
}
7788

@@ -108,23 +119,29 @@ impl std::future::IntoFuture for ConstraintBuilder {
108119
let operation_id = this.get_operation_id();
109120
this.pre_execute(operation_id).await?;
110121

111-
let name = match this.name {
112-
Some(v) => v,
113-
None => return Err(DeltaTableError::Generic("No name provided".to_string())),
114-
};
115-
116-
let expr = this
117-
.expr
118-
.ok_or_else(|| DeltaTableError::Generic("No Expression provided".to_string()))?;
122+
if this.check_constraints.is_empty() {
123+
return Err(DeltaTableError::Generic(
124+
"No check constraint (Name and Expression) provided".to_string(),
125+
));
126+
}
119127

120128
let mut metadata = snapshot.metadata().clone();
121-
let configuration_key = format!("delta.constraints.{name}");
122129

123-
if metadata.configuration().contains_key(&configuration_key) {
124-
return Err(DeltaTableError::Generic(format!(
125-
"Constraint with name: {name} already exists"
126-
)));
127-
}
130+
let configuration_key_mapper: HashMap<String, String> = HashMap::from_iter(
131+
this.check_constraints
132+
.iter()
133+
.map(|(name, _)| (name.clone(), format!("delta.constraints.{name}"))),
134+
);
135+
136+
// Hold all the conflicted constraints
137+
let preexisting_constraints =
138+
configuration_key_mapper
139+
.iter()
140+
.filter(|(_, configuration_key)| {
141+
metadata
142+
.configuration()
143+
.contains_key(configuration_key.as_str())
144+
});
128145

129146
let session = this
130147
.session
@@ -136,13 +153,36 @@ impl std::future::IntoFuture for ConstraintBuilder {
136153
.await?;
137154

138155
let schema = scan.schema().to_dfschema()?;
139-
let expr = into_expr(expr, &schema, session.as_ref())?;
140-
let expr_str = fmt_expr_to_sql(&expr)?;
141156

142-
// Checker built here with the one time constraint to check.
143-
let checker =
144-
DeltaDataChecker::new_with_constraints(vec![Constraint::new("*", &expr_str)]);
157+
// Create a HashMap from each constraint name to its processed SQL expression
158+
let mut constraints_sql_mapper = HashMap::with_capacity(this.check_constraints.len());
159+
for (name, _) in configuration_key_mapper.iter() {
160+
let converted_expr = into_expr(
161+
this.check_constraints[name].clone(),
162+
&schema,
163+
session.as_ref(),
164+
)?;
165+
let constraint_sql = fmt_expr_to_sql(&converted_expr)?;
166+
constraints_sql_mapper.insert(name, constraint_sql);
167+
}
145168

169+
for (name, configuration_key) in preexisting_constraints {
170+
// When the existing constraint has a different expression, error out because we do not know how to resolve the conflict
171+
if !metadata.configuration()[configuration_key].eq(&constraints_sql_mapper[name]) {
172+
return Err(DeltaTableError::Generic(format!(
173+
"Cannot add constraint '{name}': a constraint with this name already exists with a different expression. Existing: '{}', New: '{}'",
174+
metadata.configuration()[configuration_key],constraints_sql_mapper[name]
175+
)));
176+
}
177+
tracing::warn!("Skipping constraint '{name}': identical constraint already exists with expression '{}'",constraints_sql_mapper[name]);
178+
}
179+
let constraints_checker: Vec<Constraint> = constraints_sql_mapper
180+
.iter()
181+
.map(|(_, sql)| Constraint::new("*", sql))
182+
.collect();
183+
184+
// Checker built here with the one time constraint to check.
185+
let checker = DeltaDataChecker::new_with_constraints(constraints_checker);
146186
let plan: Arc<dyn ExecutionPlan> = Arc::new(scan);
147187
let mut tasks = vec![];
148188
for p in 0..plan.properties().output_partitioning().partition_count() {
@@ -170,8 +210,12 @@ impl std::future::IntoFuture for ConstraintBuilder {
170210

171211
// We have validated the table passes its constraints, now to add the constraint to
172212
// the table.
173-
metadata =
174-
metadata.add_config_key(format!("delta.constraints.{name}"), expr_str.clone())?;
213+
for (name, configuration_key) in configuration_key_mapper.iter() {
214+
metadata = metadata.add_config_key(
215+
configuration_key.to_string(),
216+
constraints_sql_mapper[&name].clone(),
217+
)?;
218+
}
175219

176220
let old_protocol = snapshot.protocol();
177221
let protocol = ProtocolInner {
@@ -199,10 +243,12 @@ impl std::future::IntoFuture for ConstraintBuilder {
199243
},
200244
}
201245
.as_kernel();
202-
246+
// Put all the constraints into one commit
203247
let operation = DeltaOperation::AddConstraint {
204-
name: name.clone(),
205-
expr: expr_str.clone(),
248+
constraints: constraints_sql_mapper
249+
.into_iter()
250+
.map(|(name, sql)| Constraint::new(name, &sql))
251+
.collect(),
206252
};
207253

208254
let actions = vec![metadata.into(), protocol.into()];
@@ -234,6 +280,7 @@ mod tests {
234280
use arrow_array::{Array, Int32Array, RecordBatch, StringArray};
235281
use arrow_schema::{DataType as ArrowDataType, Field, Schema as ArrowSchema};
236282
use datafusion::logical_expr::{col, lit};
283+
use std::collections::HashMap;
237284

238285
use crate::table::config::TablePropertiesExt as _;
239286
use crate::writer::test_utils::{create_bare_table, get_arrow_schema, get_record_batch};
@@ -250,17 +297,28 @@ mod tests {
250297
.unwrap()
251298
}
252299

253-
async fn get_constraint_op_params(table: &mut DeltaTable) -> String {
300+
async fn get_constraint_op_params(table: &mut DeltaTable) -> HashMap<String, String> {
254301
let last_commit = table.last_commit().await.unwrap();
255-
last_commit
302+
let constraints_str = last_commit
256303
.operation_parameters
257304
.as_ref()
258305
.unwrap()
259-
.get("expr")
306+
.get("constraints")
260307
.unwrap()
261308
.as_str()
309+
.unwrap();
310+
311+
let constraints: serde_json::Value = serde_json::from_str(constraints_str).unwrap();
312+
constraints
313+
.as_array()
262314
.unwrap()
263-
.to_owned()
315+
.iter()
316+
.map(|value| {
317+
let name = value.get("name").unwrap().as_str().unwrap().to_owned();
318+
let expr = value.get("expr").unwrap().as_str().unwrap().to_owned();
319+
(name, expr)
320+
})
321+
.collect()
264322
}
265323

266324
#[tokio::test]
@@ -290,7 +348,7 @@ mod tests {
290348
}
291349

292350
#[tokio::test]
293-
async fn add_constraint_with_invalid_data() -> DeltaResult<()> {
351+
async fn test_add_constraint_with_invalid_data() -> DeltaResult<()> {
294352
let batch = get_record_batch(None, false);
295353
let write = DeltaOps(create_bare_table())
296354
.write(vec![batch.clone()])
@@ -306,7 +364,7 @@ mod tests {
306364
}
307365

308366
#[tokio::test]
309-
async fn add_valid_constraint() -> DeltaResult<()> {
367+
async fn test_add_valid_constraint() -> DeltaResult<()> {
310368
let batch = get_record_batch(None, false);
311369
let write = DeltaOps(create_bare_table())
312370
.write(vec![batch.clone()])
@@ -320,17 +378,53 @@ mod tests {
320378
let version = table.version();
321379
assert_eq!(version, Some(1));
322380

323-
let expected_expr = "value < 1000";
324-
assert_eq!(get_constraint_op_params(&mut table).await, expected_expr);
381+
let expected_expr = vec!["value < 1000"];
325382
assert_eq!(
326-
get_constraint(&table, "delta.constraints.id"),
383+
get_constraint_op_params(&mut table)
384+
.await
385+
.into_values()
386+
.collect::<Vec<String>>(),
327387
expected_expr
328388
);
389+
assert_eq!(
390+
get_constraint(&table, "delta.constraints.id"),
391+
expected_expr[0]
392+
);
329393
Ok(())
330394
}
331395

332396
#[tokio::test]
333-
async fn add_constraint_datafusion() -> DeltaResult<()> {
397+
async fn test_add_valid_multiple_constraints() -> DeltaResult<()> {
398+
let batch = get_record_batch(None, false);
399+
let write = DeltaOps(create_bare_table())
400+
.write(vec![batch.clone()])
401+
.await?;
402+
let table = DeltaOps(write);
403+
404+
let constraints = HashMap::from([("id", "value < 1000"), ("id2", "value < 20")]);
405+
406+
let mut table = table.add_constraint().with_constraints(constraints).await?;
407+
let version = table.version();
408+
assert_eq!(version, Some(1));
409+
410+
let expected_exprs = HashMap::from([
411+
("id".to_string(), "value < 1000".to_string()),
412+
("id2".to_string(), "value < 20".to_string()),
413+
]);
414+
assert_eq!(get_constraint_op_params(&mut table).await, expected_exprs);
415+
assert_eq!(
416+
get_constraint(&table, "delta.constraints.id"),
417+
expected_exprs["id"]
418+
);
419+
assert_eq!(
420+
get_constraint(&table, "delta.constraints.id2"),
421+
expected_exprs["id2"]
422+
);
423+
Ok(())
424+
}
425+
426+
#[tokio::test]
427+
async fn test_add_constraint_datafusion() -> DeltaResult<()> {
334428
// Add constraint by providing a datafusion expression.
335429
let batch = get_record_batch(None, false);
336430
let write = DeltaOps(create_bare_table())
@@ -345,12 +439,18 @@ mod tests {
345439
let version = table.version();
346440
assert_eq!(version, Some(1));
347441

348-
let expected_expr = "value < 1000";
349-
assert_eq!(get_constraint_op_params(&mut table).await, expected_expr);
442+
let expected_expr = vec!["value < 1000"];
350443
assert_eq!(
351-
get_constraint(&table, "delta.constraints.valid_values"),
444+
get_constraint_op_params(&mut table)
445+
.await
446+
.into_values()
447+
.collect::<Vec<String>>(),
352448
expected_expr
353449
);
450+
assert_eq!(
451+
get_constraint(&table, "delta.constraints.valid_values"),
452+
expected_expr[0]
453+
);
354454

355455
Ok(())
356456
}
@@ -386,18 +486,24 @@ mod tests {
386486
let version = table.version();
387487
assert_eq!(version, Some(1));
388488

389-
let expected_expr = "\"vAlue\" < 1000"; // spellchecker:disable-line
390-
assert_eq!(get_constraint_op_params(&mut table).await, expected_expr);
489+
let expected_expr = vec!["\"vAlue\" < 1000"]; // spellchecker:disable-line
391490
assert_eq!(
392-
get_constraint(&table, "delta.constraints.valid_values"),
491+
get_constraint_op_params(&mut table)
492+
.await
493+
.into_values()
494+
.collect::<Vec<String>>(),
393495
expected_expr
394496
);
497+
assert_eq!(
498+
get_constraint(&table, "delta.constraints.valid_values"),
499+
expected_expr[0]
500+
);
395501

396502
Ok(())
397503
}
398504

399505
#[tokio::test]
400-
async fn add_conflicting_named_constraint() -> DeltaResult<()> {
506+
async fn test_add_conflicting_named_constraint() -> DeltaResult<()> {
401507
let batch = get_record_batch(None, false);
402508
let write = DeltaOps(create_bare_table())
403509
.write(vec![batch.clone()])
@@ -419,7 +525,7 @@ mod tests {
419525
}
420526

421527
#[tokio::test]
422-
async fn write_data_that_violates_constraint() -> DeltaResult<()> {
528+
async fn test_write_data_that_violates_constraint() -> DeltaResult<()> {
423529
let batch = get_record_batch(None, false);
424530
let write = DeltaOps(create_bare_table())
425531
.write(vec![batch.clone()])
@@ -440,9 +546,40 @@ mod tests {
440546
assert!(err.is_err());
441547
Ok(())
442548
}
549+
#[tokio::test]
550+
async fn test_write_data_that_violates_multiple_constraint() -> DeltaResult<()> {
551+
let batch = get_record_batch(None, false);
552+
let write = DeltaOps(create_bare_table())
553+
.write(vec![batch.clone()])
554+
.await?;
555+
556+
let table = DeltaOps(write)
557+
.add_constraint()
558+
.with_constraints(HashMap::from([
559+
("id", "value > 0"),
560+
("custom_cons", "value < 30"),
561+
]))
562+
.await?;
563+
let table = DeltaOps(table);
564+
let invalid_values: Vec<Arc<dyn Array>> = vec![
565+
Arc::new(StringArray::from(vec!["A"])),
566+
Arc::new(Int32Array::from(vec![-10])),
567+
Arc::new(StringArray::from(vec!["2021-02-02"])),
568+
];
569+
let invalid_values_2: Vec<Arc<dyn Array>> = vec![
570+
Arc::new(StringArray::from(vec!["B"])),
571+
Arc::new(Int32Array::from(vec![30])),
572+
Arc::new(StringArray::from(vec!["2021-02-02"])),
573+
];
574+
let batch = RecordBatch::try_new(get_arrow_schema(&None), invalid_values)?;
575+
let batch2 = RecordBatch::try_new(get_arrow_schema(&None), invalid_values_2)?;
576+
let err = table.write(vec![batch, batch2]).await;
577+
assert!(err.is_err());
578+
Ok(())
579+
}
443580

444581
#[tokio::test]
445-
async fn write_data_that_does_not_violate_constraint() -> DeltaResult<()> {
582+
async fn test_write_data_that_does_not_violate_constraint() -> DeltaResult<()> {
446583
let batch = get_record_batch(None, false);
447584
let write = DeltaOps(create_bare_table())
448585
.write(vec![batch.clone()])

crates/core/src/operations/drop_constraints.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,8 @@ impl std::future::IntoFuture for DropConstraintBuilder {
135135
#[cfg(feature = "datafusion")]
136136
#[cfg(test)]
137137
mod tests {
138+
use std::collections::HashMap;
139+
138140
use crate::writer::test_utils::{create_bare_table, get_record_batch};
139141
use crate::{DeltaOps, DeltaResult, DeltaTable};
140142

0 commit comments

Comments
 (0)