Skip to content
Closed
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,30 @@ public OneForOneBlockFetcher(
* @return whether the array contains only shuffle block IDs
*/
private boolean areShuffleBlocksOrChunks(String[] blockIds) {
if (Arrays.stream(blockIds).anyMatch(blockId -> !blockId.startsWith(SHUFFLE_BLOCK_PREFIX))) {
if (isAnyBlockNotStartWithShuffleBlockPrefix(blockIds)) {
// It comes here because there is a blockId which doesn't have "shuffle_" prefix so we
// check if all the block ids are shuffle chunk Ids.
return Arrays.stream(blockIds).allMatch(blockId -> blockId.startsWith(SHUFFLE_CHUNK_PREFIX));
return isAllBlocksStartWithShuffleChunkPrefix(blockIds);
}
return true;
}

// SPARK-40398: Replace `Arrays.stream().anyMatch()` with this method due to perf gain.
private static boolean isAnyBlockNotStartWithShuffleBlockPrefix(String[] blockIds) {
for (String blockId : blockIds) {
if (!blockId.startsWith(SHUFFLE_BLOCK_PREFIX)) {
return true;
}
}
return false;
}

// SPARK-40398: Replace `Arrays.stream().allMatch()` with this method due to perf gain.
private static boolean isAllBlocksStartWithShuffleChunkPrefix(String[] blockIds) {
for (String blockId : blockIds) {
if (!blockId.startsWith(SHUFFLE_CHUNK_PREFIX)) {
return false;
}
}
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

package org.apache.spark.sql.connector.expressions;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import org.apache.spark.annotation.Evolving;

Expand All @@ -44,7 +46,12 @@ public interface Expression {
* List of fields or columns that are referenced by this expression.
*/
default NamedReference[] references() {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Compare

public static TestValue[] distinctUseStreamApi(TestObj[] input) {
      return Arrays.stream(input).map(s -> s.values)
        .flatMap(Arrays::stream).distinct().toArray(TestValue[]::new);
    }

and

 public static TestValue[] distinctUseLoopApi(TestObj[] input) {
        List<TestValue> list = new ArrayList<>();
        Set<TestValue> uniqueValues = new HashSet<>();
        for (TestObj s : input) {
            TestValue[] values = s.values;
            for (TestValue testValue : values) {
                if (uniqueValues.add(testValue)) {
                    list.add(testValue);
                }
            }
        }
        return list.toArray(new TestValue[0]);
    }

TestValue and TestObj define as follows:

public static class TestObj {
        TestValue[] values;

        public TestObj(int size, int range) {
            values = new TestValue[size];
            for (int i = 0; i < values.length; i++) {
                values[i] = new TestValue(RandomUtils.nextInt(0, range));
            }
        }
    }

    public static class TestValue {
        private int value;

        public TestValue(int value) {
            this.value = value;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            TestValue testValue = (TestValue) o;
            return value == testValue.value;
        }

        @Override
        public int hashCode() {
            return Objects.hashCode(value);
        }
    }

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use following method build test object

 public static TestObj[] objs(int length, int size, int range) {
        TestObj[] objects = new TestObj[length];
        for (int i = 0; i < length; i++) {
            objects[i] = new TestObj(size, range);
        }
        return objects;
    }

and test length, size, range:

-1, 5, 100

  • 5, 5, 100
  • 10, 5, 100
  • 20, 5, 100
  • 50, 5, 100
  • 100, 5, 100
  • 500, 5, 100
  • 1000, 5, 100

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Java 8

OpenJDK 64-Bit Server VM 1.8.0_345-b01 on Linux 5.15.0-1019-azure
Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
Test for distinct with input size 1:      Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Use Arrays.steam api                                 35             35           1          2.8         351.9       1.0X
Use Loop api                                         18             18           0          5.5         180.6       1.9X

OpenJDK 64-Bit Server VM 1.8.0_345-b01 on Linux 5.15.0-1019-azure
Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
Test for distinct with input size 5:      Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Use Arrays.steam api                                129            130           1          0.8        1288.7       1.0X
Use Loop api                                         82             83           1          1.2         824.4       1.6X

OpenJDK 64-Bit Server VM 1.8.0_345-b01 on Linux 5.15.0-1019-azure
Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
Test for distinct with input size 10:     Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Use Arrays.steam api                                228            229           1          0.4        2280.0       1.0X
Use Loop api                                        160            161           1          0.6        1599.7       1.4X

OpenJDK 64-Bit Server VM 1.8.0_345-b01 on Linux 5.15.0-1019-azure
Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
Test for distinct with input size 20:     Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Use Arrays.steam api                                430            431           1          0.2        4301.0       1.0X
Use Loop api                                        311            312           1          0.3        3109.9       1.4X

OpenJDK 64-Bit Server VM 1.8.0_345-b01 on Linux 5.15.0-1019-azure
Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
Test for distinct with input size 50:     Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Use Arrays.steam api                                860            862           2          0.1        8597.6       1.0X
Use Loop api                                        701            702           1          0.1        7013.1       1.2X

OpenJDK 64-Bit Server VM 1.8.0_345-b01 on Linux 5.15.0-1019-azure
Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
Test for distinct with input size 100:    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Use Arrays.steam api                               1454           1456           3          0.1       14540.1       1.0X
Use Loop api                                       1317           1318           2          0.1       13168.9       1.1X

OpenJDK 64-Bit Server VM 1.8.0_345-b01 on Linux 5.15.0-1019-azure
Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
Test for distinct with input size 500:    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Use Arrays.steam api                               5584           5586           2          0.0       55841.2       1.0X
Use Loop api                                       5784           5786           3          0.0       57839.1       1.0X

OpenJDK 64-Bit Server VM 1.8.0_345-b01 on Linux 5.15.0-1019-azure
Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
Test for distinct with input size 1000:   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Use Arrays.steam api                              10727          10728           2          0.0      107266.4       1.0X
Use Loop api                                      10534          10535           1          0.0      105342.5       1.0X

Java 11

OpenJDK 64-Bit Server VM 11.0.16+8-LTS on Linux 5.15.0-1019-azure
Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
Test for distinct with input size 1:      Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Use Arrays.steam api                                 41             42           1          2.4         408.5       1.0X
Use Loop api                                         22             23           1          4.5         224.4       1.8X

OpenJDK 64-Bit Server VM 11.0.16+8-LTS on Linux 5.15.0-1019-azure
Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
Test for distinct with input size 5:      Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Use Arrays.steam api                                159            160           1          0.6        1594.5       1.0X
Use Loop api                                         86             87           0          1.2         864.7       1.8X

OpenJDK 64-Bit Server VM 11.0.16+8-LTS on Linux 5.15.0-1019-azure
Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
Test for distinct with input size 10:     Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Use Arrays.steam api                                275            276           2          0.4        2748.0       1.0X
Use Loop api                                        167            169           3          0.6        1673.5       1.6X

OpenJDK 64-Bit Server VM 11.0.16+8-LTS on Linux 5.15.0-1019-azure
Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
Test for distinct with input size 20:     Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Use Arrays.steam api                                511            513           2          0.2        5113.5       1.0X
Use Loop api                                        315            317           2          0.3        3151.8       1.6X

OpenJDK 64-Bit Server VM 11.0.16+8-LTS on Linux 5.15.0-1019-azure
Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
Test for distinct with input size 50:     Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Use Arrays.steam api                               1012           1014           2          0.1       10118.2       1.0X
Use Loop api                                        675            677           2          0.1        6747.0       1.5X

OpenJDK 64-Bit Server VM 11.0.16+8-LTS on Linux 5.15.0-1019-azure
Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
Test for distinct with input size 100:    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Use Arrays.steam api                               1665           1667           3          0.1       16645.2       1.0X
Use Loop api                                       1253           1254           1          0.1       12528.3       1.3X

OpenJDK 64-Bit Server VM 11.0.16+8-LTS on Linux 5.15.0-1019-azure
Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
Test for distinct with input size 500:    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Use Arrays.steam api                               6305           6308           5          0.0       63046.3       1.0X
Use Loop api                                       5375           5376           1          0.0       53751.0       1.2X

OpenJDK 64-Bit Server VM 11.0.16+8-LTS on Linux 5.15.0-1019-azure
Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
Test for distinct with input size 1000:   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Use Arrays.steam api                              12081          12083           3          0.0      120806.6       1.0X
Use Loop api                                      10463          10467           5          0.0      104634.7       1.2X

Java 17

OpenJDK 64-Bit Server VM 17.0.4+8-LTS on Linux 5.15.0-1019-azure
Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz
Test for distinct with input size 1:      Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Use Arrays.steam api                                 33             36           2          3.1         325.2       1.0X
Use Loop api                                         16             18           2          6.1         164.4       2.0X

OpenJDK 64-Bit Server VM 17.0.4+8-LTS on Linux 5.15.0-1019-azure
Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz
Test for distinct with input size 5:      Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Use Arrays.steam api                                103            111           5          1.0        1032.9       1.0X
Use Loop api                                         75             80           3          1.3         746.4       1.4X

OpenJDK 64-Bit Server VM 17.0.4+8-LTS on Linux 5.15.0-1019-azure
Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz
Test for distinct with input size 10:     Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Use Arrays.steam api                                202            210           5          0.5        2022.3       1.0X
Use Loop api                                        152            164           8          0.7        1522.6       1.3X

OpenJDK 64-Bit Server VM 17.0.4+8-LTS on Linux 5.15.0-1019-azure
Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz
Test for distinct with input size 20:     Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Use Arrays.steam api                                345            362          14          0.3        3446.2       1.0X
Use Loop api                                        283            299          15          0.4        2827.3       1.2X

OpenJDK 64-Bit Server VM 17.0.4+8-LTS on Linux 5.15.0-1019-azure
Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz
Test for distinct with input size 50:     Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Use Arrays.steam api                                729            767          33          0.1        7295.0       1.0X
Use Loop api                                        581            598          12          0.2        5811.8       1.3X

OpenJDK 64-Bit Server VM 17.0.4+8-LTS on Linux 5.15.0-1019-azure
Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz
Test for distinct with input size 100:    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Use Arrays.steam api                               1370           1381          16          0.1       13700.8       1.0X
Use Loop api                                       1107           1114          10          0.1       11070.0       1.2X

OpenJDK 64-Bit Server VM 17.0.4+8-LTS on Linux 5.15.0-1019-azure
Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz
Test for distinct with input size 500:    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Use Arrays.steam api                               6541           6545           7          0.0       65405.0       1.0X
Use Loop api                                       4694           4782         124          0.0       46939.4       1.4X

OpenJDK 64-Bit Server VM 17.0.4+8-LTS on Linux 5.15.0-1019-azure
Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz
Test for distinct with input size 1000:   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Use Arrays.steam api                              11999          12185         263          0.0      119990.3       1.0X
Use Loop api                                       9282           9366         118          0.0       92822.1       1.3X

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For Java 11 and 17, using loop looks more better,

return Arrays.stream(children()).map(e -> e.references())
.flatMap(Arrays::stream).distinct().toArray(NamedReference[]::new);
// SPARK-40398: Replace `Arrays.stream()...distinct()`
// to this for perf gain, the result order is not important.
Set<NamedReference> set = new HashSet<>();
for (Expression e : children()) {
Collections.addAll(set, e.references());
}
return set.toArray(new NamedReference[0]);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have one last tiny suggestion - either pass an array of size set.size(), or make a static final empty array and pass it here, to avoid allocating an empty array. It's tiny but hey we are micro-optimizing.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

choice make a static final empty array and pass it here due to using an empty array is more recommended and the empty array is only used to get the target array class type.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Install dependencies for documentation generation failed, not relate to this one, re-run it

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nstall dependencies for documentation generation still failed

https://github.com/LuciferYang/spark/actions/runs/3064658259/jobs/4947983207

 × Building wheel for pyzmq (pyproject.toml) did not run successfully.
  │ exit code: 1
  ╰─> [186 lines of output]
      /tmp/pip-build-env-812k46kb/overlay/lib/python3.9/site-packages/setuptools/_distutils/dist.py:262: UserWarning: Unknown distribution option: 'cffi_modules'
        warnings.warn(msg)
      running bdist_wheel
      running build
      running build_py
      copying zmq/error.py -> build/lib.linux-x86_64-cpython-39/zmq
      copying zmq/asyncio.py -> build/lib.linux-x86_64-cpython-39/zmq
      copying zmq/_future.py -> build/lib.linux-x86_64-cpython-39/zmq
      copying zmq/_typing.py -> build/lib.linux-x86_64-cpython-39/zmq
      copying zmq/decorators.py -> build/lib.linux-x86_64-cpython-39/zmq
      copying zmq/constants.py -> build/lib.linux-x86_64-cpython-39/zmq
      copying zmq/__init__.py -> build/lib.linux-x86_64-cpython-39/zmq
      creating build/lib.linux-x86_64-cpython-39/zmq/log
      copying zmq/log/__main__.py -> build/lib.linux-x86_64-cpython-39/zmq/log
      copying zmq/log/handlers.py -> build/lib.linux-x86_64-cpython-39/zmq/log
      copying zmq/log/__init__.py -> build/lib.linux-x86_64-cpython-39/zmq/log
      creating build/lib.linux-x86_64-cpython-39/zmq/green
      copying zmq/green/device.py -> build/lib.linux-x86_64-cpython-39/zmq/green
      copying zmq/green/poll.py -> build/lib.linux-x86_64-cpython-39/zmq/green
      copying zmq/green/core.py -> build/lib.linux-x86_64-cpython-39/zmq/green
      copying zmq/green/__init__.py -> build/lib.linux-x86_64-cpython-39/zmq/green
      creating build/lib.linux-x86_64-cpython-39/zmq/green/eventloop
      copying zmq/green/eventloop/ioloop.py -> build/lib.linux-x86_64-cpython-39/zmq/green/eventloop
      copying zmq/green/eventloop/zmqstream.py -> build/lib.linux-x86_64-cpython-39/zmq/green/eventloop
      copying zmq/green/eventloop/__init__.py -> build/lib.linux-x86_64-cpython-39/zmq/green/eventloop
      creating build/lib.linux-x86_64-cpython-39/zmq/tests
      copying zmq/tests/test_imports.py -> build/lib.linux-x86_64-cpython-39/zmq/tests
      copying zmq/tests/test_future.py -> build/lib.linux-x86_64-cpython-39/zmq/tests
      copying zmq/tests/test_asyncio.py -> build/lib.linux-x86_64-cpython-39/zmq/tests
      copying zmq/tests/test_win32_shim.py -> build/lib.linux-x86_64-cpython-39/zmq/tests
      copying zmq/tests/test_etc.py -> build/lib.linux-x86_64-cpython-39/zmq/tests
      copying zmq/tests/test_draft.py -> build/lib.linux-x86_64-cpython-39/zmq/tests
      copying zmq/tests/test_message.py -> build/lib.linux-x86_64-cpython-39/zmq/tests
      copying zmq/tests/test_monqueue.py -> build/lib.linux-x86_64-cpython-39/zmq/tests
      copying zmq/tests/test_multipart.py -> build/lib.linux-x86_64-cpython-39/zmq/tests
      copying zmq/tests/test_error.py -> build/lib.linux-x86_64-cpython-39/zmq/tests
      copying zmq/tests/test_constants.py -> build/lib.linux-x86_64-cpython-39/zmq/tests
      copying zmq/tests/test_poll.py -> build/lib.linux-x86_64-cpython-39/zmq/tests
      copying zmq/tests/conftest.py -> build/lib.linux-x86_64-cpython-39/zmq/tests
      copying zmq/tests/test_monitor.py -> build/lib.linux-x86_64-cpython-39/zmq/tests
      copying zmq/tests/test_security.py -> build/lib.linux-x86_64-cpython-39/zmq/tests
      copying zmq/tests/test_context.py -> build/lib.linux-x86_64-cpython-39/zmq/tests
      copying zmq/tests/test_mypy.py -> build/lib.linux-x86_64-cpython-39/zmq/tests
      copying zmq/tests/test_ssh.py -> build/lib.linux-x86_64-cpython-39/zmq/tests
      copying zmq/tests/test_z85.py -> build/lib.linux-x86_64-cpython-39/zmq/tests
      copying zmq/tests/test_cffi_backend.py -> build/lib.linux-x86_64-cpython-39/zmq/tests
      copying zmq/tests/test_ioloop.py -> build/lib.linux-x86_64-cpython-39/zmq/tests
      copying zmq/tests/test_auth.py -> build/lib.linux-x86_64-cpython-39/zmq/tests
      copying zmq/tests/test_pair.py -> build/lib.linux-x86_64-cpython-39/zmq/tests
      copying zmq/tests/test_device.py -> build/lib.linux-x86_64-cpython-39/zmq/tests
      copying zmq/tests/test_includes.py -> build/lib.linux-x86_64-cpython-39/zmq/tests
      copying zmq/tests/test_socket.py -> build/lib.linux-x86_64-cpython-39/zmq/tests
      copying zmq/tests/test_cython.py -> build/lib.linux-x86_64-cpython-39/zmq/tests
      copying zmq/tests/test_log.py -> build/lib.linux-x86_64-cpython-39/zmq/tests
      copying zmq/tests/test_pubsub.py -> build/lib.linux-x86_64-cpython-39/zmq/tests
      copying zmq/tests/test_ext.py -> build/lib.linux-x86_64-cpython-39/zmq/tests
      copying zmq/tests/test_zmqstream.py -> build/lib.linux-x86_64-cpython-39/zmq/tests
      copying zmq/tests/test_reqrep.py -> build/lib.linux-x86_64-cpython-39/zmq/tests
      copying zmq/tests/test_proxy_steerable.py -> build/lib.linux-x86_64-cpython-39/zmq/tests
      copying zmq/tests/__init__.py -> build/lib.linux-x86_64-cpython-39/zmq/tests
      copying zmq/tests/test_version.py -> build/lib.linux-x86_64-cpython-39/zmq/tests
      copying zmq/tests/test_retry_eintr.py -> build/lib.linux-x86_64-cpython-39/zmq/tests
      copying zmq/tests/test_decorators.py -> build/lib.linux-x86_64-cpython-39/zmq/tests
      creating build/lib.linux-x86_64-cpython-39/zmq/devices
      copying zmq/devices/proxydevice.py -> build/lib.linux-x86_64-cpython-39/zmq/devices
      copying zmq/devices/monitoredqueuedevice.py -> build/lib.linux-x86_64-cpython-39/zmq/devices
      copying zmq/devices/proxysteerabledevice.py -> build/lib.linux-x86_64-cpython-39/zmq/devices
      copying zmq/devices/monitoredqueue.py -> build/lib.linux-x86_64-cpython-39/zmq/devices
      copying zmq/devices/basedevice.py -> build/lib.linux-x86_64-cpython-39/zmq/devices
      copying zmq/devices/__init__.py -> build/lib.linux-x86_64-cpython-39/zmq/devices
      creating build/lib.linux-x86_64-cpython-39/zmq/auth
      copying zmq/auth/asyncio.py -> build/lib.linux-x86_64-cpython-39/zmq/auth
      copying zmq/auth/certs.py -> build/lib.linux-x86_64-cpython-39/zmq/auth
      copying zmq/auth/ioloop.py -> build/lib.linux-x86_64-cpython-39/zmq/auth
      copying zmq/auth/base.py -> build/lib.linux-x86_64-cpython-39/zmq/auth
      copying zmq/auth/__init__.py -> build/lib.linux-x86_64-cpython-39/zmq/auth
      copying zmq/auth/thread.py -> build/lib.linux-x86_64-cpython-39/zmq/auth
      creating build/lib.linux-x86_64-cpython-39/zmq/utils
      copying zmq/utils/interop.py -> build/lib.linux-x86_64-cpython-39/zmq/utils
      copying zmq/utils/garbage.py -> build/lib.linux-x86_64-cpython-39/zmq/utils
      copying zmq/utils/win32.py -> build/lib.linux-x86_64-cpython-39/zmq/utils
      copying zmq/utils/jsonapi.py -> build/lib.linux-x86_64-cpython-39/zmq/utils
      copying zmq/utils/monitor.py -> build/lib.linux-x86_64-cpython-39/zmq/utils
      copying zmq/utils/z85.py -> build/lib.linux-x86_64-cpython-39/zmq/utils
      copying zmq/utils/strtypes.py -> build/lib.linux-x86_64-cpython-39/zmq/utils
      copying zmq/utils/__init__.py -> build/lib.linux-x86_64-cpython-39/zmq/utils
      creating build/lib.linux-x86_64-cpython-39/zmq/backend
      copying zmq/backend/select.py -> build/lib.linux-x86_64-cpython-39/zmq/backend
      copying zmq/backend/__init__.py -> build/lib.linux-x86_64-cpython-39/zmq/backend
      creating build/lib.linux-x86_64-cpython-39/zmq/backend/cython
      copying zmq/backend/cython/__init__.py -> build/lib.linux-x86_64-cpython-39/zmq/backend/cython
      creating build/lib.linux-x86_64-cpython-39/zmq/backend/cffi
      copying zmq/backend/cffi/error.py -> build/lib.linux-x86_64-cpython-39/zmq/backend/cffi
      copying zmq/backend/cffi/context.py -> build/lib.linux-x86_64-cpython-39/zmq/backend/cffi
      copying zmq/backend/cffi/message.py -> build/lib.linux-x86_64-cpython-39/zmq/backend/cffi
      copying zmq/backend/cffi/_poll.py -> build/lib.linux-x86_64-cpython-39/zmq/backend/cffi
      copying zmq/backend/cffi/devices.py -> build/lib.linux-x86_64-cpython-39/zmq/backend/cffi
      copying zmq/backend/cffi/socket.py -> build/lib.linux-x86_64-cpython-39/zmq/backend/cffi
      copying zmq/backend/cffi/utils.py -> build/lib.linux-x86_64-cpython-39/zmq/backend/cffi
      copying zmq/backend/cffi/__init__.py -> build/lib.linux-x86_64-cpython-39/zmq/backend/cffi
      creating build/lib.linux-x86_64-cpython-39/zmq/sugar
      copying zmq/sugar/version.py -> build/lib.linux-x86_64-cpython-39/zmq/sugar
      copying zmq/sugar/tracker.py -> build/lib.linux-x86_64-cpython-39/zmq/sugar
      copying zmq/sugar/context.py -> build/lib.linux-x86_64-cpython-39/zmq/sugar
      copying zmq/sugar/attrsettr.py -> build/lib.linux-x86_64-cpython-39/zmq/sugar
      copying zmq/sugar/poll.py -> build/lib.linux-x86_64-cpython-39/zmq/sugar
      copying zmq/sugar/frame.py -> build/lib.linux-x86_64-cpython-39/zmq/sugar
      copying zmq/sugar/stopwatch.py -> build/lib.linux-x86_64-cpython-39/zmq/sugar
      copying zmq/sugar/socket.py -> build/lib.linux-x86_64-cpython-39/zmq/sugar
      copying zmq/sugar/__init__.py -> build/lib.linux-x86_64-cpython-39/zmq/sugar
      creating build/lib.linux-x86_64-cpython-39/zmq/ssh
      copying zmq/ssh/tunnel.py -> build/lib.linux-x86_64-cpython-39/zmq/ssh
      copying zmq/ssh/forward.py -> build/lib.linux-x86_64-cpython-39/zmq/ssh
      copying zmq/ssh/__init__.py -> build/lib.linux-x86_64-cpython-39/zmq/ssh
      creating build/lib.linux-x86_64-cpython-39/zmq/eventloop
      copying zmq/eventloop/ioloop.py -> build/lib.linux-x86_64-cpython-39/zmq/eventloop
      copying zmq/eventloop/zmqstream.py -> build/lib.linux-x86_64-cpython-39/zmq/eventloop
      copying zmq/eventloop/future.py -> build/lib.linux-x86_64-cpython-39/zmq/eventloop
      copying zmq/eventloop/_deprecated.py -> build/lib.linux-x86_64-cpython-39/zmq/eventloop
      copying zmq/eventloop/__init__.py -> build/lib.linux-x86_64-cpython-39/zmq/eventloop
      creating build/lib.linux-x86_64-cpython-39/zmq/eventloop/minitornado
      copying zmq/eventloop/minitornado/log.py -> build/lib.linux-x86_64-cpython-39/zmq/eventloop/minitornado
      copying zmq/eventloop/minitornado/stack_context.py -> build/lib.linux-x86_64-cpython-39/zmq/eventloop/minitornado
      copying zmq/eventloop/minitornado/concurrent.py -> build/lib.linux-x86_64-cpython-39/zmq/eventloop/minitornado
      copying zmq/eventloop/minitornado/ioloop.py -> build/lib.linux-x86_64-cpython-39/zmq/eventloop/minitornado
      copying zmq/eventloop/minitornado/util.py -> build/lib.linux-x86_64-cpython-39/zmq/eventloop/minitornado
      copying zmq/eventloop/minitornado/__init__.py -> build/lib.linux-x86_64-cpython-39/zmq/eventloop/minitornado
      creating build/lib.linux-x86_64-cpython-39/zmq/eventloop/minitornado/platform
      copying zmq/eventloop/minitornado/platform/interface.py -> build/lib.linux-x86_64-cpython-39/zmq/eventloop/minitornado/platform
      copying zmq/eventloop/minitornado/platform/auto.py -> build/lib.linux-x86_64-cpython-39/zmq/eventloop/minitornado/platform
      copying zmq/eventloop/minitornado/platform/common.py -> build/lib.linux-x86_64-cpython-39/zmq/eventloop/minitornado/platform
      copying zmq/eventloop/minitornado/platform/windows.py -> build/lib.linux-x86_64-cpython-39/zmq/eventloop/minitornado/platform
      copying zmq/eventloop/minitornado/platform/posix.py -> build/lib.linux-x86_64-cpython-39/zmq/eventloop/minitornado/platform
      copying zmq/eventloop/minitornado/platform/__init__.py -> build/lib.linux-x86_64-cpython-39/zmq/eventloop/minitornado/platform
      copying zmq/__init__.pxd -> build/lib.linux-x86_64-cpython-39/zmq
      copying zmq/__init__.pyi -> build/lib.linux-x86_64-cpython-39/zmq
      copying zmq/py.typed -> build/lib.linux-x86_64-cpython-39/zmq
      copying zmq/devices/monitoredqueue.pxd -> build/lib.linux-x86_64-cpython-39/zmq/devices
      copying zmq/utils/buffers.pxd -> build/lib.linux-x86_64-cpython-39/zmq/utils
      copying zmq/utils/zmq_compat.h -> build/lib.linux-x86_64-cpython-39/zmq/utils
      copying zmq/utils/pyversion_compat.h -> build/lib.linux-x86_64-cpython-39/zmq/utils
      copying zmq/utils/mutex.h -> build/lib.linux-x86_64-cpython-39/zmq/utils
      copying zmq/utils/ipcmaxlen.h -> build/lib.linux-x86_64-cpython-39/zmq/utils
      copying zmq/utils/getpid_compat.h -> build/lib.linux-x86_64-cpython-39/zmq/utils
      copying zmq/utils/compiler.json -> build/lib.linux-x86_64-cpython-39/zmq/utils
      copying zmq/utils/config.json -> build/lib.linux-x86_64-cpython-39/zmq/utils
      copying zmq/backend/__init__.pyi -> build/lib.linux-x86_64-cpython-39/zmq/backend
      copying zmq/backend/cython/context.pxd -> build/lib.linux-x86_64-cpython-39/zmq/backend/cython
      copying zmq/backend/cython/socket.pxd -> build/lib.linux-x86_64-cpython-39/zmq/backend/cython
      copying zmq/backend/cython/__init__.pxd -> build/lib.linux-x86_64-cpython-39/zmq/backend/cython
      copying zmq/backend/cython/message.pxd -> build/lib.linux-x86_64-cpython-39/zmq/backend/cython
      copying zmq/backend/cython/checkrc.pxd -> build/lib.linux-x86_64-cpython-39/zmq/backend/cython
      copying zmq/backend/cython/libzmq.pxd -> build/lib.linux-x86_64-cpython-39/zmq/backend/cython
      copying zmq/backend/cython/constant_enums.pxi -> build/lib.linux-x86_64-cpython-39/zmq/backend/cython
      copying zmq/backend/cffi/_cdefs.h -> build/lib.linux-x86_64-cpython-39/zmq/backend/cffi
      copying zmq/sugar/__init__.pyi -> build/lib.linux-x86_64-cpython-39/zmq/sugar
      running build_ext
      running configure
      Using bundled libzmq
      already have bundled/zeromq
      already have platform.hpp
      checking for timer_create
      ************************************************
      ************************************************
      x86_64-linux-gnu-gcc -pthread -Wno-unused-result -Wsign-compare -DNDEBUG -g -fwrapv -O2 -Wall -g -fstack-protector-strong -Wformat -Werror=format-security -g -fwrapv -O2 -fPIC -I/usr/include/python3.9 -c /tmp/timer_createwdqm_dcl.c -o tmp/timer_createwdqm_dcl.o
      /tmp/timer_createwdqm_dcl.c: In function ‘main’:
      /tmp/timer_createwdqm_dcl.c:2:5: warning: implicit declaration of function ‘timer_create’ [-Wimplicit-function-declaration]
          2 |     timer_create();
            |     ^~~~~~~~~~~~
      x86_64-linux-gnu-gcc -pthread tmp/timer_createwdqm_dcl.o -L/usr/lib/x86_64-linux-gnu -o a.out
      /usr/bin/ld: tmp/timer_createwdqm_dcl.o: in function `main':
      /tmp/timer_createwdqm_dcl.c:2: undefined reference to `timer_create'
      collect2: error: ld returned 1 exit status
      no timer_create, linking librt
      ************************************************
      building 'zmq.libzmq' extension
      creating build/temp.linux-x86_64-cpython-39/buildutils
      creating build/temp.linux-x86_64-cpython-39/bundled
      creating build/temp.linux-x86_64-cpython-39/bundled/zeromq
      creating build/temp.linux-x86_64-cpython-39/bundled/zeromq/src
      x86_64-linux-gnu-g++ -pthread -std=c++11 -pthread -Wno-unused-result -Wsign-compare -DNDEBUG -g -fwrapv -O2 -Wall -g -fstack-protector-strong -Wformat -Werror=format-security -g -fwrapv -O2 -fPIC -DZMQ_HAVE_CURVE=1 -DZMQ_USE_TWEETNACL=1 -DZMQ_USE_EPOLL=1 -DZMQ_IOTHREADS_USE_EPOLL=1 -DZMQ_POLL_BASED_ON_POLL=1 -Ibundled/zeromq/include -Ibundled -I/usr/include/python3.9 -c buildutils/initlibzmq.cpp -o build/temp.linux-x86_64-cpython-39/buildutils/initlibzmq.o
      buildutils/initlibzmq.cpp:10:10: fatal error: Python.h: No such file or directory
         10 | #include "Python.h"
            |          ^~~~~~~~~~
      compilation terminated.
      error: command '/usr/bin/x86_64-linux-gnu-g++' failed with exit code 1
      [end of output]
  
  note: This error originates from a subprocess, and is likely not a problem with pip.
  ERROR: Failed building wheel for pyzmq
ERROR: Could not build wheels for pyzmq, which is required to install pyproject.toml-based projects
Failed to build pyzmq
Error: Process completed with exit code 1.

friendly ping @HyukjinKwon @Yikun to help to check this

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess already addressed here: #37904

mind to rebase?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

has been rebased, thank you @Yikun

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change to Python linter check failed...
https://github.com/LuciferYang/spark/actions/runs/3065275580/jobs/4949221030

starting mypy annotations test...
annotations failed mypy checks:
python/pyspark/pandas/window.py:112: error: Module has no attribute "lit"  [attr-defined]
Found 1 error in 1 file (checked 340 source files)
1

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GA passed

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.spark.annotation.Evolving;

import java.util.Arrays;
import java.text.DecimalFormat;

/**
Expand All @@ -33,7 +32,11 @@ public abstract class CustomAvgMetric implements CustomMetric {
@Override
public String aggregateTaskMetrics(long[] taskMetrics) {
if (taskMetrics.length > 0) {
double average = ((double)Arrays.stream(taskMetrics).sum()) / taskMetrics.length;
long sum = 0L;
for (long taskMetric : taskMetrics) {
sum += taskMetric;
}
double average = ((double) sum) / taskMetrics.length;
return new DecimalFormat("#0.000").format(average);
} else {
return "0";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

import org.apache.spark.annotation.Evolving;

import java.util.Arrays;

/**
* Built-in `CustomMetric` that sums up metric values. Note that please extend this class
* and override `name` and `description` to create your custom metric for real usage.
Expand All @@ -31,6 +29,10 @@
public abstract class CustomSumMetric implements CustomMetric {
@Override
public String aggregateTaskMetrics(long[] taskMetrics) {
return String.valueOf(Arrays.stream(taskMetrics).sum());
long sum = 0L;
for (long taskMetric : taskMetrics) {
sum += taskMetric;
}
return String.valueOf(sum);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@

package org.apache.spark.sql.connector.util;

import java.util.Arrays;
import java.util.ArrayList;
import java.util.List;
import java.util.StringJoiner;
import java.util.stream.Collectors;

import org.apache.spark.sql.connector.expressions.Cast;
import org.apache.spark.sql.connector.expressions.Expression;
Expand Down Expand Up @@ -62,9 +61,9 @@ public String build(Expression expr) {
String name = e.name();
switch (name) {
case "IN": {
List<String> children =
Arrays.stream(e.children()).map(c -> build(c)).collect(Collectors.toList());
return visitIn(children.get(0), children.subList(1, children.size()));
Expression[] expressions = e.children();
List<String> children = expressionsToStringList(expressions, 1, expressions.length - 1);
return visitIn(build(expressions[0]), children);
}
case "IS_NULL":
return visitIsNull(build(e.children()[0]));
Expand Down Expand Up @@ -159,63 +158,56 @@ public String build(Expression expr) {
case "BIT_LENGTH":
case "CHAR_LENGTH":
case "CONCAT":
return visitSQLFunction(name,
Arrays.stream(e.children()).map(c -> build(c)).toArray(String[]::new));
return visitSQLFunction(name, expressionsToStringArray(e.children()));
case "CASE_WHEN": {
List<String> children =
Arrays.stream(e.children()).map(c -> build(c)).collect(Collectors.toList());
return visitCaseWhen(children.toArray(new String[e.children().length]));
return visitCaseWhen(expressionsToStringArray(e.children()));
}
case "TRIM":
return visitTrim("BOTH",
Arrays.stream(e.children()).map(c -> build(c)).toArray(String[]::new));
return visitTrim("BOTH", expressionsToStringArray(e.children()));
case "LTRIM":
return visitTrim("LEADING",
Arrays.stream(e.children()).map(c -> build(c)).toArray(String[]::new));
return visitTrim("LEADING", expressionsToStringArray(e.children()));
case "RTRIM":
return visitTrim("TRAILING",
Arrays.stream(e.children()).map(c -> build(c)).toArray(String[]::new));
return visitTrim("TRAILING", expressionsToStringArray(e.children()));
case "OVERLAY":
return visitOverlay(
Arrays.stream(e.children()).map(c -> build(c)).toArray(String[]::new));
return visitOverlay(expressionsToStringArray(e.children()));
// TODO supports other expressions
default:
return visitUnexpectedExpr(expr);
}
} else if (expr instanceof Min) {
Min min = (Min) expr;
return visitAggregateFunction("MIN", false,
Arrays.stream(min.children()).map(c -> build(c)).toArray(String[]::new));
expressionsToStringArray(min.children()));
} else if (expr instanceof Max) {
Max max = (Max) expr;
return visitAggregateFunction("MAX", false,
Arrays.stream(max.children()).map(c -> build(c)).toArray(String[]::new));
expressionsToStringArray(max.children()));
} else if (expr instanceof Count) {
Count count = (Count) expr;
return visitAggregateFunction("COUNT", count.isDistinct(),
Arrays.stream(count.children()).map(c -> build(c)).toArray(String[]::new));
expressionsToStringArray(count.children()));
} else if (expr instanceof Sum) {
Sum sum = (Sum) expr;
return visitAggregateFunction("SUM", sum.isDistinct(),
Arrays.stream(sum.children()).map(c -> build(c)).toArray(String[]::new));
expressionsToStringArray(sum.children()));
} else if (expr instanceof CountStar) {
return visitAggregateFunction("COUNT", false, new String[]{"*"});
} else if (expr instanceof Avg) {
Avg avg = (Avg) expr;
return visitAggregateFunction("AVG", avg.isDistinct(),
Arrays.stream(avg.children()).map(c -> build(c)).toArray(String[]::new));
expressionsToStringArray(avg.children()));
} else if (expr instanceof GeneralAggregateFunc) {
GeneralAggregateFunc f = (GeneralAggregateFunc) expr;
return visitAggregateFunction(f.name(), f.isDistinct(),
Arrays.stream(f.children()).map(c -> build(c)).toArray(String[]::new));
expressionsToStringArray(f.children()));
} else if (expr instanceof UserDefinedScalarFunc) {
UserDefinedScalarFunc f = (UserDefinedScalarFunc) expr;
return visitUserDefinedScalarFunction(f.name(), f.canonicalName(),
Arrays.stream(f.children()).map(c -> build(c)).toArray(String[]::new));
expressionsToStringArray(f.children()));
} else if (expr instanceof UserDefinedAggregateFunc) {
UserDefinedAggregateFunc f = (UserDefinedAggregateFunc) expr;
return visitUserDefinedAggregateFunction(f.name(), f.canonicalName(), f.isDistinct(),
Arrays.stream(f.children()).map(c -> build(c)).toArray(String[]::new));
expressionsToStringArray(f.children()));
} else {
return visitUnexpectedExpr(expr);
}
Expand Down Expand Up @@ -393,4 +385,21 @@ private String joinListToString(
}
return joiner.toString();
}

private String[] expressionsToStringArray(Expression[] expressions) {
String[] result = new String[expressions.length];
for (int i = 0; i < expressions.length; i++) {
result[i] = build(expressions[i]);
}
return result;
}

private List<String> expressionsToStringList(Expression[] expressions, int offset, int length) {
List<String> list = new ArrayList<>(length);
while (offset < offset + length) {
list.add(build(expressions[offset]));
offset++;
}
return list;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ class V2PredicateSuite extends SparkFunSuite {
new Predicate("=", Array[Expression](ref("a"), LiteralValue(1, IntegerType))),
new Predicate("=", Array[Expression](ref("b"), LiteralValue(1, IntegerType))))
assert(predicate1.equals(predicate2))
assert(predicate1.references.map(_.describe()).toSeq == Seq("a", "b"))
assert(predicate1.references.map(_.describe()).toSeq.sorted == Seq("a", "b"))
assert(predicate1.describe.equals("(a = 1) AND (b = 1)"))

val v1Filter = V1And(EqualTo("a", 1), EqualTo("b", 1))
Expand All @@ -287,7 +287,7 @@ class V2PredicateSuite extends SparkFunSuite {
new Predicate("=", Array[Expression](ref("a"), LiteralValue(1, IntegerType))),
new Predicate("=", Array[Expression](ref("b"), LiteralValue(1, IntegerType))))
assert(predicate1.equals(predicate2))
assert(predicate1.references.map(_.describe()).toSeq == Seq("a", "b"))
assert(predicate1.references.map(_.describe()).toSeq.sorted == Seq("a", "b"))
assert(predicate1.describe.equals("(a = 1) OR (b = 1)"))

val v1Filter = V1Or(EqualTo("a", 1), EqualTo("b", 1))
Expand Down