Skip to content

Enable REST protocol between Aggregator and Collaborator#1500

Merged
teoparvanov merged 1 commit intosecurefederatedai:developfrom
ishaileshpant:native_rest_api
May 21, 2025
Merged

Enable REST protocol between Aggregator and Collaborator#1500
teoparvanov merged 1 commit intosecurefederatedai:developfrom
ishaileshpant:native_rest_api

Conversation

@ishaileshpant
Copy link
Copy Markdown
Collaborator

@ishaileshpant ishaileshpant commented Mar 27, 2025

  • add new AggregatorClientInterface to allow switching b/w grpc and rest

  • endhance existing AggregatorGRPCClient to start using AggregatorClientInterface

  • added new transport package for rest with AggregatorRESTClient implementing AggregatorClientInterface

  • added streaming api support with custom content-type

  • added various connection flag for streaming request

  • send additional header key "Sender" for better request logging at server side

  • aligned Rest and gRPC client for most of the init params

  • added AggregatorRESTServer and necesary changes in aggregator cli and federated/plan get_server method

  • added transport_protocol settings in defaults/network.yaml, defaulted the same to 'grpc'

  • reduced cyclomatic complexity of Rest Server

  • fixed protobuf streaming issue for v1/task/results API

  • added more detailed logging for task progression and metadata for each api calls

  • pinned Flask version to latest stable 3.1.0

  • addressing review comments - 13th-May

  • added ping api and collaborato constructor hint for AggregatorClientInterface

  • added send_message_to_server in client and AggregatorClientInterface, Rest Server is already at parity

  • changed base uri for REST server to 'experimental/v1', adjusted the client and tests accordingly

  • fixed issue related to mTLS in REST server/client

  • disabled TLS 1.2 in both server/client
    rebased 21st.May.1

  • Add AggregatorRESTServer

  • Remove collaborator cli support for protocol switch, this should be done via Plan/Aggregator

  • Add extensive logging for both REST server and client

  • Transparently create either REST or gRPC server and client binding when running a federation based on Plan config

  • Add Unit level test for existing modules

  • Add unit test for new modules that are added

  • Remove debug logs added for testing purposed

  • Manual testing of various config and status update in PR

  • Refactor verbose debug/info level log messages and metadata that gets logged

  • Known issue - mTLS communication is not working - when tls and client_auth both are enabled - Debug in progress

  • Validate that FedEval feature works as well with REST protocol

  • Validate with at-least two workspaces manually for REST :

Summary

Enable rest protocol support for TaskRunner API

Type of Change (Mandatory)

  • Feature enhancement

Description (Mandatory)

This PR adds REST protocol support for TaskRunner API there by allowing running federation with REST protocol for seamless integration with managed/public API gateways

Unit Test report

image

Full Unit test report

image
image

@ishaileshpant ishaileshpant force-pushed the native_rest_api branch 6 times, most recently from 541fe87 to af43e36 Compare April 2, 2025 19:39
@ishaileshpant ishaileshpant marked this pull request as ready for review April 15, 2025 08:48
@ishaileshpant ishaileshpant force-pushed the native_rest_api branch 4 times, most recently from cc563d9 to ff9e063 Compare April 17, 2025 20:05
@ishaileshpant ishaileshpant force-pushed the native_rest_api branch 4 times, most recently from 6683e0d to e615e1d Compare April 24, 2025 08:59
@ishaileshpant ishaileshpant changed the title [WIP - do not review] Enable REST protocol between Aggregator and Collaborator Enable REST protocol between Aggregator and Collaborator Apr 24, 2025
@ishaileshpant ishaileshpant force-pushed the native_rest_api branch 2 times, most recently from d77306c to 0beb7fd Compare April 25, 2025 09:54
@ishaileshpant
Copy link
Copy Markdown
Collaborator Author

@psfoley / @teoparvanov is it fine if this refactor / task 'Retry mechanism config pullout, remove the hardcoding' be taken in a subsequent PR ?

@ishaileshpant ishaileshpant force-pushed the native_rest_api branch 2 times, most recently from 77ba225 to bde8f0c Compare May 13, 2025 10:39
@payalcha
Copy link
Copy Markdown
Collaborator

@ishaileshpant
https://github.com/securefederatedai/openfl/pull/1612/files#diff-13d501ef02760f7d8b8de7d82adc709762a4d2bb56aef8cdbbf8ddf06b4daafbR54
Please uncomment the test once PR get merged. So that basic no-op connectivity will get tested with your PR.

@ishaileshpant ishaileshpant force-pushed the native_rest_api branch 5 times, most recently from 83aad3c to 74f2340 Compare May 14, 2025 11:42
@kminhta
Copy link
Copy Markdown
Collaborator

kminhta commented May 14, 2025

Thanks @ishaileshpant ! This is looking good. Btw, can you manually run https://github.com/ishaileshpant/openfl/actions/workflows/task_runner_flower_e2e.yml on your branch? I don't anticipate any major issues since it'll just run gRPC, but it'll be good to catch any unforeseen issues early

@ishaileshpant
Copy link
Copy Markdown
Collaborator Author

Thanks @ishaileshpant ! This is looking good. Btw, can you manually run https://github.com/ishaileshpant/openfl/actions/workflows/task_runner_flower_e2e.yml on your branch? I don't anticipate any major issues since it'll just run gRPC, but it'll be good to catch any unforeseen issues early

Thanks for the inputs @kta-intel - have just validated and the default workflow works https://github.com/ishaileshpant/openfl/actions/runs/15036703510/job/42259707667 better to validate this with protocol set to 'rest' - wdyt?

@kminhta
Copy link
Copy Markdown
Collaborator

kminhta commented May 18, 2025

Thanks for the inputs @kta-intel - have just validated and the default workflow works https://github.com/ishaileshpant/openfl/actions/runs/15036703510/job/42259707667 better to validate this with protocol set to 'rest' - wdyt?

thanks for checking, and yes I agree it would be good to validate with rest

@ishaileshpant
Copy link
Copy Markdown
Collaborator Author

Thanks for the inputs @kta-intel - have just validated and the default workflow works https://github.com/ishaileshpant/openfl/actions/runs/15036703510/job/42259707667 better to validate this with protocol set to 'rest' - wdyt?

thanks for checking, and yes I agree it would be good to validate with rest

Validated with 'rest' protocol (w/o client auth) https://github.com/ishaileshpant/openfl/actions/runs/15099298504/job/42438127648

@payalcha
Copy link
Copy Markdown
Collaborator

RestAPI is not working as expected - Collaborator is not able to connect with Aggregator

[15:54:02] [plan.py:240] INFO Building `openfl.pipelines.NoCompressionPipeline` Module.
[15:54:02] [aggregator_client.py:130] WARNING Initializing Aggregator REST Client (EXPERIMENTAL API - Not for production use)
[15:54:02] [plan.py:240] INFO Building `openfl.component.Collaborator` Module.
[15:54:02] [collaborator.py:130] WARNING Argument `device_assignment_policy` is deprecated and will be removed in the future.
[15:54:02] [aggregator_client.py:295] ERROR Certificate unknown error - this usually means the server's certificate is not trusted
[15:54:02] [aggregator_client.py:299] ERROR Please verify that:
[15:54:02] [aggregator_client.py:300] ERROR 1. The root certificate contains all necessary intermediate certificates
[15:54:02] [aggregator_client.py:301] ERROR 2. The server's certificate is properly signed by a trusted CA
[15:54:02] [aggregator_client.py:302] ERROR 3. The hostname matches the certificate's subject
[15:54:02] [aggregator_client.py:332] ERROR Connection error: HTTPSConnectionPool(host='localhost', port=49467): Max retries exceeded with url: /experimental/v1/tasks?collaborator_id=collaborator1&federation_uuid=plan.yaml_1cb6b3e5 (Caused by SSLError(SSLError(1, '[SSL: SSLV3_ALERT_CERTIFICATE_UNKNOWN] sslv3 alert certificate unknown (_ssl.c:2578)')))
[15:54:02] [aggregator_client.py:334] ERROR Connection error details: HTTPSConnectionPool(host='localhost', port=49467): Max retries exceeded with url: /experimental/v1/tasks?collaborator_id=collaborator1&federation_uuid=plan.yaml_1cb6b3e5 (Caused by SSLError(SSLError(1, '[SSL: SSLV3_ALERT_CERTIFICATE_UNKNOWN] sslv3 alert certificate unknown (_ssl.c:2578)')))

@ishaileshpant
Copy link
Copy Markdown
Collaborator Author

RestAPI is not working as expected - Collaborator is not able to connect with Aggregator

[15:54:02] [plan.py:240] INFO Building `openfl.pipelines.NoCompressionPipeline` Module.
[15:54:02] [aggregator_client.py:130] WARNING Initializing Aggregator REST Client (EXPERIMENTAL API - Not for production use)
[15:54:02] [plan.py:240] INFO Building `openfl.component.Collaborator` Module.
[15:54:02] [collaborator.py:130] WARNING Argument `device_assignment_policy` is deprecated and will be removed in the future.
[15:54:02] [aggregator_client.py:295] ERROR Certificate unknown error - this usually means the server's certificate is not trusted
[15:54:02] [aggregator_client.py:299] ERROR Please verify that:
[15:54:02] [aggregator_client.py:300] ERROR 1. The root certificate contains all necessary intermediate certificates
[15:54:02] [aggregator_client.py:301] ERROR 2. The server's certificate is properly signed by a trusted CA
[15:54:02] [aggregator_client.py:302] ERROR 3. The hostname matches the certificate's subject
[15:54:02] [aggregator_client.py:332] ERROR Connection error: HTTPSConnectionPool(host='localhost', port=49467): Max retries exceeded with url: /experimental/v1/tasks?collaborator_id=collaborator1&federation_uuid=plan.yaml_1cb6b3e5 (Caused by SSLError(SSLError(1, '[SSL: SSLV3_ALERT_CERTIFICATE_UNKNOWN] sslv3 alert certificate unknown (_ssl.c:2578)')))
[15:54:02] [aggregator_client.py:334] ERROR Connection error details: HTTPSConnectionPool(host='localhost', port=49467): Max retries exceeded with url: /experimental/v1/tasks?collaborator_id=collaborator1&federation_uuid=plan.yaml_1cb6b3e5 (Caused by SSLError(SSLError(1, '[SSL: SSLV3_ALERT_CERTIFICATE_UNKNOWN] sslv3 alert certificate unknown (_ssl.c:2578)')))

This has been fixed in latest push - the above error comes only in mTLS mode - TLS w/o client auth worked

Now with the fix all the cases should be fine

Copy link
Copy Markdown
Contributor

@psfoley psfoley left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is looking good, @ishaileshpant. I've tested this locally with mTLS and I think this is in pretty good shape overall. I have a few minor comments from my review. Two other things I would request before merge:

  • A REST E2E test for CI. This is a big enough feature I want to be confident this is in a continuous working state.
  • Basic documentation. There should either be a minor page that explains how to toggle between gRPC / REST in the plan, or a note of this in the real world federation page (related to proxies that don't support gRPC).

After this is merged for OpenFL 1.9 I see an opportunity for:

  • More function / constant reuse between gRPC and REST implementations. I called out the possibility of more consistency between the gRPC / REST datastream messages, but other areas we could streamline are disallowed TLS versions. This feels like it should be defined once somewhere else and imported.
  • Additional tests for TLS / mTLS plan configurations. You are pretty well covered for functional tests for mTLS, but it would be good to test around just TLS as well (no client auth)

Comment thread openfl/transport/rest/aggregator_client.py Outdated
Comment thread openfl/transport/rest/aggregator_server.py
Comment on lines +508 to +532
# Serialize the TaskResults first
task_results_bytes = task_results.SerializeToString()
logger.debug(f"TaskResults serialized size: {len(task_results_bytes)} bytes")

# Create a DataStream message containing the TaskResults bytes
data_stream = base_pb2.DataStream(size=len(task_results_bytes), npbytes=task_results_bytes)

# Create an empty DataStream to signal end of stream
end_stream = base_pb2.DataStream(size=0, npbytes=b"")

# Serialize both messages
data_bytes = data_stream.SerializeToString()
end_bytes = end_stream.SerializeToString()

# Create length-prefixed stream format
stream_data = (
struct.pack(">I", len(data_bytes)) # Length prefix for first message
+ data_bytes # First message
+ struct.pack(">I", len(end_bytes)) # Length prefix for second message
+ end_bytes # Second message (empty message signals end)
)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it necessary to encode the data stream this way? If so that's fine. I'm asking because it would be nice to have more symmetry with the gRPC implementation.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current implementation is more like a batched RPC call with a custom binary protocol format rather than true streaming and i kept it this way to support both HTTP/1.1 and enhance for HTTP/2 true streaming later on - if you agree then post this iteration we should look at chunked transfer to better support both http/1.1 and eventually http/2 by enhancing when needs arises - wdyt?

@ishaileshpant
Copy link
Copy Markdown
Collaborator Author

  • A REST E2E test for CI. This is a big enough feature I want to be confident this is in a continuous working state.

We already have this in place as @payalcha has pointed out - we can uncomment this post the PR is merged

@ishaileshpant
Copy link
Copy Markdown
Collaborator Author

After this is merged for OpenFL 1.9 I see an opportunity for:

Totally agree, already have some ideas on doing this post the PR merge

- add new AggregatorClientInterface to allow switching b/w grpc and rest
- endhance existing AggregatorGRPCClient to start using AggregatorClientInterface
- added new transport package for rest with AggregatorRESTClient implementing AggregatorClientInterface
- added streaming api support with custom content-type
- added various connection flag for streaming request
- send additional header key "Sender" for better request logging at server side
- aligned Rest and gRPC client for most of the init params
- added AggregatorRESTServer and necesary changes in aggregator cli and federated/plan get_server method
- added transport_protocol settings in defaults/network.yaml, defaulted the same to 'grpc'
- reduced cyclomatic complexity of Rest Server
- fixed protobuf streaming issue for v1/task/results API
- added more detailed logging for task progression and metadata for each api calls
- pinned Flask version to latest stable 3.1.0

- addressing review comments - 13th-May
- added ping api and `collaborato` constructor hint for `AggregatorClientInterface`
- added send_message_to_server in client and AggregatorClientInterface, Rest Server is already at parity
- changed base uri for REST server to 'experimental/v1', adjusted the client and tests accordingly
- fixed issue related to mTLS in REST server/client
- disabled TLS 1.2 in both server/client
rebased 21st.May.1

Signed-off-by: Shailesh Pant <shailesh.pant@intel.com>
@teoparvanov teoparvanov merged commit 7fcecd3 into securefederatedai:develop May 21, 2025
37 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

7 participants