-
Notifications
You must be signed in to change notification settings - Fork 29.2k
[SPARK-40398][CORE][SQL] Use Loop instead of Arrays.stream api #37843
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
5c92faa
71ff5ff
5c5365f
bc19399
1ecf017
9c979c4
e90991c
8ff3b77
a16a89b
1ee7c4a
46d4a57
a64f608
e00330f
4892423
1a6a3c8
528f3c5
2343dc8
db94018
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -113,10 +113,28 @@ public OneForOneBlockFetcher( | |
| * @return whether the array contains only shuffle block IDs | ||
| */ | ||
| private boolean areShuffleBlocksOrChunks(String[] blockIds) { | ||
| if (Arrays.stream(blockIds).anyMatch(blockId -> !blockId.startsWith(SHUFFLE_BLOCK_PREFIX))) { | ||
| if (isAnyBlockNotStartWithShuffleBlockPrefix(blockIds)) { | ||
| // It comes here because there is a blockId which doesn't have "shuffle_" prefix so we | ||
| // check if all the block ids are shuffle chunk Ids. | ||
| return Arrays.stream(blockIds).allMatch(blockId -> blockId.startsWith(SHUFFLE_CHUNK_PREFIX)); | ||
| return isAllBlocksStartWithShuffleChunkPrefix(blockIds); | ||
| } | ||
| return true; | ||
| } | ||
|
|
||
| private boolean isAnyBlockNotStartWithShuffleBlockPrefix(String[] blockIds) { | ||
|
||
| for (String blockId : blockIds) { | ||
| if (!blockId.startsWith(SHUFFLE_BLOCK_PREFIX)) { | ||
| return true; | ||
| } | ||
| } | ||
| return false; | ||
| } | ||
|
|
||
| private boolean isAllBlocksStartWithShuffleChunkPrefix(String[] blockIds) { | ||
LuciferYang marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| for (String blockId : blockIds) { | ||
| if (!blockId.startsWith(SHUFFLE_CHUNK_PREFIX)) { | ||
| return false; | ||
| } | ||
mridulm marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
| return true; | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -42,12 +42,20 @@ class FilteredObjectInputStream extends ObjectInputStream { | |
| protected Class<?> resolveClass(ObjectStreamClass desc) | ||
| throws IOException, ClassNotFoundException { | ||
|
|
||
| boolean isValid = ALLOWED_PACKAGES.stream().anyMatch(p -> desc.getName().startsWith(p)); | ||
| boolean isValid = isValid(desc); | ||
| if (!isValid) { | ||
| throw new IllegalArgumentException( | ||
| String.format("Unexpected class in stream: %s", desc.getName())); | ||
| } | ||
| return super.resolveClass(desc); | ||
| } | ||
|
|
||
| private boolean isValid(ObjectStreamClass desc) { | ||
| for (String p : ALLOWED_PACKAGES) { | ||
| if (desc.getName().startsWith(p)) { | ||
| return true; | ||
| } | ||
| } | ||
| return false; | ||
| } | ||
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,7 +17,7 @@ | |
|
|
||
| package org.apache.spark.sql.connector.expressions; | ||
|
|
||
| import java.util.Arrays; | ||
| import java.util.*; | ||
|
||
|
|
||
| import org.apache.spark.annotation.Evolving; | ||
|
|
||
|
|
@@ -44,7 +44,16 @@ public interface Expression { | |
| * List of fields or columns that are referenced by this expression. | ||
| */ | ||
| default NamedReference[] references() { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Compare and
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use the following method to build the test object, and test with -1, 5, and 100
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Java 8 Java 11 Java 17
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For Java 11 and 17, using a loop looks better. |
||
| return Arrays.stream(children()).map(e -> e.references()) | ||
| .flatMap(Arrays::stream).distinct().toArray(NamedReference[]::new); | ||
| List<NamedReference> list = new ArrayList<>(); | ||
| Set<NamedReference> uniqueValues = new HashSet<>(); | ||
| for (Expression e : children()) { | ||
| NamedReference[] references = e.references(); | ||
| for (NamedReference reference : references) { | ||
| if (uniqueValues.add(reference)) { | ||
| list.add(reference); | ||
| } | ||
| } | ||
| } | ||
| return list.toArray(new NamedReference[0]); | ||
|
||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,10 +17,9 @@ | |
|
|
||
| package org.apache.spark.sql.connector.util; | ||
|
|
||
| import java.util.Arrays; | ||
| import java.util.ArrayList; | ||
| import java.util.List; | ||
| import java.util.StringJoiner; | ||
| import java.util.stream.Collectors; | ||
|
|
||
| import org.apache.spark.sql.connector.expressions.Cast; | ||
| import org.apache.spark.sql.connector.expressions.Expression; | ||
|
|
@@ -62,8 +61,7 @@ public String build(Expression expr) { | |
| String name = e.name(); | ||
| switch (name) { | ||
| case "IN": { | ||
| List<String> children = | ||
| Arrays.stream(e.children()).map(c -> build(c)).collect(Collectors.toList()); | ||
| List<String> children = expressionsToStringList(e.children()); | ||
|
||
| return visitIn(children.get(0), children.subList(1, children.size())); | ||
| } | ||
| case "IS_NULL": | ||
|
|
@@ -159,63 +157,56 @@ public String build(Expression expr) { | |
| case "BIT_LENGTH": | ||
| case "CHAR_LENGTH": | ||
| case "CONCAT": | ||
| return visitSQLFunction(name, | ||
| Arrays.stream(e.children()).map(c -> build(c)).toArray(String[]::new)); | ||
| return visitSQLFunction(name, expressionsToStringArray(e.children())); | ||
| case "CASE_WHEN": { | ||
| List<String> children = | ||
| Arrays.stream(e.children()).map(c -> build(c)).collect(Collectors.toList()); | ||
| return visitCaseWhen(children.toArray(new String[e.children().length])); | ||
| return visitCaseWhen(expressionsToStringArray(e.children())); | ||
| } | ||
| case "TRIM": | ||
| return visitTrim("BOTH", | ||
| Arrays.stream(e.children()).map(c -> build(c)).toArray(String[]::new)); | ||
| return visitTrim("BOTH", expressionsToStringArray(e.children())); | ||
| case "LTRIM": | ||
| return visitTrim("LEADING", | ||
| Arrays.stream(e.children()).map(c -> build(c)).toArray(String[]::new)); | ||
| return visitTrim("LEADING", expressionsToStringArray(e.children())); | ||
| case "RTRIM": | ||
| return visitTrim("TRAILING", | ||
| Arrays.stream(e.children()).map(c -> build(c)).toArray(String[]::new)); | ||
| return visitTrim("TRAILING", expressionsToStringArray(e.children())); | ||
| case "OVERLAY": | ||
| return visitOverlay( | ||
| Arrays.stream(e.children()).map(c -> build(c)).toArray(String[]::new)); | ||
| return visitOverlay(expressionsToStringArray(e.children())); | ||
| // TODO supports other expressions | ||
| default: | ||
| return visitUnexpectedExpr(expr); | ||
| } | ||
| } else if (expr instanceof Min) { | ||
| Min min = (Min) expr; | ||
| return visitAggregateFunction("MIN", false, | ||
| Arrays.stream(min.children()).map(c -> build(c)).toArray(String[]::new)); | ||
| expressionsToStringArray(min.children())); | ||
| } else if (expr instanceof Max) { | ||
| Max max = (Max) expr; | ||
| return visitAggregateFunction("MAX", false, | ||
| Arrays.stream(max.children()).map(c -> build(c)).toArray(String[]::new)); | ||
| expressionsToStringArray(max.children())); | ||
| } else if (expr instanceof Count) { | ||
| Count count = (Count) expr; | ||
| return visitAggregateFunction("COUNT", count.isDistinct(), | ||
| Arrays.stream(count.children()).map(c -> build(c)).toArray(String[]::new)); | ||
| expressionsToStringArray(count.children())); | ||
| } else if (expr instanceof Sum) { | ||
| Sum sum = (Sum) expr; | ||
| return visitAggregateFunction("SUM", sum.isDistinct(), | ||
| Arrays.stream(sum.children()).map(c -> build(c)).toArray(String[]::new)); | ||
| expressionsToStringArray(sum.children())); | ||
| } else if (expr instanceof CountStar) { | ||
| return visitAggregateFunction("COUNT", false, new String[]{"*"}); | ||
| } else if (expr instanceof Avg) { | ||
| Avg avg = (Avg) expr; | ||
| return visitAggregateFunction("AVG", avg.isDistinct(), | ||
| Arrays.stream(avg.children()).map(c -> build(c)).toArray(String[]::new)); | ||
| expressionsToStringArray(avg.children())); | ||
| } else if (expr instanceof GeneralAggregateFunc) { | ||
| GeneralAggregateFunc f = (GeneralAggregateFunc) expr; | ||
| return visitAggregateFunction(f.name(), f.isDistinct(), | ||
| Arrays.stream(f.children()).map(c -> build(c)).toArray(String[]::new)); | ||
| expressionsToStringArray(f.children())); | ||
| } else if (expr instanceof UserDefinedScalarFunc) { | ||
| UserDefinedScalarFunc f = (UserDefinedScalarFunc) expr; | ||
| return visitUserDefinedScalarFunction(f.name(), f.canonicalName(), | ||
| Arrays.stream(f.children()).map(c -> build(c)).toArray(String[]::new)); | ||
| expressionsToStringArray(f.children())); | ||
| } else if (expr instanceof UserDefinedAggregateFunc) { | ||
| UserDefinedAggregateFunc f = (UserDefinedAggregateFunc) expr; | ||
| return visitUserDefinedAggregateFunction(f.name(), f.canonicalName(), f.isDistinct(), | ||
| Arrays.stream(f.children()).map(c -> build(c)).toArray(String[]::new)); | ||
| expressionsToStringArray(f.children())); | ||
| } else { | ||
| return visitUnexpectedExpr(expr); | ||
| } | ||
|
|
@@ -393,4 +384,20 @@ private String joinListToString( | |
| } | ||
| return joiner.toString(); | ||
| } | ||
|
|
||
| private String[] expressionsToStringArray(Expression[] expressions) { | ||
LuciferYang marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| String[] result = new String[expressions.length]; | ||
| for (int i = 0; i < expressions.length; i++) { | ||
| result[i] = build(expressions[i]); | ||
| } | ||
| return result; | ||
| } | ||
|
|
||
| private List<String> expressionsToStringList(Expression[] expressions) { | ||
| List<String> list = new ArrayList<>(expressions.length); | ||
LuciferYang marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| for (Expression expression : expressions) { | ||
| list.add(build(expression)); | ||
| } | ||
| return list; | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.