Skip to content

Commit aedf04d

Browse files
authored
fix(client): serialize create-push requests for go interop (#35)
* fix(client): serialize create-push requests for go interop Signed-off-by: Luca Muscariello <muscariello@ieee.org> * test(client): cover jsonrpc push-config transport paths Signed-off-by: Luca Muscariello <muscariello@ieee.org> * build(just): emit html report from coverage-lcov Signed-off-by: Luca Muscariello <muscariello@ieee.org> --------- Signed-off-by: Luca Muscariello <muscariello@ieee.org>
1 parent 51c64b6 commit aedf04d

File tree

3 files changed

+312
-14
lines changed

3 files changed

+312
-14
lines changed

a2a-client/src/jsonrpc.rs

Lines changed: 262 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use reqwest::Client;
99
use crate::push_config_compat::{
1010
deserialize_list_task_push_notification_configs_response,
1111
deserialize_task_push_notification_config,
12+
serialize_create_task_push_notification_config_request,
1213
};
1314
use crate::transport::{ServiceParams, Transport, TransportFactory};
1415

@@ -26,19 +27,13 @@ impl JsonRpcTransport {
2627
JsonRpcTransport { client, endpoint }
2728
}
2829

29-
async fn call_value<Req>(
30+
async fn call_value_with_payload(
3031
&self,
3132
params: &ServiceParams,
3233
method: &str,
33-
request_params: &Req,
34-
) -> Result<serde_json::Value, A2AError>
35-
where
36-
Req: ProtoJsonPayload,
37-
{
34+
payload: serde_json::Value,
35+
) -> Result<serde_json::Value, A2AError> {
3836
let id = JsonRpcId::String(uuid::Uuid::now_v7().to_string());
39-
let payload = protojson_conv::to_value(request_params).map_err(|e| {
40-
A2AError::internal(format!("failed to serialize request as ProtoJSON: {e}"))
41-
})?;
4237
let rpc_request = JsonRpcRequest::new(id, method, Some(payload));
4338

4439
let mut builder = self.client.post(&self.endpoint);
@@ -63,11 +58,25 @@ impl JsonRpcTransport {
6358
return Err(A2AError::new(err.code, err.message));
6459
}
6560

66-
let result = rpc_response
61+
rpc_response
6762
.result
68-
.ok_or_else(|| A2AError::internal("JSON-RPC response missing result"))?;
63+
.ok_or_else(|| A2AError::internal("JSON-RPC response missing result"))
64+
}
65+
66+
async fn call_value<Req>(
67+
&self,
68+
params: &ServiceParams,
69+
method: &str,
70+
request_params: &Req,
71+
) -> Result<serde_json::Value, A2AError>
72+
where
73+
Req: ProtoJsonPayload,
74+
{
75+
let payload = protojson_conv::to_value(request_params).map_err(|e| {
76+
A2AError::internal(format!("failed to serialize request as ProtoJSON: {e}"))
77+
})?;
6978

70-
Ok(result)
79+
self.call_value_with_payload(params, method, payload).await
7180
}
7281

7382
async fn call<Req, Resp>(
@@ -341,8 +350,9 @@ impl Transport for JsonRpcTransport {
341350
params: &ServiceParams,
342351
req: &CreateTaskPushNotificationConfigRequest,
343352
) -> Result<TaskPushNotificationConfig, A2AError> {
353+
let payload = serialize_create_task_push_notification_config_request(req)?;
344354
let result = self
345-
.call_value(params, methods::CREATE_PUSH_CONFIG, req)
355+
.call_value_with_payload(params, methods::CREATE_PUSH_CONFIG, payload)
346356
.await?;
347357
deserialize_task_push_notification_config(result)
348358
}
@@ -456,6 +466,10 @@ mod tests {
456466
use super::*;
457467
use a2a_pb::protojson_conv;
458468
use futures::StreamExt;
469+
use serde_json::{Value, json};
470+
use tokio::io::{AsyncReadExt, AsyncWriteExt};
471+
use tokio::net::{TcpListener, TcpStream};
472+
use tokio::sync::oneshot;
459473

460474
/// Helper: build an SSE byte stream from raw text chunks.
461475
fn byte_stream(
@@ -469,6 +483,91 @@ mod tests {
469483
)
470484
}
471485

486+
async fn spawn_jsonrpc_server(response_body: String) -> (String, oneshot::Receiver<String>) {
487+
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
488+
let addr = listener.local_addr().unwrap();
489+
let (request_tx, request_rx) = oneshot::channel();
490+
491+
tokio::spawn(async move {
492+
let (mut socket, _) = listener.accept().await.unwrap();
493+
let request = read_http_request(&mut socket).await;
494+
let response = format!(
495+
"HTTP/1.1 200 OK\r\ncontent-type: application/json\r\ncontent-length: {}\r\nconnection: close\r\n\r\n{}",
496+
response_body.len(),
497+
response_body,
498+
);
499+
500+
let _ = request_tx.send(request);
501+
socket.write_all(response.as_bytes()).await.unwrap();
502+
});
503+
504+
(format!("http://{addr}"), request_rx)
505+
}
506+
507+
async fn read_http_request(socket: &mut TcpStream) -> String {
508+
let mut buffer = Vec::new();
509+
let mut chunk = [0_u8; 1024];
510+
let mut expected_len = None;
511+
512+
loop {
513+
let read = socket.read(&mut chunk).await.unwrap();
514+
if read == 0 {
515+
break;
516+
}
517+
buffer.extend_from_slice(&chunk[..read]);
518+
519+
if expected_len.is_none() {
520+
if let Some(header_end) = find_header_end(&buffer) {
521+
let headers = String::from_utf8_lossy(&buffer[..header_end]);
522+
expected_len = Some(header_end + parse_content_length(&headers));
523+
}
524+
}
525+
526+
if let Some(total_len) = expected_len {
527+
if buffer.len() >= total_len {
528+
break;
529+
}
530+
}
531+
}
532+
533+
String::from_utf8(buffer).unwrap()
534+
}
535+
536+
fn find_header_end(buffer: &[u8]) -> Option<usize> {
537+
buffer
538+
.windows(4)
539+
.position(|window| window == b"\r\n\r\n")
540+
.map(|position| position + 4)
541+
}
542+
543+
fn parse_content_length(headers: &str) -> usize {
544+
headers
545+
.lines()
546+
.find_map(|line| {
547+
let (name, value) = line.split_once(':')?;
548+
name.eq_ignore_ascii_case("content-length")
549+
.then(|| value.trim().parse::<usize>().ok())
550+
.flatten()
551+
})
552+
.unwrap_or(0)
553+
}
554+
555+
fn sample_create_push_config_request() -> CreateTaskPushNotificationConfigRequest {
556+
CreateTaskPushNotificationConfigRequest {
557+
task_id: "task-1".into(),
558+
config: PushNotificationConfig {
559+
url: "https://example.invalid/webhook".into(),
560+
id: Some("cfg-1".into()),
561+
token: Some("secret-token".into()),
562+
authentication: Some(AuthenticationInfo {
563+
scheme: "Bearer".into(),
564+
credentials: Some("credential".into()),
565+
}),
566+
},
567+
tenant: Some("tenant-1".into()),
568+
}
569+
}
570+
472571
#[tokio::test]
473572
async fn test_parse_sse_stream_jsonrpc_envelope() {
474573
// Build a JSON-RPC response wrapping a StreamResponse (StatusUpdate)
@@ -705,4 +804,154 @@ mod tests {
705804
// Just verify it was created (it's a real transport but we can't call it without a server)
706805
transport.destroy().await.unwrap();
707806
}
807+
808+
#[tokio::test]
809+
async fn test_create_push_config_sends_nested_request_shape() {
810+
let response = json!({
811+
"jsonrpc": "2.0",
812+
"id": "1",
813+
"result": {
814+
"taskId": "task-1",
815+
"config": {
816+
"url": "https://example.invalid/webhook",
817+
"id": "cfg-1",
818+
"token": "secret-token",
819+
"authentication": {
820+
"scheme": "Bearer",
821+
"credentials": "credential"
822+
}
823+
},
824+
"tenant": "tenant-1"
825+
}
826+
})
827+
.to_string();
828+
let (endpoint, request_rx) = spawn_jsonrpc_server(response).await;
829+
let transport = JsonRpcTransport::new(Client::new(), endpoint);
830+
let mut params = ServiceParams::new();
831+
params.insert("x-trace".into(), vec!["alpha".into(), "beta".into()]);
832+
833+
let result = transport
834+
.create_push_config(&params, &sample_create_push_config_request())
835+
.await
836+
.unwrap();
837+
838+
assert_eq!(result.task_id, "task-1");
839+
assert_eq!(result.config.id.as_deref(), Some("cfg-1"));
840+
841+
let request = request_rx.await.unwrap();
842+
let request_lower = request.to_ascii_lowercase();
843+
assert!(request_lower.contains("x-trace: alpha"));
844+
assert!(request_lower.contains("x-trace: beta"));
845+
846+
let body = request.split("\r\n\r\n").nth(1).unwrap();
847+
let payload: Value = serde_json::from_str(body).unwrap();
848+
assert_eq!(payload["method"], methods::CREATE_PUSH_CONFIG);
849+
assert_eq!(
850+
payload["params"],
851+
json!({
852+
"taskId": "task-1",
853+
"config": {
854+
"url": "https://example.invalid/webhook",
855+
"id": "cfg-1",
856+
"token": "secret-token",
857+
"authentication": {
858+
"scheme": "Bearer",
859+
"credentials": "credential"
860+
}
861+
},
862+
"tenant": "tenant-1"
863+
})
864+
);
865+
}
866+
867+
#[tokio::test]
868+
async fn test_create_push_config_surfaces_jsonrpc_error() {
869+
let response = json!({
870+
"jsonrpc": "2.0",
871+
"id": "1",
872+
"error": {
873+
"code": error_code::INVALID_PARAMS,
874+
"message": "invalid params",
875+
"data": null
876+
}
877+
})
878+
.to_string();
879+
let (endpoint, _request_rx) = spawn_jsonrpc_server(response).await;
880+
let transport = JsonRpcTransport::new(Client::new(), endpoint);
881+
882+
let error = transport
883+
.create_push_config(&ServiceParams::new(), &sample_create_push_config_request())
884+
.await
885+
.unwrap_err();
886+
887+
assert_eq!(error.code, error_code::INVALID_PARAMS);
888+
assert_eq!(error.message, "invalid params");
889+
}
890+
891+
#[tokio::test]
892+
async fn test_create_push_config_rejects_missing_result() {
893+
let response = json!({
894+
"jsonrpc": "2.0",
895+
"id": "1"
896+
})
897+
.to_string();
898+
let (endpoint, _request_rx) = spawn_jsonrpc_server(response).await;
899+
let transport = JsonRpcTransport::new(Client::new(), endpoint);
900+
901+
let error = transport
902+
.create_push_config(&ServiceParams::new(), &sample_create_push_config_request())
903+
.await
904+
.unwrap_err();
905+
906+
assert_eq!(error.code, error_code::INTERNAL_ERROR);
907+
assert_eq!(error.message, "JSON-RPC response missing result");
908+
}
909+
910+
#[tokio::test]
911+
async fn test_get_push_config_uses_protojson_request_path() {
912+
let response = json!({
913+
"jsonrpc": "2.0",
914+
"id": "1",
915+
"result": {
916+
"taskId": "task-1",
917+
"config": {
918+
"url": "https://example.invalid/webhook",
919+
"id": "cfg-1",
920+
"token": "secret-token"
921+
},
922+
"tenant": "tenant-1"
923+
}
924+
})
925+
.to_string();
926+
let (endpoint, request_rx) = spawn_jsonrpc_server(response).await;
927+
let transport = JsonRpcTransport::new(Client::new(), endpoint);
928+
929+
let result = transport
930+
.get_push_config(
931+
&ServiceParams::new(),
932+
&GetTaskPushNotificationConfigRequest {
933+
task_id: "task-1".into(),
934+
id: "cfg-1".into(),
935+
tenant: Some("tenant-1".into()),
936+
},
937+
)
938+
.await
939+
.unwrap();
940+
941+
assert_eq!(result.task_id, "task-1");
942+
assert_eq!(result.config.id.as_deref(), Some("cfg-1"));
943+
944+
let request = request_rx.await.unwrap();
945+
let body = request.split("\r\n\r\n").nth(1).unwrap();
946+
let payload: Value = serde_json::from_str(body).unwrap();
947+
assert_eq!(payload["method"], methods::GET_PUSH_CONFIG);
948+
assert_eq!(
949+
payload["params"],
950+
json!({
951+
"taskId": "task-1",
952+
"id": "cfg-1",
953+
"tenant": "tenant-1"
954+
})
955+
);
956+
}
708957
}

a2a-client/src/push_config_compat.rs

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,16 @@ use a2a::*;
44
use a2a_pb::protojson_conv;
55
use serde_json::Value;
66

7+
pub(crate) fn serialize_create_task_push_notification_config_request(
8+
request: &CreateTaskPushNotificationConfigRequest,
9+
) -> Result<Value, A2AError> {
10+
serde_json::to_value(request).map_err(|error| {
11+
A2AError::internal(format!(
12+
"failed to serialize create-push-config request: {error}"
13+
))
14+
})
15+
}
16+
717
pub(crate) fn deserialize_task_push_notification_config(
818
payload: Value,
919
) -> Result<TaskPushNotificationConfig, A2AError> {
@@ -40,6 +50,7 @@ pub(crate) fn deserialize_list_task_push_notification_configs_response(
4050
#[cfg(test)]
4151
mod tests {
4252
use super::*;
53+
use serde_json::json;
4354

4455
fn sample_task_push_config() -> TaskPushNotificationConfig {
4556
TaskPushNotificationConfig {
@@ -57,6 +68,40 @@ mod tests {
5768
}
5869
}
5970

71+
fn sample_create_task_push_config_request() -> CreateTaskPushNotificationConfigRequest {
72+
let config = sample_task_push_config();
73+
CreateTaskPushNotificationConfigRequest {
74+
task_id: config.task_id.clone(),
75+
config: config.config,
76+
tenant: config.tenant,
77+
}
78+
}
79+
80+
#[test]
81+
fn serializes_nested_create_task_push_config_request_shape() {
82+
let payload = serialize_create_task_push_notification_config_request(
83+
&sample_create_task_push_config_request(),
84+
)
85+
.unwrap();
86+
87+
assert_eq!(
88+
payload,
89+
json!({
90+
"taskId": "t1",
91+
"config": {
92+
"id": "cfg1",
93+
"url": "https://example.invalid/webhook",
94+
"token": "token-1",
95+
"authentication": {
96+
"scheme": "Bearer",
97+
"credentials": "secret"
98+
}
99+
},
100+
"tenant": "tenant-1"
101+
})
102+
);
103+
}
104+
60105
#[test]
61106
fn parses_nested_task_push_config_shape() {
62107
let payload = serde_json::to_value(sample_task_push_config()).unwrap();

0 commit comments

Comments
 (0)