forked from apache/datafusion
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy paththread_pools.rs
More file actions
355 lines (316 loc) · 14.6 KB
/
thread_pools.rs
File metadata and controls
355 lines (316 loc) · 14.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! See `main.rs` for how to run it.
//!
//! This example shows how to use separate thread pools (tokio [`Runtime`]))s to
//! run the IO and CPU intensive parts of DataFusion plans.
//!
//! # Background
//!
//! DataFusion, by default, plans and executes all operations (both CPU and IO)
//! on the same thread pool. This makes it fast and easy to get started, but
//! can cause issues when running at scale, especially when fetching and operating
//! on data directly from remote sources.
//!
//! Specifically, without configuration such as in this example, DataFusion
//! plans and executes everything the same thread pool (Tokio Runtime), including
//! any I/O, such as reading Parquet files from remote object storage
//! (e.g. AWS S3), catalog access, and CPU intensive work. Running this diverse
//! workload can lead to issues described in the [Architecture section] such as
//! throttled network bandwidth (due to congestion control) and increased
//! latencies or timeouts while processing network messages.
//!
//! [Architecture section]: https://docs.rs/datafusion/latest/datafusion/index.html#thread-scheduling-cpu--io-thread-pools-and-tokio-runtimes
use std::sync::Arc;
use arrow::util::pretty::pretty_format_batches;
use datafusion::common::runtime::JoinSet;
use datafusion::error::Result;
use datafusion::execution::SendableRecordBatchStream;
use datafusion::prelude::*;
use datafusion_examples::utils::{datasets::ExampleDataset, write_csv_to_parquet};
use futures::stream::StreamExt;
use object_store::client::SpawnedReqwestConnector;
use object_store::http::HttpBuilder;
use tokio::runtime::Handle;
use tokio::sync::Notify;
use url::Url;
/// Normally, you don't need to worry about the details of the tokio
/// [`Runtime`], but for this example it is important to understand how the
/// [`Runtime`]s work.
///
/// Each thread has "current" runtime that is installed in a thread local
/// variable which is used by the `tokio::spawn` function.
///
/// The `#[tokio::main]` macro creates a [`Runtime`] and installs it as
/// as the "current" runtime in a thread local variable, on which any `async`
/// [`Future`], [`Stream]`s and [`Task]`s are run.
///
/// This example uses the runtime created by [`tokio::main`] to do I/O and spawn
/// CPU intensive tasks on a separate [`Runtime`], mirroring the common pattern
/// when using Rust libraries such as `tonic`. Using a separate `Runtime` for
/// CPU bound tasks will often be simpler in larger applications, even though it
/// makes this example slightly more complex.
pub async fn thread_pools() -> Result<()> {
// The first two examples read local files. Enabling the URL table feature
// lets us treat filenames as tables in SQL.
let ctx = SessionContext::new().enable_url_table();
// Convert the CSV input into a temporary Parquet directory for querying
let dataset = ExampleDataset::Cars;
let parquet_temp = write_csv_to_parquet(&ctx, &dataset.path()).await?;
let sql = format!("SELECT * FROM '{}'", parquet_temp.path_str()?);
// Run a query on the current runtime. Calling `await` means the future
// (in this case the `async` function and all spawned work in DataFusion
// plans) on the current runtime.
same_runtime(&ctx, &sql).await?;
// Run the same query but this time on a different runtime.
//
// Since we call `await` here, the `async` function itself runs on the
// current runtime, but internally `different_runtime_basic` executes the
// DataFusion plan on a different Runtime.
different_runtime_basic(ctx, sql).await?;
// Run the same query on a different runtime, including remote IO.
//
// NOTE: This is best practice for production systems
different_runtime_advanced().await?;
Ok(())
}
/// Run queries directly on the current tokio `Runtime`
///
/// This is how most examples in DataFusion are written and works well for
/// development, local query processing, and non latency sensitive workloads.
async fn same_runtime(ctx: &SessionContext, sql: &str) -> Result<()> {
// Calling .sql is an async function as it may also do network
// I/O, for example to contact a remote catalog or do an object store LIST
let df = ctx.sql(sql).await?;
// While many examples call `collect` or `show()`, those methods buffers the
// results. Internally DataFusion generates output a RecordBatch at a time
// Calling `execute_stream` return a `SendableRecordBatchStream`. Depending
// on the plan, this may also do network I/O, for example to begin reading a
// parquet file from a remote object store.
let mut stream: SendableRecordBatchStream = df.execute_stream().await?;
// `next()` drives the plan, incrementally producing new `RecordBatch`es
// using the current runtime.
//
// Perhaps somewhat non obviously, calling `next()` can also result in other
// tasks being spawned on the current runtime (e.g. for `RepartitionExec` to
// read data from each of its input partitions in parallel).
//
// Executing the plan using this pattern intermixes any IO and CPU intensive
// work on same Runtime
while let Some(batch) = stream.next().await {
println!("{}", pretty_format_batches(&[batch?])?);
}
Ok(())
}
/// Run queries on a **different** Runtime dedicated for CPU bound work
///
/// This example is suitable for running DataFusion plans against local data
/// sources (e.g. files) and returning results to an async destination, as might
/// be done to return query results to a remote client.
///
/// Production systems which also read data locally or require very low latency
/// should follow the recommendations on [`different_runtime_advanced`] when
/// processing data from a remote source such as object storage.
async fn different_runtime_basic(ctx: SessionContext, sql: String) -> Result<()> {
// Since we are already in the context of runtime (installed by
// #[tokio::main]), we need a new Runtime (threadpool) for CPU bound tasks
let cpu_runtime = CpuRuntime::try_new()?;
// Prepare a task that runs the plan on cpu_runtime and sends
// the results back to the original runtime via a channel.
let (tx, mut rx) = tokio::sync::mpsc::channel(2);
let driver_task = async move {
// Plan the query (which might require CPU work to evaluate statistics)
let df = ctx.sql(&sql).await?;
let mut stream: SendableRecordBatchStream = df.execute_stream().await?;
// Calling `next()` to drive the plan in this task drives the
// execution from the cpu runtime the other thread pool
//
// NOTE any IO run by this plan (for example, reading from an
// `ObjectStore`) will be done on this new thread pool as well.
while let Some(batch) = stream.next().await {
if tx.send(batch).await.is_err() {
// error means dropped receiver, so nothing will get results anymore
return Ok(());
}
}
Ok(()) as Result<()>
};
// Run the driver task on the cpu runtime. Use a JoinSet to
// ensure the spawned task is canceled on error/drop
let mut join_set = JoinSet::new();
join_set.spawn_on(driver_task, cpu_runtime.handle());
// Retrieve the results in the original (IO) runtime. This requires only
// minimal work (pass pointers around).
while let Some(batch) = rx.recv().await {
println!("{}", pretty_format_batches(&[batch?])?);
}
// wait for completion of the driver task
drain_join_set(join_set).await;
Ok(())
}
/// Run CPU intensive work on a different runtime but do IO operations (object
/// store access) on the current runtime.
async fn different_runtime_advanced() -> Result<()> {
// In this example, we will query a file via https, reading
// the data directly from the plan
// The current runtime (created by tokio::main) is used for IO
//
// Note this handle should be used for *ALL* remote IO operations in your
// systems, including remote catalog access, which is not included in this
// example.
let cpu_runtime = CpuRuntime::try_new()?;
let io_handle = Handle::current();
let ctx = SessionContext::new();
// By default, the HttpStore use the same runtime that calls `await` for IO
// operations. This means that if the DataFusion plan is called from the
// cpu_runtime, the HttpStore IO operations will *also* run on the CPU
// runtime, which will error.
//
// To avoid this, we use a `SpawnedReqwestConnector` to configure the
// `ObjectStore` to run the HTTP requests on the IO runtime.
let base_url = Url::parse("https://github.com").unwrap();
let http_store = HttpBuilder::new()
.with_url(base_url.clone())
// Use the io_runtime to run the HTTP requests. Without this line,
// you will see an error such as:
// A Tokio 1.x context was found, but IO is disabled.
.with_http_connector(SpawnedReqwestConnector::new(io_handle))
.build()?;
// Tell DataFusion to process `http://` urls with this wrapped object store
ctx.register_object_store(&base_url, Arc::new(http_store));
// As above, plan and execute the query on the cpu runtime.
let (tx, mut rx) = tokio::sync::mpsc::channel(2);
let driver_task = async move {
// Plan / execute the query
let url = "https://github.com/apache/arrow-testing/raw/master/data/csv/aggregate_test_100.csv";
let df = ctx
.sql(&format!("SELECT c1,c2,c3 FROM '{url}' LIMIT 5"))
.await?;
let mut stream: SendableRecordBatchStream = df.execute_stream().await?;
// Note you can do other non trivial CPU work on the results of the
// stream before sending it back to the original runtime. For example,
// calling a FlightDataEncoder to convert the results to flight messages
// to send over the network
// send results, as above
while let Some(batch) = stream.next().await {
if tx.send(batch).await.is_err() {
return Ok(());
}
}
Ok(()) as Result<()>
};
let mut join_set = JoinSet::new();
join_set.spawn_on(driver_task, cpu_runtime.handle());
while let Some(batch) = rx.recv().await {
println!("{}", pretty_format_batches(&[batch?])?);
}
Ok(())
}
/// Waits for all tasks in the JoinSet to complete and reports any errors that
/// occurred.
///
/// If we don't do this, any errors that occur in the task (such as IO errors)
/// are not reported.
async fn drain_join_set(mut join_set: JoinSet<Result<()>>) {
// retrieve any errors from the tasks
while let Some(result) = join_set.join_next().await {
match result {
Ok(Ok(())) => {} // task completed successfully
Ok(Err(e)) => eprintln!("Task failed: {e}"), // task failed
Err(e) => eprintln!("JoinSet error: {e}"), // JoinSet error
}
}
}
/// Creates a Tokio [`Runtime`] for use with CPU bound tasks
///
/// Tokio forbids dropping `Runtime`s in async contexts, so creating a separate
/// `Runtime` correctly is somewhat tricky. This structure manages the creation
/// and shutdown of a separate thread.
///
/// # Notes
/// On drop, the thread will wait for all remaining tasks to complete.
///
/// Depending on your application, more sophisticated shutdown logic may be
/// required, such as ensuring that no new tasks are added to the runtime.
///
/// # Credits
/// This code is derived from code originally written for [InfluxDB 3.0]
///
/// [InfluxDB 3.0]: https://github.com/influxdata/influxdb3_core/tree/6fcbb004232738d55655f32f4ad2385523d10696/executor
struct CpuRuntime {
/// Handle is the tokio structure for interacting with a Runtime.
handle: Handle,
/// Signal to start shutting down
notify_shutdown: Arc<Notify>,
/// When thread is active, is Some
thread_join_handle: Option<std::thread::JoinHandle<()>>,
}
impl Drop for CpuRuntime {
fn drop(&mut self) {
// Notify the thread to shutdown.
self.notify_shutdown.notify_one();
// In a production system you also need to ensure your code stops adding
// new tasks to the underlying runtime after this point to allow the
// thread to complete its work and exit cleanly.
if let Some(thread_join_handle) = self.thread_join_handle.take() {
// If the thread is still running, we wait for it to finish
print!("Shutting down CPU runtime thread...");
if let Err(e) = thread_join_handle.join() {
eprintln!("Error joining CPU runtime thread: {e:?}",);
} else {
println!("CPU runtime thread shutdown successfully.");
}
}
}
}
impl CpuRuntime {
/// Create a new Tokio Runtime for CPU bound tasks
pub fn try_new() -> Result<Self> {
let cpu_runtime = tokio::runtime::Builder::new_multi_thread()
.enable_time()
.build()?;
let handle = cpu_runtime.handle().clone();
let notify_shutdown = Arc::new(Notify::new());
let notify_shutdown_captured = Arc::clone(¬ify_shutdown);
// The cpu_runtime runs and is dropped on a separate thread
let thread_join_handle = std::thread::spawn(move || {
cpu_runtime.block_on(async move {
notify_shutdown_captured.notified().await;
});
// Note: cpu_runtime is dropped here, which will wait for all tasks
// to complete
});
Ok(Self {
handle,
notify_shutdown,
thread_join_handle: Some(thread_join_handle),
})
}
/// Return a handle suitable for spawning CPU bound tasks
///
/// # Notes
///
/// If a task spawned on this handle attempts to do IO, it will error with a
/// message such as:
///
/// ```text
/// A Tokio 1.x context was found, but IO is disabled.
/// ```
pub fn handle(&self) -> &Handle {
&self.handle
}
}