Skip to content

Commit 4506998

Browse files
authored
feat: gRPC compression support for flight CLI (#8240)
# Which issue does this PR close?

\-

# Rationale for this change

Some services support gRPC compression. Expose this to the CLI client for:

- testing
- more efficient data transfer over slow internet connections

# What changes are included in this PR?

CLI argument wiring.

# Are these changes tested?

No automated tests. I think we can assume that the libraries we use do what they promise to do. But I also verified that this works by inspecting the traffic using Wireshark.

# Are there any user-facing changes?

They now have more options.
1 parent 3dcd23f commit 4506998

2 files changed

Lines changed: 57 additions & 3 deletions

File tree

arrow-flight/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ tls-ring = ["tonic/tls-ring"]
7070
tls-webpki-roots = ["tonic/tls-webpki-roots"]
7171

7272
# Enable CLI tools
73-
cli = ["arrow-array/chrono-tz", "arrow-cast/prettyprint", "tonic/tls-webpki-roots", "dep:anyhow", "dep:clap", "dep:tracing-log", "dep:tracing-subscriber", "dep:tokio"]
73+
cli = ["arrow-array/chrono-tz", "arrow-cast/prettyprint", "tonic/tls-webpki-roots", "tonic/gzip", "tonic/deflate", "tonic/zstd", "dep:anyhow", "dep:clap", "dep:tracing-log", "dep:tracing-subscriber", "dep:tokio"]
7474

7575
[dev-dependencies]
7676
arrow-cast = { workspace = true, features = ["prettyprint"] }

arrow-flight/src/bin/flight_sql_client.rs

Lines changed: 56 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,12 @@ use anyhow::{bail, Context, Result};
2121
use arrow_array::{ArrayRef, Datum, RecordBatch, StringArray};
2222
use arrow_cast::{cast_with_options, pretty::pretty_format_batches, CastOptions};
2323
use arrow_flight::{
24+
flight_service_client::FlightServiceClient,
2425
sql::{client::FlightSqlServiceClient, CommandGetDbSchemas, CommandGetTables},
2526
FlightInfo,
2627
};
2728
use arrow_schema::Schema;
28-
use clap::{Parser, Subcommand};
29+
use clap::{Parser, Subcommand, ValueEnum};
2930
use core::str;
3031
use futures::TryStreamExt;
3132
use tonic::{
@@ -53,6 +54,24 @@ pub struct LoggingArgs {
5354
log_verbose_count: u8,
5455
}
5556

57+
/// gRPC/HTTP compression algorithms.
58+
#[derive(Clone, Copy, Debug, PartialEq, Eq, ValueEnum)]
59+
pub enum CompressionEncoding {
60+
Gzip,
61+
Deflate,
62+
Zstd,
63+
}
64+
65+
impl From<CompressionEncoding> for tonic::codec::CompressionEncoding {
66+
fn from(encoding: CompressionEncoding) -> Self {
67+
match encoding {
68+
CompressionEncoding::Gzip => Self::Gzip,
69+
CompressionEncoding::Deflate => Self::Deflate,
70+
CompressionEncoding::Zstd => Self::Zstd,
71+
}
72+
}
73+
}
74+
5675
#[derive(Debug, Parser)]
5776
struct ClientArgs {
5877
/// Additional headers.
@@ -96,6 +115,34 @@ struct ClientArgs {
96115
/// Defaults to `443` if `tls` is set, otherwise defaults to `80`.
97116
#[clap(long)]
98117
port: Option<u16>,
118+
119+
/// Compression accepted by the client for responses sent by the server.
120+
///
121+
/// The client will send this information to the server as part of the request. The server is free to pick an
122+
/// algorithm from that list or use no compression (called "identity" encoding).
123+
///
124+
/// You may define multiple algorithms by using a comma-separated list.
125+
#[clap(long, value_delimiter = ',')]
126+
accept_compression: Vec<CompressionEncoding>,
127+
128+
/// Compression of requests sent by the client to the server.
129+
///
130+
/// Since the client needs to decide on the compression before sending the request, there is no client<->server
131+
/// negotiation. If the server does NOT support the chosen compression, it will respond with an error a la:
132+
///
133+
/// ```
134+
/// Ipc error: Status {
135+
/// code: Unimplemented,
136+
/// message: "Content is compressed with `zstd` which isn't supported",
137+
/// metadata: MetadataMap { headers: {"grpc-accept-encoding": "identity", ...} },
138+
/// ...
139+
/// }
140+
/// ```
141+
///
142+
/// Based on the algorithms listed in the `grpc-accept-encoding` header, you may make a more educated guess for
143+
/// your next request. Note that `identity` is a synonym for "no compression".
144+
#[clap(long)]
145+
send_compression: Option<CompressionEncoding>,
99146
}
100147

101148
#[derive(Debug, Parser)]
@@ -365,7 +412,14 @@ async fn setup_client(args: ClientArgs) -> Result<FlightSqlServiceClient<Channel
365412

366413
let channel = endpoint.connect().await.context("connect to endpoint")?;
367414

368-
let mut client = FlightSqlServiceClient::new(channel);
415+
let mut client = FlightServiceClient::new(channel);
416+
for encoding in args.accept_compression {
417+
client = client.accept_compressed(encoding.into());
418+
}
419+
if let Some(encoding) = args.send_compression {
420+
client = client.send_compressed(encoding.into());
421+
}
422+
let mut client = FlightSqlServiceClient::new_from_inner(client);
369423
info!("connected");
370424

371425
for (k, v) in args.headers {

0 commit comments

Comments
 (0)