feat: Add Expanded Cloud Storage Plugin for single block file cloud bucket upload #2413
Conversation
jsync-swirlds
left a comment
Quite a few questions and possible improvements.
/**
 * Constructor for the BlockNodeApp class with a custom environment variable getter.
 *
 * <p><b>Note:</b> This overload exists to support integration and E2E tests that need to
 * inject S3 or other environment-variable-driven config without mutating the real JVM
 * environment. It should not be used in production code; production should use
 * {@link #BlockNodeApp(ServiceLoaderFunction, boolean)} which defaults to
 * {@code System::getenv}.
 *
 * @param serviceLoader Optional function to load the service loader, if null then the default will be used
 * @param shouldExitJvmOnShutdown if true, the JVM will exit on shutdown, otherwise it will not
 * @param envVarGetter function to retrieve environment variable values; {@code System::getenv}
 *     in production, or a custom map lookup in tests
 * @throws IOException if there is an error starting the server
 */
public BlockNodeApp(
        final ServiceLoaderFunction serviceLoader,
        final boolean shouldExitJvmOnShutdown,
        final Function<String, String> envVarGetter)
        throws IOException {
Why are we adding this?
We don't directly load environment variables and should not do so.
The configuration library handles reading environment variables and already has built-in test configuration support...
Same question. This seems like something we should not be doing.
E2E API tests needed a way to override environment variables. I believe the current process doesn't make it easy to change configs in the JVM once the process is running.
I was experimenting with an easy approach here but will circle back to the built-in test config approach.
I am referring to the test configuration support in the config library. It's specifically designed for tests.
Looking into this. The import path is a little annoying, so I will come back to it.
...e/app/src/testFixtures/java/org/hiero/block/node/app/fixtures/plugintest/PluginTestBase.java
...ge/src/main/java/org/hiero/block/node/expanded/cloud/storage/ExpandedCloudStorageConfig.java
// Inject S3Mock coordinates via envVarGetter so config picks them up without
// touching real env vars or system properties.
final Map<String, String> overrides = Map.of(
        "EXPANDED_CLOUD_STORAGE_ENDPOINT_URL", s3Endpoint,
        "EXPANDED_CLOUD_STORAGE_BUCKET_NAME", BUCKET,
        "EXPANDED_CLOUD_STORAGE_OBJECT_KEY_PREFIX", PREFIX,
        "EXPANDED_CLOUD_STORAGE_ACCESS_KEY", ACCESS_KEY,
        "EXPANDED_CLOUD_STORAGE_SECRET_KEY", SECRET_KEY);
final Function<String, String> envVarGetter = key -> overrides.getOrDefault(key, System.getenv(key));

// Clear local block data directory (same pattern as BlockNodeAPITests)
final Path dataDir = Paths.get("build/tmp/data").toAbsolutePath();
if (Files.exists(dataDir)) {
    Files.walk(dataDir)
            .sorted(Comparator.reverseOrder())
            .map(Path::toFile)
            .forEach(File::delete);
}

app = new BlockNodeApp(new ServiceLoaderFunction(), false, envVarGetter);
app.start();
This is a rather convoluted mechanism that I don't think we need.
We could just use the test configuration classes to set the configuration rather than try to fake the environment variable access (and try to get env ourselves, which we shouldn't be doing...)
Need to get back to this; pulling in the swirlds test config is a bit involved.
 * <h2>Enable / disable</h2>
 * The plugin is disabled when {@code expanded.cloud.storage.endpointUrl} is blank (the
 * default). Set it to a non-empty URL to activate uploads.
We shouldn't be enabling/disabling with configuration.
Enable is "plugin present" and Disable is "plugin not present".
Anything else means we're still running init and start (and possibly other calls) for code that has no reason to be on the server.
</dl>

<dl>
No reason to end the list only to restart it.
Suggested change:
- </dl>
- <dl>
<dl>
<dt>S3-compatible object store</dt>
<dd>Any storage service that implements the AWS S3 REST API, including AWS S3, Google Cloud
Storage (via S3 interoperability), and MinIO.</dd>
Probably shouldn't call out MinIO, given it's defunct (and the replacement commercial service has a different name).
Missed a spot; removed it from the other places.
3. Uploading via `S3UploadClient.uploadFile()`, with a timeout enforced by submitting the
   upload to a nested virtual-thread future and calling `Future.get(uploadTimeoutSeconds, SECONDS)`.
We shouldn't call an additional thread. Timeout can be enforced in the Completion service calls if necessary (and, to be clear, it's very likely not necessary because the cloud service will timeout the upload if it stalls).
We can rely on the S3 service timeout then.
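For reference, if a timeout were still wanted, it can be enforced on the Future that the executor already returns; no second, nested virtual-thread future is needed. A minimal sketch with hypothetical names, not the PR's actual code:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class UploadTimeoutSketch {
    // Hypothetical helper: the Future returned by submit() can itself be
    // awaited with a timeout, whether the executor is a plain pool or a
    // virtual-thread-per-task executor.
    static String uploadWithTimeout(ExecutorService executor, Callable<String> upload,
                                    long timeoutSeconds) throws Exception {
        Future<String> future = executor.submit(upload);
        try {
            return future.get(timeoutSeconds, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // best effort: interrupt the stalled upload
            throw e;
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newCachedThreadPool();
        System.out.println(uploadWithTimeout(executor, () -> "uploaded", 5));
        executor.shutdown();
    }
}
```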
### Enabled / disabled guard

If `expanded.cloud.storage.endpointUrl` is blank (the default), the plugin logs an INFO
message and returns from `init()` without registering a notification handler. The
`virtualThreadExecutor` field remains `null`, which acts as the disabled sentinel throughout
`start()`, `stop()`, and `handleVerification()`.
Rather than a soft disable (which is not ideal), we should treat this as an error and mark the plugin unhealthy.
At the moment, that's not possible (we are missing the health checks by plugin), but we should note the intent to make that change.
A follow-on issue will extend the plugin to implement `BlockProviderPlugin`, enabling blocks
stored in S3 to be retrieved by the block node for gap-fill or disaster recovery. The MVP
is write-only.
I don't think we want to provide read for this. The intent of the expanded cloud plugin is to be pure write-only forever.
Read should always come from block nodes with local storage under normal operation.
In a DR scenario, it's the archive plugin that will provide network recovery, not the expanded (short lived) plugin (which doesn't have everything).
Leftover design considerations, but as you can see it was not implemented. I'll remove the mention.
ata-nas
left a comment
Some comments from first pass.
/**
 * Constructor for the BlockNodeApp class with a custom environment variable getter.
 *
 * <p><b>Note:</b> This overload exists to support integration and E2E tests that need to
 * inject S3 or other environment-variable-driven config without mutating the real JVM
 * environment. It should not be used in production code; production should use
 * {@link #BlockNodeApp(ServiceLoaderFunction, boolean)} which defaults to
 * {@code System::getenv}.
 *
 * @param serviceLoader Optional function to load the service loader, if null then the default will be used
 * @param shouldExitJvmOnShutdown if true, the JVM will exit on shutdown, otherwise it will not
 * @param envVarGetter function to retrieve environment variable values; {@code System::getenv}
 *     in production, or a custom map lookup in tests
 * @throws IOException if there is an error starting the server
 */
public BlockNodeApp(
        final ServiceLoaderFunction serviceLoader,
        final boolean shouldExitJvmOnShutdown,
        final Function<String, String> envVarGetter)
        throws IOException {
Same question. This seems like something we should not be doing.
...e/app/src/testFixtures/java/org/hiero/block/node/app/fixtures/plugintest/PluginTestBase.java
 * Set {@code expanded.cloud.storage.endpointUrl} to a non-blank value to enable the plugin.
 * Leave it blank (the default) to disable the plugin at startup.
I think that leaving the endpoint blank is generally a misuse. If the plugin is present, it should be because it is intended to be used. A non-present plugin is one that we can treat effectively as "disabled".
My suggestion is that here, we describe situations in which, due to misconfiguration, the plugin will be inactive, but it is not disabled, per se.
Agreed. I explored this outside the plugin prior to sharing bucky and missed removing this logic.
String bucketName,

@Loggable @ConfigProperty(defaultValue = "blocks") String objectKeyPrefix,
@Loggable @ConfigProperty(defaultValue = "STANDARD") String storageClass,
This string for storageClass could be an enum. This will help detect wrong values as well.
 * BN is scoped to STANDARD and relies on bucket lifecycle policies for archive tiers.
 * To expand this set, add options from the S3 API {@code x-amz-storage-class} header docs.
 */
static final Set<String> VALID_STORAGE_CLASSES = Set.of("STANDARD");
If we make this an enum, validation will be automatic.
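A minimal sketch of the enum suggestion (hypothetical names, not the PR's code), where `Enum.valueOf` rejects unknown values so the manual `Set` lookup disappears:

```java
public class StorageClassSketch {
    // Hypothetical enum replacing the VALID_STORAGE_CLASSES set.
    enum StorageClass {
        STANDARD; // scoped to STANDARD; add x-amz-storage-class values here to expand

        static StorageClass fromConfig(String value) {
            try {
                return valueOf(value);
            } catch (IllegalArgumentException e) {
                // Re-throw with a config-oriented message; valueOf already did the validation.
                throw new IllegalArgumentException("Unsupported storage class: " + value, e);
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(StorageClass.fromConfig("STANDARD")); // prints STANDARD
    }
}
```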
 *
 * @param s3Client the upload client to use instead of creating one from config
 */
ExpandedCloudStoragePlugin(@NonNull final S3UploadClient s3Client) {
We should be able to remove this constructor. We should probably have a way to call the noargs constructor and still be able to inject a test client. Something to think about in a future improvement. We generally try to remove all constructors, especially test-only ones.
if (s3Client == null || completionService == null) {
    return;
}

// Drain results from previously submitted tasks before queuing new work.
drainCompletedTasks();

if (!notification.success()) {
    LOGGER.log(
            TRACE, "Skipping upload for block {0}: verification did not succeed.", notification.blockNumber());
    return;
}
final long blockNumber = notification.blockNumber();
if (blockNumber < 0) {
    LOGGER.log(TRACE, "Skipping upload: invalid block number {0}.", blockNumber);
    return;
}
final BlockUnparsed block = notification.block();
if (block == null) {
    LOGGER.log(WARNING, "Skipping upload for block {0}: block payload is null.", blockNumber);
    return;
}
This whole construct would be much easier to read as an if-else. Then, as a final else clause, we can submit the task, if we've reached this far. In this way we will also get rid of returns. I find that much safer.
Hmm, each guard covers a distinct precondition failure. Collapsing it into nesting makes it harder to reason about, I think.
Multiple ifs that each contain a return are already an if-else. The way I read this is, for instance:
if (s3Client == null || completionService == null) {
... do something
} else if (!notification.success()) {
... do something
}
...
else {
your main logic here
}
It is OK to leave it as you have it. This is a nit and a personal opinion. I think the if-else construct is much easier to read and has no returns, making it less error-prone for future modification.
Another way to improve readability, in my opinion, would be to nest some of these, starting with the notification result at the top; if successful, then inside check for a null client or service; if not null, then check the block number, etc.
Again, this is a suggestion, not strictly asking for changes.
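A compact illustration of the suggested if/else-if shape, using a hypothetical stand-in type (the real `VerificationNotification` and logger are not reproduced here):

```java
public class GuardChainSketch {
    // Hypothetical stand-in for VerificationNotification, for illustration only.
    record Notification(boolean success, long blockNumber, byte[] block) {}

    // The reviewer's suggested shape: one if/else-if chain with the main logic
    // in the final else, so there are no early returns to maintain.
    static String handleVerification(Object s3Client, Notification n) {
        final String action;
        if (s3Client == null) {
            action = "skip: no client";
        } else if (!n.success()) {
            action = "skip: verification did not succeed";
        } else if (n.blockNumber() < 0) {
            action = "skip: invalid block number";
        } else if (n.block() == null) {
            action = "skip: null payload";
        } else {
            action = "upload block " + n.blockNumber(); // main logic goes here
        }
        return action;
    }

    public static void main(String[] args) {
        System.out.println(handleVerification(new Object(), new Notification(true, 7, new byte[0])));
    }
}
```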
} catch (final InterruptedException e) {
    Thread.currentThread().interrupt();
    LOGGER.log(WARNING, "Interrupted while draining upload results.", e);
} catch (final ExecutionException e) {
We neglect to catch a cancellation exception. If a task is cancelled, it is still done, but will throw a cancellation exception when we call Task#get().
Another way to do this is before calling get, we can check if the task was cancelled, we know it is done because of the poll on the completion service.
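A runnable sketch of the point being made, using only stdlib types (names are illustrative): a cancelled task still arrives on the completion queue as "done", but `Future.get()` on it throws `CancellationException`, so the drain loop should check `isCancelled()` first or catch the exception.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class DrainCancellationSketch {
    static int drain(CompletionService<Integer> cs, int expected) throws InterruptedException {
        int drained = 0;
        for (int i = 0; i < expected; i++) {
            Future<Integer> done = cs.poll(2, TimeUnit.SECONDS);
            if (done == null) {
                break; // nothing completed within the slice
            }
            if (done.isCancelled()) {
                continue; // done, but cancelled: get() would throw
            }
            try {
                done.get();
                drained++;
            } catch (ExecutionException e) {
                // the task itself failed; log and keep draining
            } catch (CancellationException e) {
                // raced with a cancel() between isCancelled() and get()
            }
        }
        return drained;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CompletionService<Integer> cs = new ExecutorCompletionService<>(pool);
        cs.submit(() -> 42);
        System.out.println(drain(cs, 1)); // prints 1
        pool.shutdown();
    }
}
```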
 * source tree override {@link #uploadFile} to capture or throw as needed — no Docker
 * or real S3 endpoint required for unit tests.
 */
abstract class S3UploadClient implements AutoCloseable {
Curious, why an abstract class here with an anonymous instance?
This was an initial approach to make it easy to support the use of bucky while also making it easy to test the flow.
Couldn't you have declared it an interface?
Suggested change:
- abstract class S3UploadClient implements AutoCloseable {
+ interface S3UploadClient extends AutoCloseable {
Add build.gradle.kts and module-info.java for the new expanded-cloud-storage plugin module (org.hiero.block.node.expanded.cloud.storage). Module mirrors s3-archive structure: exports config class to com.swirlds.config.impl / com.swirlds.config.extensions / app, and provides BlockNodePlugin. Signed-off-by: Nana Essilfie-Conduah <nana@swirldslabs.com>
Defines the S3Client interface mirroring hedera-bucky's public API so the plugin can be fully built and tested without a hedera-bucky dependency. When hedera-bucky ships, a thin HederaBuckyS3ClientAdapter implementing this interface is all that is needed — no plugin logic changes required. S3ClientException is the checked exception base. NoOpS3Client is the no-op stub: logs at INFO, returns empty/null, never throws — used in unit tests. Signed-off-by: Nana Essilfie-Conduah <nana@swirldslabs.com>
Production bridge between the new S3Client interface and the concrete org.hiero.block.node.base.s3.S3Client. Translates base-module S3ClientException at the adapter boundary so plugin logic never imports base-module exceptions. This adapter provides real S3 connectivity until hedera-bucky is available on Maven Central, at which point it is replaced by a HederaBuckyS3ClientAdapter — zero changes to plugin logic needed. Signed-off-by: Nana Essilfie-Conduah <nana@swirldslabs.com>
ExpandedCloudStorageConfig: @ConfigData("expanded.cloud.storage")
record with endpointUrl (blank = disabled), bucketName, objectKeyPrefix,
storageClass, regionName, accessKey, secretKey.
ExpandedCloudStoragePlugin: BlockNodePlugin + BlockNotificationHandler.
Listens for PersistedNotification, fetches ZSTD-compressed block bytes,
and uploads one .blk.zstd object per block to any S3-compatible store.
Object key format: {objectKeyPrefix}/{19-digit-zero-padded-number}.blk.zstd
e.g. blocks/0000000000001234567.blk.zstd
Guards: blank endpointUrl disables at startup; failed notifications and
negative block numbers are skipped; S3ClientException is logged as
WARNING and never rethrown.
Signed-off-by: Nana Essilfie-Conduah <nana@swirldslabs.com>
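The object-key format described in the commit message above can be sketched in plain Java; `%019d` produces the 19-digit zero-padded block number:

```java
public class ObjectKeySketch {
    // Object-key format from the commit message:
    // {objectKeyPrefix}/{19-digit-zero-padded-number}.blk.zstd
    static String objectKey(String prefix, long blockNumber) {
        return String.format("%s/%019d.blk.zstd", prefix, blockNumber);
    }

    public static void main(String[] args) {
        // Matches the commit message example.
        System.out.println(objectKey("blocks", 1234567L)); // blocks/0000000000001234567.blk.zstd
    }
}
```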
NoOpS3ClientTest: 7 contract tests — every method must not throw and must return the documented empty/null value. ExpandedCloudStoragePluginTest: - Unit tests (CapturingS3Client injected via package-private ctor): correct 19-digit zero-padded object key, storageClass, contentType; failed-notification skip; S3ClientException swallowed; blank endpointUrl disables plugin. - Integration tests (Testcontainers + MinIO): blocks appear as individual .blk.zstd objects in bucket; downloaded bytes are non-empty. Signed-off-by: Nana Essilfie-Conduah <nana@swirldslabs.com>
Adds project(":expanded-cloud-storage") to the blockNodePlugins
configuration in block-node/app/build.gradle.kts so the plugin is
included in the application distribution.
Signed-off-by: Nana Essilfie-Conduah <nana@swirldslabs.com>
Captures purpose, goals, terms, entity catalogue, upload flow, object-key format, enabled/disabled guard, hedera-bucky swap path, Mermaid sequence + class diagrams, configuration table, metrics (MVP + follow-on), exception-handling table, and 8 acceptance tests. Follows docs/design/design-doc-template.md and is placed under docs/design/persistence/ alongside block-persistence.md. Signed-off-by: Nana Essilfie-Conduah <nana@swirldslabs.com>
…lause ExpandedCloudStoragePlugin now implements handleVerification() instead of handlePersisted(). This allows cloud upload and local file storage (blocks-file-recent) to run in parallel — each registered handler gets its own virtual thread. Block bytes come directly from notification.block() via BlockUnparsed.PROTOBUF.toBytes() + CompressionType.ZSTD.compress(), removing the historicalBlockProvider dependency and any data-not-yet-written race. NoOpS3Client: add "throws S3ClientException, IOException" to all overridden methods to match the S3Client interface. Without this, anonymous subclasses in tests could not re-declare S3ClientException in their override — a compile error that caused s3ExceptionSwallowed() to fail. Tests updated throughout: PersistedNotification → VerificationNotification; start() is called before assertDoesNotThrow (not inside it) so the assertion only wraps handleVerification(); CapturingS3Client.uploadFile() now correctly declares the throws clause to match its parent. Signed-off-by: Nana Essilfie-Conduah <nana@swirldslabs.com>
…lidation, and credential docs to ExpandedCloudStorageConfig Signed-off-by: Nana Essilfie-Conduah <nana@swirldslabs.com>
Signed-off-by: Nana Essilfie-Conduah <nana@swirldslabs.com>
…PersistedNotification publishing Signed-off-by: Nana Essilfie-Conduah <nana@swirldslabs.com>
…tedNotification assertions Signed-off-by: Nana Essilfie-Conduah <nana@swirldslabs.com>
…priority 0, remove dead TimeoutException catch Signed-off-by: Nana Essilfie-Conduah <nana@swirldslabs.com>
Replace the interim BaseS3ClientAdapter (wrapping the base-module S3Client) with BuckyS3ClientAdapter, a thin delegation wrapper around com.hedera.bucky.S3Client (bucky-client 0.1.0-rc1). Changes: - Add com.hedera.bucky:bucky-client:0.1.0-rc1 to hiero-dependency-versions - Add requires com.hedera.bucky to expanded-cloud-storage module-info - Add BuckyS3ClientAdapter (production S3Client implementation) - Delete BaseS3ClientAdapter and local S3ClientException - Update S3Client interface throws to com.hedera.bucky.S3ClientException - Update NoOpS3Client, SingleBlockStoreTask, ExpandedCloudStoragePlugin to use com.hedera.bucky exception hierarchy - Update design doc: reflect BuckyS3ClientAdapter, S3ResponseException richer details (status code, body, headers), updated diagrams/tables No plugin logic changes required — the S3Client interface abstraction held exactly as designed. Closes the hedera-bucky swap path noted in the design doc. Signed-off-by: Nana Essilfie-Conduah <nana@swirldslabs.com>
…rchy - Replace local S3ClientException throws with com.hedera.bucky.S3ClientException and com.hedera.bucky.S3ResponseException in anonymous/inner test clients - Replace the single 'failurePublishesPersistedNotificationWithFalse' test with responseExceptionProducesFailedNotification (HTTP 503 via S3ResponseException) - Add responseExceptionForbiddenNotRethrown: verifies HTTP 403 S3ResponseException does not propagate from handleVerification - Add responseExceptionDetailsAreNotRethrown: verifies all common 4xx/5xx status codes are swallowed, and asserts getResponseStatusCode()/getResponseBody() on the richer S3ResponseException carry the expected values - Update integration test comment: BuckyS3ClientAdapter replaces BaseS3ClientAdapter Signed-off-by: Nana Essilfie-Conduah <nana@swirldslabs.com>
Signed-off-by: Nana Essilfie-Conduah <nana@swirldslabs.com>
Reduce S3Client to uploadFile + close — the only two methods called by ExpandedCloudStoragePlugin and SingleBlockStoreTask. Remove the five unused methods (uploadTextFile, downloadTextFile, listObjects, listMultipartUploads, abortMultipartUpload) from the interface and from BuckyS3ClientAdapter. Move NoOpS3Client from src/main/java to src/test/java; it is a test helper and has no place in the production module. Make BuckyS3ClientAdapter package-private since it is only ever instantiated by the plugin. Signed-off-by: Nana Essilfie-Conduah <nana@swirldslabs.com>
Collapse the seven per-method test cases into one test that covers the two remaining methods (uploadFile and close). Remove tests for the five methods that were deleted from the interface. Signed-off-by: Nana Essilfie-Conduah <nana@swirldslabs.com>
Add six tests covering previously untested paths: - configBoundsValidation: uploadTimeoutSeconds=0 and maxConcurrentUploads=0 both reject at config construction time - allValidStorageClassesAccepted: all seven S3 storage class values pass config validation (only STANDARD was exercised before) - handleVerificationGuardsSkipUpload: null block payload and negative block number are both silently skipped with no upload and no notification - ioExceptionProducesFailedNotification: IOException from uploadFile is caught and surfaces as PersistedNotification(succeeded=false) - s3ClientExceptionProducesFailedNotification: base S3ClientException similarly produces succeeded=false (previously only no-rethrow was asserted) - stopClosesS3Client: plugin.stop() must call close() on the injected client Signed-off-by: Nana Essilfie-Conduah <nana@swirldslabs.com>
Signed-off-by: Nana Essilfie-Conduah <nana@swirldslabs.com>
Signed-off-by: Nana Essilfie-Conduah <nana@swirldslabs.com>
Signed-off-by: Nana Essilfie-Conduah <nana@swirldslabs.com>
4ab9f62 to
064e210
Compare
Not up to standards ⛔

🔴 Issues

| Category | Results |
|---|---|
| BestPractice | 2 medium, 16 minor |
| Documentation | 4 minor |
| Security | 2 critical |
| CodeStyle | 13 minor |

🟢 Metrics

| Metric | Results |
|---|---|
| Complexity | 169 |
| Duplication | 35 |
Signed-off-by: Nana Essilfie-Conduah <nana@swirldslabs.com>
ata-nas
left a comment
Great progress! Some additional cleanup items and suggestions.
Can be considered as a follow up, if preferred.
block-node/app/build.gradle.kts
// Extended functionality
blockNodePlugins(project(":backfill"))
blockNodePlugins(project(":s3-archive"))
blockNodePlugins(project(":expanded-cloud-storage"))
Do we really want to add this one immediately?
Also, maybe a shorter name for the project would be:
Suggested change:
- blockNodePlugins(project(":expanded-cloud-storage"))
+ blockNodePlugins(project(":cloud-expanded"))
This should fit in well with cloud-archive
I like a shorter name. @jsync-swirlds, any concerns?
Also, yeah, I'll drop it from plugins so it doesn't get immediately loaded. Let me double-check how that affects testing.
I think we need all three words.
"cloud-expanded" doesn't really tell me what the plugin does (and the ordering is backward for English grammar).
We could flip to "cloud-storage-expanded" if we prefer the different ordering, but I don't think the name is sufficiently descriptive in the shorter form.
@@ -0,0 +1,62 @@
// SPDX-License-Identifier: Apache-2.0
package org.hiero.block.node.expanded.cloud.storage;
@ivannov is currently working on the archive variant of cloud.
In his PR, we found out that the convention for package naming that fits best is:
org.hiero.block.node.cloud.archive
So, have the base org.hiero.block.node, then cloud, followed by the cloud type; for instance, he has archive, you have expanded.
We can consider consolidating these package conventions, as it will be better for modules as well:
Suggested change:
- package org.hiero.block.node.expanded.cloud.storage;
+ package org.hiero.block.node.cloud.expanded;
Your module name would be: org.hiero.block.node.cloud.expanded
I'd recommend org.hiero.block.node.storage.cloud.expanded
Similar to other comments, leaving out storage makes it ambiguous.
archive implies storage, but expanded doesn't.
If we could find a single word that indicates both storage and individual files better, that would be good, but without that I think expanded by itself just isn't enough.
/// @param uploadTimeoutSeconds maximum seconds to wait for in-flight uploads during
///     `stop()` before treating them as failed. Default: 60.
// spotless:off
@ConfigData("expanded.cloud.storage")
As config key name, I would suggest:
Suggested change:
- @ConfigData("expanded.cloud.storage")
+ @ConfigData("cloud.expanded")
And @ivannov's archive would be cloud.archive
Same here, I think we need to keep storage because it's just not clear enough without that.
public void handleVerification(@NonNull final VerificationNotification notification) {
    if (s3Client == null || completionService == null) {
        return;
    } else if (!notification.success()) {
        LOGGER.log(
                TRACE, "Skipping upload for block {0}: verification did not succeed.", notification.blockNumber());
        return;
    } else if (notification.blockNumber() < 0) {
        LOGGER.log(TRACE, "Skipping upload: invalid block number {0}.", notification.blockNumber());
        return;
    } else if (notification.block() == null) {
        LOGGER.log(WARNING, "Skipping upload for block {0}: block payload is null.", notification.blockNumber());
        return;
    }
Nit: we can omit all returns if we simply put the logic below this construct in an else.
final Future<SingleBlockStoreTask.UploadResult> completed =
        completionService.poll(Math.min(remainingMs, 200L), TimeUnit.MILLISECONDS);
Nit: is it better if we keep polling until we return null? Or is the idea here that we want to ensure some things are awaited for?
Yeah, polling is intentional here, as we want to give in-flight tasks an opportunity to complete gracefully before we close the S3 client.
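The intentional polling described here can be sketched as a deadline-bounded drain loop over an `ExecutorCompletionService` (illustrative names only, not the plugin's actual code):

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class GracefulDrainSketch {
    // Poll in short slices until an overall deadline so in-flight uploads get
    // a chance to finish before the client is closed.
    static int drainUntilDeadline(CompletionService<String> cs, int pending, long budgetMs)
            throws InterruptedException {
        final long deadline = System.currentTimeMillis() + budgetMs;
        int completed = 0;
        while (completed < pending) {
            final long remainingMs = deadline - System.currentTimeMillis();
            if (remainingMs <= 0) {
                break; // deadline hit; remaining uploads are treated as failed
            }
            final Future<String> done = cs.poll(Math.min(remainingMs, 200L), TimeUnit.MILLISECONDS);
            if (done != null) {
                completed++;
            }
        }
        return completed;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CompletionService<String> cs = new ExecutorCompletionService<>(pool);
        cs.submit(() -> "a");
        cs.submit(() -> "b");
        System.out.println(drainUntilDeadline(cs, 2, 2000)); // prints 2
        pool.shutdown();
    }
}
```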
s3Client.uploadFile(objectKey, storageClass, new PayloadIterator(compressed), CONTENT_TYPE);
LOGGER.log(TRACE, "Block {0}: uploaded to {1}", blockNumber, objectKey);
return new UploadResult(blockNumber, true, compressed.length, blockSource, System.nanoTime() - uploadStartNs);
Tip: consider using an enum for the result instead of a boolean. As we've been recently discovering, results and notifications are rarely binary. In fact, even here we see two different types of failures, one IO and the other S3. Having an enum allows us to handle different failures (or successes) in a different manner. Also, changes to the enum in the future will not compile until handled everywhere. Since we've just introduced this, now is a great time to think about it, even if currently we only need 2 options.
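A small sketch of the enum-result suggestion, with hypothetical outcome constants; an exhaustive switch fails to compile if a new constant is added later and not handled:

```java
public class UploadOutcomeSketch {
    // Hypothetical replacement for the boolean success flag in UploadResult:
    // distinct failure kinds get distinct constants.
    enum Outcome { SUCCESS, IO_FAILURE, S3_FAILURE }

    static String describe(Outcome outcome) {
        // Exhaustive switch: adding a constant later forces this to be revisited.
        return switch (outcome) {
            case SUCCESS -> "uploaded";
            case IO_FAILURE -> "local read/compress problem";
            case S3_FAILURE -> "remote store rejected or errored";
        };
    }

    public static void main(String[] args) {
        System.out.println(describe(Outcome.SUCCESS)); // prints uploaded
    }
}
```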
private static final class PayloadIterator implements Iterator<byte[]> {
    private final byte[] payload;
    private boolean delivered = false;

    PayloadIterator(final byte[] payload) {
        this.payload = payload;
    }

    @Override
    public boolean hasNext() {
        return !delivered;
    }

    @Override
    public byte[] next() {
        if (delivered) throw new NoSuchElementException();
        delivered = true;
        return payload;
    }
}
We can consider extending bucky to allow the upload to be done without such an iterator. Could be a follow-up.
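Until such an overload exists, the hand-rolled iterator could also be replaced by a stdlib single-element iterator with the same semantics; a sketch:

```java
import java.util.Iterator;
import java.util.List;

public class SinglePayloadSketch {
    // A one-element List's iterator delivers the payload exactly once,
    // matching the hand-rolled PayloadIterator behavior.
    static Iterator<byte[]> payloadIterator(byte[] payload) {
        return List.of(payload).iterator();
    }

    public static void main(String[] args) {
        Iterator<byte[]> it = payloadIterator(new byte[] {1, 2, 3});
        System.out.println(it.hasNext() && it.next().length == 3 && !it.hasNext()); // prints true
    }
}
```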
| M | `verification_block_time` | If value exceeds 20s, otherwise, configure as needed |
| M | `files_recent_persistence_time_latency_ns` | If value exceeds 20s, otherwise, configure as needed |

**Archive**: Alerts for metrics regarding file archival
Suggested change:
- **Archive**: Alerts for metrics regarding file archival
+ **Cloud Storage**: Alerts for metrics regarding cloud storage.
This is not the archive storage type.
appCoreRuntime(project(":app"))
testPlugins(project(path = ":app", configuration = "blockNodePlugins"))
implementation(project(":common"))
implementation(project(":base"))
Why do we need the base here?
The E2E tests use the S3Client from base to verify the existence of some files, hence the dependency.
runtimeOnly("org.hiero.block.node.backfill")
runtimeOnly("org.hiero.block.node.archive.s3cloud")
runtimeOnly("org.hiero.block.node.expanded.cloud.storage")
runtimeOnly("s3mock.testcontainers")
Are we sticking with s3mock?
Yes, from my exploration (I looked at LocalStack, Adobe S3Mock, and Zenko CloudServer, amongst others) S3Mock seems like a good balance and was chosen for the following reasons:
- Focused scope — implements S3 only, so it starts faster and consumes fewer resources than LocalStack (which emulates ~70 AWS services).
- JUnit 5 testcontainers integration — the S3MockContainer class integrates natively with @BeforeAll/@AfterAll, matching our test pattern.
- Realistic compatibility — it faithfully implements multipart PUT, bucket list, and the S3 REST error codes we need to test.
I can share a comparison doc, but it's also easy enough to replace if we find an even better solution.
...d-expanded/src/main/java/org/hiero/block/node/cloud/expanded/ExpandedCloudStorageConfig.java
Signed-off-by: Nana Essilfie-Conduah <nana@swirldslabs.com>
Signed-off-by: Nana Essilfie-Conduah <nana@swirldslabs.com>
Signed-off-by: Nana Essilfie-Conduah <nana@swirldslabs.com>
Signed-off-by: Nana Essilfie-Conduah <nana@swirldslabs.com>
Anonymous-class overrides of BlockNodeApp.getEnvVar() do not work for config injection because Java sets captured local variables (val$field) AFTER the super() constructor call. AutomaticEnvironmentVariableConfigSource is built inside BlockNodeApp's constructor, so the captured map is still null when getEnvVar is invoked — causing the config source to see blank values for all cloud.expanded.* properties, which makes BuckyS3UploadClient throw S3ClientInitializationException and the plugin become inactive. Fix: replace the anonymous-class pattern with System.setProperty using the raw config property names (e.g. cloud.expanded.endpointUrl). These are read by SystemPropertiesConfigSource whose ordinal is 400, outranking the AutomaticEnvironmentVariableConfigSource (300) and file sources (50-100). Additionally adds the missing cloud.expanded.regionName property (us-east-1), which bucky's S3Client requires to be non-blank, and cleans up tearDown to always clear the properties even when the blank-endpoint test overrides them. The nodeRemainsHealthyWhenCloudPluginEndpointIsBlank test is also corrected: it now specifically sets endpointUrl='' while keeping other properties valid, accurately testing the scenario where only the endpoint is absent. Signed-off-by: Nana Essilfie-Conduah <nana@swirldslabs.com>
…E test

Each block's BlockProof must reference the previous block's actual hash. Without chaining, blocks 1+ fail verification: the server tracks block 0's computed hash and rejects a proof that claims the previous hash was all-zeros. This caused the server to send END_STREAM after block 1, leaving block 2 unacknowledged and the test timing out.

Fix uses BlockItemBuilderUtils.createSimpleBlockWithNumber(blockNumber, previousHash) and threads computeBlockHash(blockNumber, previousHash) forward across the loop. The duplicate of block 2 also uses the same previousHash to match exact content.

Signed-off-by: Nana Essilfie-Conduah <nana@swirldslabs.com>
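The chaining fix boils down to threading each computed hash forward as the next block's previous hash. A self-contained sketch, with generic SHA-256 standing in for the project's actual block-hash computation (createSimpleBlockWithNumber itself is elided):

```java
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.List;

// Generic hash chaining: each block's "previous hash" is the hash computed for
// the block before it; block 0 chains from an all-zero genesis hash.
class BlockChainingSketch {
    static byte[] computeBlockHash(final long blockNumber, final byte[] previousHash) throws Exception {
        final MessageDigest digest = MessageDigest.getInstance("SHA-256");
        digest.update(previousHash);                        // the proof references the prior hash
        digest.update(Long.toString(blockNumber).getBytes());
        return digest.digest();
    }

    // Threads the computed hash forward across the loop, as the test fix does.
    static List<byte[]> buildChain(final int count) throws Exception {
        final List<byte[]> hashes = new ArrayList<>();
        byte[] previousHash = new byte[48];                 // genesis: all zeros
        for (long n = 0; n < count; n++) {
            // in the real test: createSimpleBlockWithNumber(n, previousHash)
            previousHash = computeBlockHash(n, previousHash);
            hashes.add(previousHash);
        }
        return hashes;
    }
}
```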
The protected getEnvVar() override point was added to support E2E test config injection via anonymous subclass. That approach was broken: Java assigns anonymous-class captured fields only after super() returns, so the captured map was null when AutomaticEnvironmentVariableConfigSource was built inside the BlockNodeApp constructor.

SystemPropertiesConfigSource (ordinal 400) has always been registered in the config builder and reliably outranks all other sources, making System.setProperty the correct injection mechanism. getEnvVar() provided no working benefit and its Javadoc invited repeating the same broken pattern. Replace this::getEnvVar with System::getenv directly and delete the method.

Signed-off-by: Nana Essilfie-Conduah <nana@swirldslabs.com>
…nd tests
Production code:
- BuckyS3UploadClient: add /// {@inheritdoc} on uploadFile override; add
closing comment on close(); document the bucky field
- ExpandedCloudStoragePlugin: add one-line /// Javadoc to each public metric
key constant so their purpose is clear at the call site
- ExpandedCloudStorageConfig: correct env-var names in credential docs
(AutomaticEnvironmentVariableConfigSource maps cloud.expanded.accessKey
to CLOUD_EXPANDED_ACCESS_KEY, not AWS_ACCESS_KEY_ID); remove stale
"Enabling the plugin" duplicate blurb already covered by the record Javadoc
Test code:
- SingleBlockStoreTaskTest: document START_TIME and ONE_DAY constants;
add Javadoc to successClient(), throwingS3Client(), throwingIoClient()
- ExpandedCloudStoragePluginTest: add Javadoc to testBlock(),
verifiedNotification(), failedNotification(); document the constructor
executor choice
- ExpandedCloudStoragePluginIntegrationTest: add Javadoc to startS3Mock(),
stopS3Mock(), constructor, testBlock(), verifiedNotification(),
awaitNotifications(), listAllObjects(); add inline comment on the
@Disabled test explaining why S3Mock does not enforce auth
Signed-off-by: Nana Essilfie-Conduah <nana@swirldslabs.com>
…adClient directly

The factory method violated dependency inversion: the abstraction (S3UploadClient) knew about its concrete implementation (BuckyS3UploadClient). It provided no selection logic — always returning the same type — so it added coupling with no benefit.

The real testing seam is the package-private ExpandedCloudStoragePlugin(S3UploadClient) constructor, which is unaffected. Production start() now calls new BuckyS3UploadClient(config) directly; S3UploadClient is now a pure abstract type with no implementation knowledge.

Signed-off-by: Nana Essilfie-Conduah <nana@swirldslabs.com>
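A minimal sketch of the resulting seam. Names follow the commit but bodies are illustrative, and the simplified interface stands in for the PR's abstract S3UploadClient class:

```java
// The abstraction no longer references any concrete implementation; the
// package-private constructor is the testing seam.
interface S3UploadClientSketch {
    String uploadFile(String objectKey, byte[] bytes);
}

class PluginSketch {
    private final S3UploadClientSketch client;

    // package-private seam: tests inject a fake, production injects the real client
    PluginSketch(final S3UploadClientSketch client) {
        this.client = client;
    }

    String store(final String objectKey, final byte[] bytes) {
        return client.uploadFile(objectKey, bytes);
    }
}
```

Tests can now supply a lambda fake with no mocking framework, which matters because bucky's S3Client is final and cannot be mocked.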
Nana-EC
left a comment
1st pass of complete logic
| ///
| /// @param config the plugin configuration supplying endpoint, bucket, region, and credentials
| /// @throws com.hedera.bucky.S3ClientInitializationException if the underlying bucky client
| ///     cannot be initialised (e.g. invalid credentials, unreachable endpoint)
I don't think bucky has support for credential verification
| @Loggable @ConfigProperty(defaultValue = "") String regionName,
| @ConfigProperty(defaultValue = "") String accessKey,
| @ConfigProperty(defaultValue = "") String secretKey,
| @Loggable @ConfigProperty(defaultValue = "60") @Min(1) int uploadTimeoutSeconds) {
Need to investigate this config validation as the test doesn't fail when uploadTimeoutSeconds is 0
Validation is handled by the library load mechanisms.
Are you creating an instance of this class directly, or using the TestConfigBuilder to initialize the library?
Note that the validation is not enabled when directly setting the value, only when loaded from properties, environment, or other sources.
The library mechanisms are thoroughly tested in the library itself; so it may not be worth testing that validation ourselves.
block-node/cloud-expanded/src/main/java/org/hiero/block/node/cloud/expanded/S3UploadClient.java
…adClient API

S3UploadClient.uploadFile previously declared throws com.hedera.bucky.S3ClientException, leaking a concrete library type through the abstraction into every caller (SingleBlockStoreTask, all test S3UploadClient subclasses). BuckyS3UploadClient's constructor had the same leak into ExpandedCloudStoragePlugin.start().

Fix: add package-private UploadException that wraps any bucky exception. BuckyS3UploadClient now catches S3ClientInitializationException (constructor) and S3ClientException (uploadFile) and rethrows as UploadException. No other class in the package imports com.hedera.bucky.

Test impact:
- Fake S3UploadClient subclasses now throw UploadException instead of S3ClientException
- Four bucky-specific tests consolidated into two (uploadExceptionProducesFailedNotification, uploadExceptionNotRethrown) — tests that verified S3ResponseException HTTP status-code properties were testing bucky internals through the wrong layer and are removed
- Unused bucky imports removed from all test files

Signed-off-by: Nana Essilfie-Conduah <nana@swirldslabs.com>
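The wrapping pattern can be sketched as follows. All names here are illustrative; LibraryException stands in for bucky's exception types:

```java
// LibraryException stands in for com.hedera.bucky.S3ClientException; the
// wrapper keeps the library type out of every caller's signature.
class LibraryException extends Exception {
    LibraryException(final String message) {
        super(message);
    }
}

class UploadExceptionSketch extends Exception {
    UploadExceptionSketch(final Throwable cause) {
        super(cause);
    }
}

class WrappingClientSketch {
    String uploadFile(final String objectKey) throws UploadExceptionSketch {
        try {
            return callLibrary(objectKey);
        } catch (final LibraryException e) {
            throw new UploadExceptionSketch(e); // callers never import the library type
        }
    }

    // Stand-in for the real library call; fails on a blank key for illustration.
    private String callLibrary(final String objectKey) throws LibraryException {
        if (objectKey.isBlank()) {
            throw new LibraryException("blank object key");
        }
        return objectKey;
    }
}
```

The original cause stays reachable via getCause(), so diagnostics are not lost even though the library type no longer appears in any signature.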
…ctive test

Design doc updates:
- Terms: add BuckyS3UploadClient and UploadException entries; update S3UploadClient entry to remove the now-deleted getInstance factory; clarify that bucky types are contained within BuckyS3UploadClient only; remove S3ResponseException as a term (it is now a bucky implementation detail)
- Entities: add BuckyS3UploadClient and UploadException sections; update S3UploadClient to show UploadException/IOException signature (no longer bucky types); update SingleBlockStoreTask to reference UploadException/IOException; add UploadStatus table
- Exceptions table: replace three bucky-type rows with two rows (UploadException, IOException); add note that BuckyS3UploadClient is the sole bucky importer
- Class diagram: replace anonymous ProductionS3UploadClient with BuckyS3UploadClient; remove getInstance factory; add UploadException node; update uploadFile signatures
- Configuration: fix all defaults (bucketName, objectKeyPrefix, regionName all default to "" not the values previously documented); add credential options section
- Acceptance tests: replace S3ResponseException/S3ClientException items with UploadException/IOException items; add item 7 (blank endpoint → plugin inactive); add item 12 (stop drains before close)

Test: add pluginInactiveOnBlankEndpoint — exercises the production BuckyS3UploadClient constructor path: blank endpointUrl → UploadException → plugin inactive → no-op on handleVerification. Covers acceptance test item 7.

Signed-off-by: Nana Essilfie-Conduah <nana@swirldslabs.com>
Signed-off-by: Nana Essilfie-Conduah <nana@swirldslabs.com>
| LOGGER.log(
|         TRACE, "Skipping upload for block {0}: verification did not succeed.", notification.blockNumber());
| } else if (notification.blockNumber() < 0) {
|     LOGGER.log(TRACE, "Skipping upload: invalid block number {0}.", notification.blockNumber());
This is a bug. The log should be INFO.
| }
| Future<SingleBlockStoreTask.UploadResult> completed;
| while ((completed = completionService.poll()) != null) {
|     publishResult(completed);
We might want to (perhaps in future work) ensure these results are published in block number order so we don't accidentally publish success for (e.g.) block 467 and then fail to upload block 440.
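One possible shape for that future work is a small reordering buffer that releases completions only once every lower block number has completed. A hypothetical sketch, not the PR's code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Buffers out-of-order completions; a block's result is released only after
// every lower-numbered block has also completed.
class OrderedPublisher {
    private final PriorityQueue<Long> pending = new PriorityQueue<>();
    private long nextToPublish;

    OrderedPublisher(final long firstBlock) {
        this.nextToPublish = firstBlock;
    }

    // Returns the block numbers that may now be published, in ascending order.
    List<Long> complete(final long blockNumber) {
        pending.add(blockNumber);
        final List<Long> ready = new ArrayList<>();
        while (!pending.isEmpty() && pending.peek() == nextToPublish) {
            ready.add(pending.poll());
            nextToPublish++;
        }
        return ready;
    }
}
```

With this shape, a success for block 467 would be held back until block 440 either succeeds or is explicitly failed out of the window.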
| while (inFlightCount.get() > 0 && System.currentTimeMillis() < deadline) {
|     final long remainingMs = deadline - System.currentTimeMillis();
|     final Future<SingleBlockStoreTask.UploadResult> completed =
|             completionService.poll(Math.min(remainingMs, 200L), TimeUnit.MILLISECONDS);
|     if (completed != null) {
|         publishResult(completed);
|     }
| }
This is somewhat common, and not ideal.
Instead of trying to poll every task in a CompletionService (and having to track how many tasks you think are running), we can take advantage of the ExecutorService that the CompletionService uses to actually execute tasks.
That has slightly more complexity in the drain process, but it's also significantly more robust and doesn't require directly tracking in-flight tasks.
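The suggested drain can be sketched as shutting down the backing ExecutorService and awaiting termination, rather than counting in-flight tasks. An illustrative sketch, not the plugin's code:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Drain by shutting down the executor behind the CompletionService: shutdown()
// stops new submissions, awaitTermination() waits for queued work to finish,
// and no in-flight counter is needed.
class DrainSketch {
    static int runAndDrain(final int tasks) throws InterruptedException {
        final ExecutorService executor = Executors.newFixedThreadPool(4);
        final AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            executor.submit(done::incrementAndGet);
        }
        executor.shutdown();                              // reject new tasks, run the queue
        if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
            executor.shutdownNow();                       // timed out: cancel what remains
        }
        return done.get();
    }
}
```

Completed results can still be collected from the CompletionService after termination, since its queue holds every finished Future.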
| Thread.currentThread().interrupt();
| LOGGER.log(WARNING, "Interrupted while draining upload results.", e);
| } catch (final ExecutionException e) {
|     LOGGER.log(WARNING, "Unexpected exception in upload task: ", e);
For ExecutionException, there is always a cause; we could log that here for better clarity in logs.
- LOGGER.log(WARNING, "Unexpected exception in upload task: ", e);
+ LOGGER.log(WARNING, "Unexpected exception in upload task: ", e.getCause());
Reviewer Notes
Introduces the expanded-cloud-storage plugin — a new BlockNodePlugin that uploads each individually-verified block as a single ZSTD-compressed Protobuf object (.blk.zstd) to any S3-compatible object store (AWS S3, GCS S3-interop, etc.).

Unlike s3-archive, which batches blocks into tar archives, this plugin uploads one object per block, making blocks immediately queryable at block-level granularity in the cloud.

BlockNodeApp gains a 3-arg constructor accepting a custom envVarGetter (Function<String, String>) to allow E2E tests to inject S3Mock coordinates without mutating JVM environment variables.
PluginTestBase.tearDown() — defensive null check on metricsProvider to handle test setups that don't initialise it.
- ExpandedCloudStoragePlugin — BlockNodePlugin + BlockNotificationHandler; reacts to VerificationNotification, builds the S3 object key, and submits each block as a SingleBlockStoreTask to a CompletionService backed by a virtual-thread executor.
- SingleBlockStoreTask — Callable that serialises the block to Protobuf, ZSTD-compresses it, and uploads it with a configurable timeout enforced via Future.get(timeout, SECONDS).
- S3UploadClient — package-private abstract class wrapping com.hedera.bucky.S3Client; tests subclass it directly (no Mockito; bucky is final).
- ExpandedCloudStorageConfig — Swirlds config record under expanded.cloud.storage.*; validated at construction (storageClass, timeout bounds).
Configuration reference

| Property | Default |
| --- | --- |
| expanded.cloud.storage.endpointUrl | "" |
| expanded.cloud.storage.bucketName | block-node-blocks |
| expanded.cloud.storage.objectKeyPrefix | blocks |
| expanded.cloud.storage.storageClass | STANDARD |
| expanded.cloud.storage.regionName | us-east-1 |
| expanded.cloud.storage.accessKey | "" |
| expanded.cloud.storage.secretKey | "" |
| expanded.cloud.storage.uploadTimeoutSeconds | 60 |
| expanded.cloud.storage.maxConcurrentUploads | 4 |

Testing
- Unit tests: PersistedNotification success/failure; stop() closes client
- Integration tests (S3Mock; assumeTrue if unavailable): PersistedNotification published on success; bare key when prefix is empty
- E2E: BlockNodeApp + gRPC publish + S3Mock

Follow-on work
Metrics (uploads_total, upload_failures_total, upload_bytes_total) are planned for a follow-on issue.

Related Issue(s)
Fixes #2414