@@ -164,6 +164,17 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
164164 Ok ( pointer)
165165 }
166166
167+ fn read_canonical_http_revision (
168+ & self ,
169+ lock_shard : & CacheShard ,
170+ hashes : HashPolicy < ' _ > ,
171+ ) -> Result < Option < Revision > , Error > {
172+ let entry = lock_shard. entry ( HTTP_REVISION ) ;
173+ Ok ( HttpRevisionPointer :: read_from ( & entry) ?
174+ . map ( HttpRevisionPointer :: into_revision)
175+ . filter ( |revision| revision. has_digests ( hashes) ) )
176+ }
177+
167178 /// Download and build a [`SourceDist`].
168179 pub ( crate ) async fn download_and_build (
169180 & self ,
@@ -595,6 +606,38 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
595606 // Acquire the concurrency permit and advisory lock.
596607 let _permit = self . acquire_concurrency_permit ( ) . await ;
597608 let _lock = lock_shard. lock ( ) . await . map_err ( Error :: CacheLock ) ?;
609+ let revision = self
610+ . read_canonical_http_revision ( lock_shard, hashes) ?
611+ . unwrap_or ( revision) ;
612+ let cache_shard = lock_shard. shard ( revision. id ( ) ) ;
613+ let source_dist_entry = cache_shard. entry ( SOURCE ) ;
614+ let revision = if source_dist_entry. path ( ) . is_dir ( ) {
615+ revision
616+ } else {
617+ self . heal_url_revision (
618+ source,
619+ ext,
620+ url,
621+ index,
622+ & source_dist_entry,
623+ revision,
624+ hashes,
625+ client,
626+ )
627+ . await ?
628+ } ;
629+ if let Some ( subdirectory) = subdirectory {
630+ if !source_dist_entry. path ( ) . join ( subdirectory) . is_dir ( ) {
631+ return Err ( Error :: MissingSubdirectory (
632+ url. clone ( ) ,
633+ subdirectory. to_path_buf ( ) ,
634+ ) ) ;
635+ }
636+ }
637+ let cache_shard = build_info
638+ . cache_shard ( )
639+ . map ( |digest| cache_shard. shard ( digest) )
640+ . unwrap_or ( cache_shard) ;
598641
599642 // Re-check the cache under lock to avoid duplicate builds across concurrent tasks.
600643 if let Some ( file) = BuiltWheelFile :: find_in_cache ( tags, & cache_shard)
@@ -739,6 +782,35 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
739782 // Acquire the concurrency permit and advisory lock.
740783 let _permit = self . acquire_concurrency_permit ( ) . await ;
741784 let _lock = lock_shard. lock ( ) . await . map_err ( Error :: CacheLock ) ?;
785+ let revision = self
786+ . read_canonical_http_revision ( lock_shard, hashes) ?
787+ . unwrap_or ( revision) ;
788+ let cache_shard = lock_shard. shard ( revision. id ( ) ) ;
789+ let source_dist_entry = cache_shard. entry ( SOURCE ) ;
790+ let revision = if source_dist_entry. path ( ) . is_dir ( ) {
791+ revision
792+ } else {
793+ self . heal_url_revision (
794+ source,
795+ ext,
796+ url,
797+ index,
798+ & source_dist_entry,
799+ revision,
800+ hashes,
801+ client,
802+ )
803+ . await ?
804+ } ;
805+ if let Some ( subdirectory) = subdirectory {
806+ if !source_dist_entry. path ( ) . join ( subdirectory) . is_dir ( ) {
807+ return Err ( Error :: MissingSubdirectory (
808+ url. clone ( ) ,
809+ subdirectory. to_path_buf ( ) ,
810+ ) ) ;
811+ }
812+ }
813+ let metadata_entry = cache_shard. entry ( METADATA ) ;
742814
743815 // Re-check the cache under lock to avoid duplicate builds across concurrent tasks.
744816 if let Some ( metadata) = self
0 commit comments