3535import com .azure .core .http .HttpMethod ;
3636import com .azure .core .http .HttpRequest ;
3737import com .azure .core .http .HttpResponse ;
38+ import com .azure .core .http .rest .PagedResponse ;
3839import com .azure .core .http .rest .Response ;
3940import com .azure .core .util .Context ;
4041import com .azure .storage .blob .BlobClient ;
5152import com .azure .storage .blob .options .BlobParallelUploadOptions ;
5253import com .azure .storage .common .implementation .Constants ;
5354
55+ import org .apache .commons .lang3 .StringUtils ;
5456import org .apache .logging .log4j .LogManager ;
5557import org .apache .logging .log4j .Logger ;
5658import org .apache .logging .log4j .core .util .Throwables ;
8284import java .util .HashMap ;
8385import java .util .HashSet ;
8486import java .util .Map ;
87+ import java .util .Optional ;
8588import java .util .Set ;
8689import java .util .concurrent .Executor ;
8790import java .util .concurrent .atomic .AtomicLong ;
@@ -217,50 +220,71 @@ public DeleteResult deleteBlobDirectory(String path, Executor executor) throws U
217220 final ListBlobsOptions listBlobsOptions = new ListBlobsOptions ().setPrefix (path );
218221
219222 SocketAccess .doPrivilegedVoidException (() -> {
220- for (final BlobItem blobItem : blobContainer .listBlobs (listBlobsOptions , timeout ())) {
221- // Skipping prefixes as those are not deletable and should not be there
222- assert (blobItem .isPrefix () == null || !blobItem .isPrefix ()) : "Only blobs (not prefixes) are expected" ;
223-
224- outstanding .incrementAndGet ();
225- executor .execute (new AbstractRunnable () {
226- @ Override
227- protected void doRun () throws Exception {
228- final long len = blobItem .getProperties ().getContentLength ();
229-
230- final BlobClient azureBlob = blobContainer .getBlobClient (blobItem .getName ());
231- logger .trace (
232- () -> new ParameterizedMessage ("container [{}]: blob [{}] found. removing." , container , blobItem .getName ())
233- );
234- final Response <Void > response = azureBlob .deleteWithResponse (null , null , timeout (), client .v2 ().get ());
235- logger .trace (
236- () -> new ParameterizedMessage (
237- "container [{}]: blob [{}] deleted status [{}]." ,
238- container ,
239- blobItem .getName (),
240- response .getStatusCode ()
241- )
242- );
243-
244- blobsDeleted .incrementAndGet ();
245- if (len >= 0 ) {
246- bytesDeleted .addAndGet (len );
223+ String continuationToken = null ;
224+
225+ do {
226+ // Fetch one page at a time, others are going to be fetched by continuation token
227+ // TODO: reconsider reverting to simplified approach once https://github.com/Azure/azure-sdk-for-java/issues/26064
228+ // gets addressed.
229+ final Optional <PagedResponse <BlobItem >> pageOpt = blobContainer .listBlobs (listBlobsOptions , timeout ())
230+ .streamByPage (continuationToken )
231+ .findFirst ();
232+
233+ if (!pageOpt .isPresent ()) {
234+ // No more pages, should never happen
235+ break ;
236+ }
237+
238+ final PagedResponse <BlobItem > page = pageOpt .get ();
239+ for (final BlobItem blobItem : page .getValue ()) {
240+ // Skipping prefixes as those are not deletable and should not be there
241+ assert (blobItem .isPrefix () == null || !blobItem .isPrefix ()) : "Only blobs (not prefixes) are expected" ;
242+
243+ outstanding .incrementAndGet ();
244+ executor .execute (new AbstractRunnable () {
245+ @ Override
246+ protected void doRun () throws Exception {
247+ final long len = blobItem .getProperties ().getContentLength ();
248+
249+ final BlobClient azureBlob = blobContainer .getBlobClient (blobItem .getName ());
250+ logger .trace (
251+ () -> new ParameterizedMessage ("container [{}]: blob [{}] found. removing." , container , blobItem .getName ())
252+ );
253+ final Response <Void > response = azureBlob .deleteWithResponse (null , null , timeout (), client .v2 ().get ());
254+ logger .trace (
255+ () -> new ParameterizedMessage (
256+ "container [{}]: blob [{}] deleted status [{}]." ,
257+ container ,
258+ blobItem .getName (),
259+ response .getStatusCode ()
260+ )
261+ );
262+
263+ blobsDeleted .incrementAndGet ();
264+ if (len >= 0 ) {
265+ bytesDeleted .addAndGet (len );
266+ }
247267 }
248- }
249268
250- @ Override
251- public void onFailure (Exception e ) {
252- exceptions .add (e );
253- }
269+ @ Override
270+ public void onFailure (Exception e ) {
271+ exceptions .add (e );
272+ }
254273
255- @ Override
256- public void onAfter () {
257- if (outstanding .decrementAndGet () == 0 ) {
258- result .onResponse (null );
274+ @ Override
275+ public void onAfter () {
276+ if (outstanding .decrementAndGet () == 0 ) {
277+ result .onResponse (null );
278+ }
259279 }
260- }
261- });
262- }
280+ });
281+ }
282+
283+ // Fetch next continuation token
284+ continuationToken = page .getContinuationToken ();
285+ } while (StringUtils .isNotBlank (continuationToken ));
263286 });
287+
264288 if (outstanding .decrementAndGet () == 0 ) {
265289 result .onResponse (null );
266290 }
@@ -301,20 +325,39 @@ public Map<String, BlobMetadata> listBlobsByPrefix(String keyPath, String prefix
301325 .setPrefix (keyPath + (prefix == null ? "" : prefix ));
302326
303327 SocketAccess .doPrivilegedVoidException (() -> {
304- for (final BlobItem blobItem : blobContainer .listBlobsByHierarchy ("/" , listBlobsOptions , timeout ())) {
305- // Skipping over the prefixes, only look for the blobs
306- if (blobItem .isPrefix () != null && blobItem .isPrefix ()) {
307- continue ;
328+ String continuationToken = null ;
329+
330+ do {
331+ // Fetch one page at a time, others are going to be fetched by continuation token
332+ // TODO: reconsider reverting to simplified approach once https://github.com/Azure/azure-sdk-for-java/issues/26064
333+ // gets addressed
334+ final Optional <PagedResponse <BlobItem >> pageOpt = blobContainer .listBlobsByHierarchy ("/" , listBlobsOptions , timeout ())
335+ .streamByPage (continuationToken )
336+ .findFirst ();
337+
338+ if (!pageOpt .isPresent ()) {
339+ // No more pages, should never happen
340+ break ;
308341 }
309342
310- final String name = getBlobName (blobItem .getName (), container , keyPath );
311- logger .trace (() -> new ParameterizedMessage ("blob name [{}]" , name ));
343+ final PagedResponse <BlobItem > page = pageOpt .get ();
344+ for (final BlobItem blobItem : page .getValue ()) {
345+ // Skipping over the prefixes, only look for the blobs
346+ if (blobItem .isPrefix () != null && blobItem .isPrefix ()) {
347+ continue ;
348+ }
312349
313- final BlobItemProperties properties = blobItem .getProperties ();
314- logger .trace (() -> new ParameterizedMessage ("blob name [{}], size [{}]" , name , properties .getContentLength ()));
315- blobsBuilder .put (name , new PlainBlobMetadata (name , properties .getContentLength ()));
316- }
350+ final String name = getBlobName (blobItem .getName (), container , keyPath );
351+ logger .trace (() -> new ParameterizedMessage ("blob name [{}]" , name ));
352+
353+ final BlobItemProperties properties = blobItem .getProperties ();
354+ logger .trace (() -> new ParameterizedMessage ("blob name [{}], size [{}]" , name , properties .getContentLength ()));
355+ blobsBuilder .put (name , new PlainBlobMetadata (name , properties .getContentLength ()));
356+ }
317357
358+ // Fetch next continuation token
359+ continuationToken = page .getContinuationToken ();
360+ } while (StringUtils .isNotBlank (continuationToken ));
318361 });
319362
320363 return MapBuilder .newMapBuilder (blobsBuilder ).immutableMap ();
@@ -330,18 +373,36 @@ public Map<String, BlobContainer> children(BlobPath path) throws URISyntaxExcept
330373 .setPrefix (keyPath );
331374
332375 SocketAccess .doPrivilegedVoidException (() -> {
333- for (final BlobItem blobItem : blobContainer .listBlobsByHierarchy ("/" , listBlobsOptions , timeout ())) {
334- // Skipping over the blobs, only look for prefixes
335- if (blobItem .isPrefix () != null && blobItem .isPrefix ()) {
336- // Expecting name in the form /container/keyPath.* and we want to strip off the /container/
337- // this requires 1 + container.length() + 1, with each 1 corresponding to one of the /.
338- // Lastly, we add the length of keyPath to the offset to strip this container's path.
339- final String name = getBlobName (blobItem .getName (), container , keyPath ).replaceAll ("/$" , "" );
340- logger .trace (() -> new ParameterizedMessage ("blob name [{}]" , name ));
341- blobsBuilder .add (name );
376+ String continuationToken = null ;
377+
378+ do {
379+ // Fetch one page at a time, others are going to be fetched by continuation token
380+ // TODO: reconsider reverting to simplified approach once https://github.com/Azure/azure-sdk-for-java/issues/26064
381+ // gets addressed
382+ final Optional <PagedResponse <BlobItem >> pageOpt = blobContainer .listBlobsByHierarchy ("/" , listBlobsOptions , timeout ())
383+ .streamByPage (continuationToken )
384+ .findFirst ();
385+
386+ if (!pageOpt .isPresent ()) {
387+ // No more pages, should never happen
388+ break ;
342389 }
343- }
344- ;
390+
391+ final PagedResponse <BlobItem > page = pageOpt .get ();
392+ for (final BlobItem blobItem : page .getValue ()) {
393+ // Skipping over the blobs, only look for prefixes
394+ if (blobItem .isPrefix () != null && blobItem .isPrefix ()) {
395+ // Expecting name in the form /container/keyPath.* and we want to strip off the /container/
396+ // this requires 1 + container.length() + 1, with each 1 corresponding to one of the /.
397+ // Lastly, we add the length of keyPath to the offset to strip this container's path.
398+ final String name = getBlobName (blobItem .getName (), container , keyPath ).replaceAll ("/$" , "" );
399+ logger .trace (() -> new ParameterizedMessage ("blob name [{}]" , name ));
400+ blobsBuilder .add (name );
401+ }
402+ }
403+ // Fetch next continuation token
404+ continuationToken = page .getContinuationToken ();
405+ } while (StringUtils .isNotBlank (continuationToken ));
345406 });
346407
347408 return Collections .unmodifiableMap (
0 commit comments