Skip to content

Commit 1369a7b

Browse files
committed
Use global semaphores for concurrency limits
Avoid problems such as #15307, follow-up to #18054. See also #17633, for which this should be helpful.
1 parent a91bcf2 commit 1369a7b

28 files changed

Lines changed: 184 additions & 111 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/uv-bench/benches/uv.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ mod resolver {
203203
exclude_newer,
204204
sources,
205205
workspace_cache,
206-
concurrency,
206+
concurrency.clone(),
207207
Preview::default(),
208208
);
209209

@@ -226,7 +226,11 @@ mod resolver {
226226
&hashes,
227227
&build_context,
228228
installed_packages,
229-
DistributionDatabase::new(client, &build_context, concurrency.downloads),
229+
DistributionDatabase::new(
230+
client,
231+
&build_context,
232+
concurrency.downloads_semaphore.clone(),
233+
),
230234
)?;
231235

232236
Ok(resolver.resolve().await?)

crates/uv-build-frontend/src/lib.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -223,11 +223,11 @@ pub struct SourceBuildContext {
223223
}
224224

225225
impl SourceBuildContext {
226-
/// Create a [`SourceBuildContext`] with the given concurrent build limit.
227-
pub fn new(concurrent_builds: usize) -> Self {
226+
/// Create a [`SourceBuildContext`] with the given shared concurrency semaphore.
227+
pub fn new(concurrent_build_slots: Arc<Semaphore>) -> Self {
228228
Self {
229229
default_resolution: Arc::default(),
230-
concurrent_build_slots: Arc::new(Semaphore::new(concurrent_builds)),
230+
concurrent_build_slots,
231231
}
232232
}
233233
}

crates/uv-configuration/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ rustc-hash = { workspace = true }
3737
same-file = { workspace = true }
3838
schemars = { workspace = true, optional = true }
3939
serde = { workspace = true }
40+
tokio = { workspace = true }
4041
serde-untagged = { workspace = true }
4142
thiserror = { workspace = true }
4243
tracing = { workspace = true }

crates/uv-configuration/src/concurrency.rs

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
1+
use std::fmt;
12
use std::num::NonZeroUsize;
3+
use std::sync::Arc;
4+
5+
use tokio::sync::Semaphore;
26

37
/// Concurrency limit settings.
4-
#[derive(Copy, Clone, Debug)]
8+
// TODO(konsti): We should find a pattern that doesn't require having both semaphores and counts.
9+
#[derive(Clone)]
510
pub struct Concurrency {
611
/// The maximum number of concurrent downloads.
712
///
@@ -15,22 +20,45 @@ pub struct Concurrency {
1520
///
1621
/// Note this value must be non-zero.
1722
pub installs: usize,
23+
/// A global semaphore to limit the number of concurrent downloads.
24+
pub downloads_semaphore: Arc<Semaphore>,
25+
/// A global semaphore to limit the number of concurrent builds.
26+
pub builds_semaphore: Arc<Semaphore>,
27+
}
28+
29+
/// Custom `Debug` to hide semaphore fields from `--show-settings` output.
30+
#[expect(clippy::missing_fields_in_debug)]
31+
impl fmt::Debug for Concurrency {
32+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
33+
f.debug_struct("Concurrency")
34+
.field("downloads", &self.downloads)
35+
.field("builds", &self.builds)
36+
.field("installs", &self.installs)
37+
.finish()
38+
}
1839
}
1940

2041
impl Default for Concurrency {
2142
fn default() -> Self {
22-
Self {
23-
downloads: Self::DEFAULT_DOWNLOADS,
24-
builds: Self::threads(),
25-
installs: Self::threads(),
26-
}
43+
Self::new(Self::DEFAULT_DOWNLOADS, Self::threads(), Self::threads())
2744
}
2845
}
2946

3047
impl Concurrency {
3148
// The default concurrent downloads limit.
3249
pub const DEFAULT_DOWNLOADS: usize = 50;
3350

51+
/// Create a new [`Concurrency`] with the given limits.
52+
pub fn new(downloads: usize, builds: usize, installs: usize) -> Self {
53+
Self {
54+
downloads,
55+
builds,
56+
installs,
57+
downloads_semaphore: Arc::new(Semaphore::new(downloads)),
58+
builds_semaphore: Arc::new(Semaphore::new(builds)),
59+
}
60+
}
61+
3462
// The default concurrent builds and install limit.
3563
pub fn threads() -> usize {
3664
std::thread::available_parallelism()

crates/uv-dispatch/src/lib.rs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ impl<'a> BuildDispatch<'a> {
147147
build_options,
148148
hasher,
149149
exclude_newer,
150-
source_build_context: SourceBuildContext::new(concurrency.builds),
150+
source_build_context: SourceBuildContext::new(concurrency.builds_semaphore.clone()),
151151
build_extra_env_vars: FxHashMap::default(),
152152
sources,
153153
workspace_cache,
@@ -264,8 +264,12 @@ impl BuildContext for BuildDispatch<'_> {
264264
self.hasher,
265265
self,
266266
EmptyInstalledPackages,
267-
DistributionDatabase::new(self.client, self, self.concurrency.downloads)
268-
.with_build_stack(build_stack),
267+
DistributionDatabase::new(
268+
self.client,
269+
self,
270+
self.concurrency.downloads_semaphore.clone(),
271+
)
272+
.with_build_stack(build_stack),
269273
)?;
270274
let resolution = Resolution::from(resolver.resolve().await.with_context(|| {
271275
format!(
@@ -353,8 +357,12 @@ impl BuildContext for BuildDispatch<'_> {
353357
tags,
354358
self.hasher,
355359
self.build_options,
356-
DistributionDatabase::new(self.client, self, self.concurrency.downloads)
357-
.with_build_stack(build_stack),
360+
DistributionDatabase::new(
361+
self.client,
362+
self,
363+
self.concurrency.downloads_semaphore.clone(),
364+
)
365+
.with_build_stack(build_stack),
358366
);
359367

360368
debug!(

crates/uv-distribution/src/distribution_database.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,12 +58,12 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
5858
pub fn new(
5959
client: &'a RegistryClient,
6060
build_context: &'a Context,
61-
concurrent_downloads: usize,
61+
downloads_semaphore: Arc<Semaphore>,
6262
) -> Self {
6363
Self {
6464
build_context,
6565
builder: SourceDistributionBuilder::new(build_context),
66-
client: ManagedClient::new(client, concurrent_downloads),
66+
client: ManagedClient::new(client, downloads_semaphore),
6767
reporter: None,
6868
}
6969
}
@@ -1151,15 +1151,15 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
11511151
/// A wrapper around `RegistryClient` that manages a concurrency limit.
11521152
pub struct ManagedClient<'a> {
11531153
pub unmanaged: &'a RegistryClient,
1154-
control: Semaphore,
1154+
control: Arc<Semaphore>,
11551155
}
11561156

11571157
impl<'a> ManagedClient<'a> {
1158-
/// Create a new `ManagedClient` using the given client and concurrency limit.
1159-
fn new(client: &'a RegistryClient, concurrency: usize) -> Self {
1158+
/// Create a new `ManagedClient` using the given client and concurrency semaphore.
1159+
fn new(client: &'a RegistryClient, control: Arc<Semaphore>) -> Self {
11601160
ManagedClient {
11611161
unmanaged: client,
1162-
control: Semaphore::new(concurrency),
1162+
control,
11631163
}
11641164
}
11651165

crates/uv/src/commands/build_frontend.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ pub(crate) async fn build_frontend(
146146
no_config,
147147
python_preference,
148148
python_downloads,
149-
concurrency,
149+
&concurrency,
150150
cache,
151151
printer,
152152
preview,
@@ -193,7 +193,7 @@ async fn build_impl(
193193
no_config: bool,
194194
python_preference: PythonPreference,
195195
python_downloads: PythonDownloads,
196-
concurrency: Concurrency,
196+
concurrency: &Concurrency,
197197
cache: &Cache,
198198
printer: Printer,
199199
preview: Preview,
@@ -469,7 +469,7 @@ async fn build_package(
469469
keyring_provider: KeyringProviderType,
470470
exclude_newer: ExcludeNewer,
471471
sources: NoSources,
472-
concurrency: Concurrency,
472+
concurrency: &Concurrency,
473473
build_options: &BuildOptions,
474474
sdist: bool,
475475
wheel: bool,
@@ -630,7 +630,7 @@ async fn build_package(
630630
exclude_newer,
631631
sources.clone(),
632632
workspace_cache,
633-
concurrency,
633+
concurrency.clone(),
634634
preview,
635635
);
636636

crates/uv/src/commands/pip/compile.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -540,7 +540,7 @@ pub(crate) async fn pip_compile(
540540
exclude_newer.clone(),
541541
sources,
542542
WorkspaceCache::default(),
543-
concurrency,
543+
concurrency.clone(),
544544
preview,
545545
);
546546

@@ -580,7 +580,7 @@ pub(crate) async fn pip_compile(
580580
&flat_index,
581581
&top_level_index,
582582
&build_dispatch,
583-
concurrency,
583+
&concurrency,
584584
options,
585585
Box::new(DefaultResolveLogger),
586586
printer,

crates/uv/src/commands/pip/install.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -486,7 +486,7 @@ pub(crate) async fn pip_install(
486486
exclude_newer.clone(),
487487
sources.clone(),
488488
WorkspaceCache::default(),
489-
concurrency,
489+
concurrency.clone(),
490490
preview,
491491
);
492492

@@ -595,7 +595,7 @@ pub(crate) async fn pip_install(
595595
&flat_index,
596596
state.index(),
597597
&build_dispatch,
598-
concurrency,
598+
&concurrency,
599599
options,
600600
Box::new(DefaultResolveLogger),
601601
printer,
@@ -640,7 +640,7 @@ pub(crate) async fn pip_install(
640640
exclude_newer.clone(),
641641
sources,
642642
WorkspaceCache::default(),
643-
concurrency,
643+
concurrency.clone(),
644644
preview,
645645
);
646646

@@ -658,7 +658,7 @@ pub(crate) async fn pip_install(
658658
&tags,
659659
&client,
660660
state.in_flight(),
661-
concurrency,
661+
&concurrency,
662662
&build_dispatch,
663663
&cache,
664664
&environment,

0 commit comments

Comments
 (0)