java.lang.InterruptedException when trying to read events from remote storage #696

@catalinmer

Hi,

After several weeks of running without problems (mostly writing events to remote storage, with little reading until now), we started seeing the following errors in the logs.
It looks like the broker is trying to read events from remote storage and failing. The error is below:

ERROR Error occurred while reading the remote data for wallet-events-3 (kafka.log.remote.RemoteLogReader)
org.apache.kafka.common.KafkaException: org.apache.kafka.server.log.remote.storage.RemoteStorageException: java.lang.RuntimeException: java.lang.InterruptedException
	at org.apache.kafka.storage.internals.log.RemoteIndexCache.lambda$createCacheEntry$13(RemoteIndexCache.java:407)
	at org.apache.kafka.storage.internals.log.RemoteIndexCache.loadIndexFile(RemoteIndexCache.java:354)
	at org.apache.kafka.storage.internals.log.RemoteIndexCache.createCacheEntry(RemoteIndexCache.java:403)
	at org.apache.kafka.storage.internals.log.RemoteIndexCache.lambda$getIndexEntry$10(RemoteIndexCache.java:377)
	at com.github.benmanes.caffeine.cache.BoundedLocalCache.lambda$doComputeIfAbsent$14(BoundedLocalCache.java:2406)
	at java.base/java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1947)
	at com.github.benmanes.caffeine.cache.BoundedLocalCache.doComputeIfAbsent(BoundedLocalCache.java:2404)
	at com.github.benmanes.caffeine.cache.BoundedLocalCache.computeIfAbsent(BoundedLocalCache.java:2387)
	at com.github.benmanes.caffeine.cache.LocalCache.computeIfAbsent(LocalCache.java:108)
	at com.github.benmanes.caffeine.cache.LocalManualCache.get(LocalManualCache.java:62)
	at org.apache.kafka.storage.internals.log.RemoteIndexCache.getIndexEntry(RemoteIndexCache.java:376)
	at kafka.log.remote.RemoteLogManager.lambda$collectAbortedTransactions$23(RemoteLogManager.java:1718)
	at java.base/java.util.Optional.map(Optional.java:265)
	at kafka.log.remote.RemoteLogManager.collectAbortedTransactions(RemoteLogManager.java:1718)
	at kafka.log.remote.RemoteLogManager.addAbortedTransactions(RemoteLogManager.java:1702)
	at kafka.log.remote.RemoteLogManager.read(RemoteLogManager.java:1666)
	at kafka.log.remote.RemoteLogReader.lambda$call$0(RemoteLogReader.java:66)
	at com.yammer.metrics.core.Timer.time(Timer.java:91)
	at kafka.log.remote.RemoteLogReader.call(RemoteLogReader.java:66)
	at kafka.log.remote.RemoteLogReader.call(RemoteLogReader.java:36)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.server.log.remote.storage.RemoteStorageException: java.lang.RuntimeException: java.lang.InterruptedException
	at io.aiven.kafka.tieredstorage.RemoteStorageManager.fetchIndex(RemoteStorageManager.java:620)
	at org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager.lambda$fetchIndex$5(ClassLoaderAwareRemoteStorageManager.java:88)
	at org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager.withClassLoader(ClassLoaderAwareRemoteStorageManager.java:65)
	at org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager.fetchIndex(ClassLoaderAwareRemoteStorageManager.java:88)
	at org.apache.kafka.storage.internals.log.RemoteIndexCache.lambda$createCacheEntry$13(RemoteIndexCache.java:405)
	... 23 more
Caused by: java.lang.RuntimeException: java.lang.InterruptedException
	at io.aiven.kafka.tieredstorage.fetch.index.MemorySegmentIndexesCache.get(MemorySegmentIndexesCache.java:132)
	at io.aiven.kafka.tieredstorage.RemoteStorageManager.fetchIndex(RemoteStorageManager.java:610)
	... 27 more
Caused by: java.lang.InterruptedException
	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:385)
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2028)
	at io.aiven.kafka.tieredstorage.fetch.index.MemorySegmentIndexesCache.get(MemorySegmentIndexesCache.java:110)
	... 28 more
DEBUG Deleted cached value for key SegmentIndexKey{indexesKey=wallet-events-KX2_hfceTraIuwXMZ0PQmA/6/00000000000103367174-mALEtIdVQXiVrv3l_fyGAg.indexes, indexType=TIMESTAMP} from cache. The reason of the deletion is SIZE (io.aiven.kafka.tieredstorage.fetch.index.MemorySegmentIndexesCache)
DEBUG Deleted cached value for key SegmentIndexKey{indexesKey=wallet-events-KX2_hfceTraIuwXMZ0PQmA/6/00000000000010536598-Bbi0rOhlTe2P1LQmzKOPDQ.indexes, indexType=OFFSET} from cache. The reason of the deletion is SIZE (io.aiven.kafka.tieredstorage.fetch.index.MemorySegmentIndexesCache)
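
For reference, the innermost frames (CompletableFuture.reportGet / CompletableFuture.get) suggest the calling thread was interrupted while blocked on a future, and the exception was then wrapped in a RuntimeException. Below is a minimal sketch of that pattern in plain Java. It is not the plugin's code: the class and method names (InterruptedGetSketch, blockingGet, demo) are hypothetical, and it only reproduces the shape of the cause chain, not whatever is interrupting the reader thread in the broker.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class InterruptedGetSketch {

    // Hypothetical stand-in for a cache lookup that blocks on an async load.
    static byte[] blockingGet(CompletableFuture<byte[]> pending) {
        try {
            return pending.get(); // blocks until the future completes
        } catch (InterruptedException | ExecutionException e) {
            // Wrapping the checked exception this way produces
            // "java.lang.RuntimeException: java.lang.InterruptedException".
            throw new RuntimeException(e);
        }
    }

    // Calls blockingGet with the interrupt flag set and returns what it throws.
    static RuntimeException demo() {
        CompletableFuture<byte[]> neverCompletes = new CompletableFuture<>();
        Thread.currentThread().interrupt(); // set the flag so get() fails instead of waiting
        try {
            blockingGet(neverCompletes);
            return null; // not reached: the future never completes
        } catch (RuntimeException e) {
            return e;
        } finally {
            Thread.interrupted(); // clear the flag in case it is still set
        }
    }

    public static void main(String[] args) {
        RuntimeException e = demo();
        System.out.println(e);                        // java.lang.RuntimeException: java.lang.InterruptedException
        System.out.println("cause: " + e.getCause()); // cause: java.lang.InterruptedException
    }
}
```

The sketch only shows that an interrupt delivered to a thread waiting on get() surfaces as this wrapped exception; it says nothing about what sends the interrupt.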

The same error appears over and over. Any thoughts?
Below is the configuration:
Kafka 3.9.0 with the S3 remote storage release 2024-10-23-1729694047

remote.log.storage.system.enable=true
remote.log.metadata.manager.class.name=org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager
remote.log.metadata.manager.listener.name=PLAINTEXT
remote.log.storage.manager.class.path=/opt/kafka/core/:/opt/kafka/s3/
remote.log.storage.manager.class.name=io.aiven.kafka.tieredstorage.RemoteStorageManager
rsm.config.chunk.size=4194304
rsm.config.storage.backend.class=io.aiven.kafka.tieredstorage.storage.s3.S3Storage
rsm.config.storage.s3.bucket.name=my-test-kafka-remote-tier
rsm.config.storage.s3.region=us-west-1
rsm.config.storage.s3.credentials.default=true
rsm.config.fetch.chunk.cache.class=io.aiven.kafka.tieredstorage.fetch.cache.DiskChunkCache
rsm.config.fetch.chunk.cache.path=/opt/kafka/rsm-cache
rsm.config.fetch.chunk.cache.size=98784247808
rsm.confi.fetch.chunk.cache.prefetch.max.size=16777216
rsm.config.fetch.chunk.cache.retention.ms=1200000
rsm.config.upload.rate.limit.bytes.per.second=104857600
remote.storage.enable=true

Thank you
