Skip to content

Commit 1a6711f

Browse files
committed
test(client): cover jsonrpc push-config transport paths
Signed-off-by: Luca Muscariello <[email protected]>
1 parent 7389ebe commit 1a6711f

File tree

1 file changed

+239
-0
lines changed

1 file changed

+239
-0
lines changed

a2a-client/src/jsonrpc.rs

Lines changed: 239 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -466,6 +466,10 @@ mod tests {
466466
use super::*;
467467
use a2a_pb::protojson_conv;
468468
use futures::StreamExt;
469+
use serde_json::{Value, json};
470+
use tokio::io::{AsyncReadExt, AsyncWriteExt};
471+
use tokio::net::{TcpListener, TcpStream};
472+
use tokio::sync::oneshot;
469473

470474
/// Helper: build an SSE byte stream from raw text chunks.
471475
fn byte_stream(
@@ -479,6 +483,91 @@ mod tests {
479483
)
480484
}
481485

486+
async fn spawn_jsonrpc_server(response_body: String) -> (String, oneshot::Receiver<String>) {
487+
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
488+
let addr = listener.local_addr().unwrap();
489+
let (request_tx, request_rx) = oneshot::channel();
490+
491+
tokio::spawn(async move {
492+
let (mut socket, _) = listener.accept().await.unwrap();
493+
let request = read_http_request(&mut socket).await;
494+
let response = format!(
495+
"HTTP/1.1 200 OK\r\ncontent-type: application/json\r\ncontent-length: {}\r\nconnection: close\r\n\r\n{}",
496+
response_body.len(),
497+
response_body,
498+
);
499+
500+
let _ = request_tx.send(request);
501+
socket.write_all(response.as_bytes()).await.unwrap();
502+
});
503+
504+
(format!("http://{addr}"), request_rx)
505+
}
506+
507+
async fn read_http_request(socket: &mut TcpStream) -> String {
508+
let mut buffer = Vec::new();
509+
let mut chunk = [0_u8; 1024];
510+
let mut expected_len = None;
511+
512+
loop {
513+
let read = socket.read(&mut chunk).await.unwrap();
514+
if read == 0 {
515+
break;
516+
}
517+
buffer.extend_from_slice(&chunk[..read]);
518+
519+
if expected_len.is_none() {
520+
if let Some(header_end) = find_header_end(&buffer) {
521+
let headers = String::from_utf8_lossy(&buffer[..header_end]);
522+
expected_len = Some(header_end + parse_content_length(&headers));
523+
}
524+
}
525+
526+
if let Some(total_len) = expected_len {
527+
if buffer.len() >= total_len {
528+
break;
529+
}
530+
}
531+
}
532+
533+
String::from_utf8(buffer).unwrap()
534+
}
535+
536+
fn find_header_end(buffer: &[u8]) -> Option<usize> {
537+
buffer
538+
.windows(4)
539+
.position(|window| window == b"\r\n\r\n")
540+
.map(|position| position + 4)
541+
}
542+
543+
fn parse_content_length(headers: &str) -> usize {
544+
headers
545+
.lines()
546+
.find_map(|line| {
547+
let (name, value) = line.split_once(':')?;
548+
name.eq_ignore_ascii_case("content-length")
549+
.then(|| value.trim().parse::<usize>().ok())
550+
.flatten()
551+
})
552+
.unwrap_or(0)
553+
}
554+
555+
fn sample_create_push_config_request() -> CreateTaskPushNotificationConfigRequest {
556+
CreateTaskPushNotificationConfigRequest {
557+
task_id: "task-1".into(),
558+
config: PushNotificationConfig {
559+
url: "https://example.invalid/webhook".into(),
560+
id: Some("cfg-1".into()),
561+
token: Some("secret-token".into()),
562+
authentication: Some(AuthenticationInfo {
563+
scheme: "Bearer".into(),
564+
credentials: Some("credential".into()),
565+
}),
566+
},
567+
tenant: Some("tenant-1".into()),
568+
}
569+
}
570+
482571
#[tokio::test]
483572
async fn test_parse_sse_stream_jsonrpc_envelope() {
484573
// Build a JSON-RPC response wrapping a StreamResponse (StatusUpdate)
@@ -715,4 +804,154 @@ mod tests {
715804
// Just verify it was created (it's a real transport but we can't call it without a server)
716805
transport.destroy().await.unwrap();
717806
}
807+
808+
#[tokio::test]
809+
async fn test_create_push_config_sends_nested_request_shape() {
810+
let response = json!({
811+
"jsonrpc": "2.0",
812+
"id": "1",
813+
"result": {
814+
"taskId": "task-1",
815+
"config": {
816+
"url": "https://example.invalid/webhook",
817+
"id": "cfg-1",
818+
"token": "secret-token",
819+
"authentication": {
820+
"scheme": "Bearer",
821+
"credentials": "credential"
822+
}
823+
},
824+
"tenant": "tenant-1"
825+
}
826+
})
827+
.to_string();
828+
let (endpoint, request_rx) = spawn_jsonrpc_server(response).await;
829+
let transport = JsonRpcTransport::new(Client::new(), endpoint);
830+
let mut params = ServiceParams::new();
831+
params.insert("x-trace".into(), vec!["alpha".into(), "beta".into()]);
832+
833+
let result = transport
834+
.create_push_config(&params, &sample_create_push_config_request())
835+
.await
836+
.unwrap();
837+
838+
assert_eq!(result.task_id, "task-1");
839+
assert_eq!(result.config.id.as_deref(), Some("cfg-1"));
840+
841+
let request = request_rx.await.unwrap();
842+
let request_lower = request.to_ascii_lowercase();
843+
assert!(request_lower.contains("x-trace: alpha"));
844+
assert!(request_lower.contains("x-trace: beta"));
845+
846+
let body = request.split("\r\n\r\n").nth(1).unwrap();
847+
let payload: Value = serde_json::from_str(body).unwrap();
848+
assert_eq!(payload["method"], methods::CREATE_PUSH_CONFIG);
849+
assert_eq!(
850+
payload["params"],
851+
json!({
852+
"taskId": "task-1",
853+
"config": {
854+
"url": "https://example.invalid/webhook",
855+
"id": "cfg-1",
856+
"token": "secret-token",
857+
"authentication": {
858+
"scheme": "Bearer",
859+
"credentials": "credential"
860+
}
861+
},
862+
"tenant": "tenant-1"
863+
})
864+
);
865+
}
866+
867+
#[tokio::test]
868+
async fn test_create_push_config_surfaces_jsonrpc_error() {
869+
let response = json!({
870+
"jsonrpc": "2.0",
871+
"id": "1",
872+
"error": {
873+
"code": error_code::INVALID_PARAMS,
874+
"message": "invalid params",
875+
"data": null
876+
}
877+
})
878+
.to_string();
879+
let (endpoint, _request_rx) = spawn_jsonrpc_server(response).await;
880+
let transport = JsonRpcTransport::new(Client::new(), endpoint);
881+
882+
let error = transport
883+
.create_push_config(&ServiceParams::new(), &sample_create_push_config_request())
884+
.await
885+
.unwrap_err();
886+
887+
assert_eq!(error.code, error_code::INVALID_PARAMS);
888+
assert_eq!(error.message, "invalid params");
889+
}
890+
891+
#[tokio::test]
892+
async fn test_create_push_config_rejects_missing_result() {
893+
let response = json!({
894+
"jsonrpc": "2.0",
895+
"id": "1"
896+
})
897+
.to_string();
898+
let (endpoint, _request_rx) = spawn_jsonrpc_server(response).await;
899+
let transport = JsonRpcTransport::new(Client::new(), endpoint);
900+
901+
let error = transport
902+
.create_push_config(&ServiceParams::new(), &sample_create_push_config_request())
903+
.await
904+
.unwrap_err();
905+
906+
assert_eq!(error.code, error_code::INTERNAL_ERROR);
907+
assert_eq!(error.message, "JSON-RPC response missing result");
908+
}
909+
910+
#[tokio::test]
911+
async fn test_get_push_config_uses_protojson_request_path() {
912+
let response = json!({
913+
"jsonrpc": "2.0",
914+
"id": "1",
915+
"result": {
916+
"taskId": "task-1",
917+
"config": {
918+
"url": "https://example.invalid/webhook",
919+
"id": "cfg-1",
920+
"token": "secret-token"
921+
},
922+
"tenant": "tenant-1"
923+
}
924+
})
925+
.to_string();
926+
let (endpoint, request_rx) = spawn_jsonrpc_server(response).await;
927+
let transport = JsonRpcTransport::new(Client::new(), endpoint);
928+
929+
let result = transport
930+
.get_push_config(
931+
&ServiceParams::new(),
932+
&GetTaskPushNotificationConfigRequest {
933+
task_id: "task-1".into(),
934+
id: "cfg-1".into(),
935+
tenant: Some("tenant-1".into()),
936+
},
937+
)
938+
.await
939+
.unwrap();
940+
941+
assert_eq!(result.task_id, "task-1");
942+
assert_eq!(result.config.id.as_deref(), Some("cfg-1"));
943+
944+
let request = request_rx.await.unwrap();
945+
let body = request.split("\r\n\r\n").nth(1).unwrap();
946+
let payload: Value = serde_json::from_str(body).unwrap();
947+
assert_eq!(payload["method"], methods::GET_PUSH_CONFIG);
948+
assert_eq!(
949+
payload["params"],
950+
json!({
951+
"taskId": "task-1",
952+
"id": "cfg-1",
953+
"tenant": "tenant-1"
954+
})
955+
);
956+
}
718957
}

0 commit comments

Comments
 (0)