Skip to content

Commit de69403

Browse files
committed
Clean up/refactor some event stream code
Clarifying some comments
1 parent de86d87 commit de69403

4 files changed

Lines changed: 146 additions & 51 deletions

File tree

codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/protocol/RequestSerializerGenerator.kt

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,14 @@ class RequestSerializerGenerator(
268268
// then replace the body with the event stream and set the correct Content-Type.
269269
val eventStreamBody =
270270
writable {
271-
bodyGenerator?.generatePayload(this, "input", operationShape)
271+
software.amazon.smithy.rust.codegen.client.smithy.protocols.renderClientEventStreamBody(
272+
this,
273+
codegenContext,
274+
protocol,
275+
operationShape,
276+
streamingMember!!,
277+
outerName = "input",
278+
)
272279
}
273280
// requestContentType returns the event stream content type for input streams
274281
// (e.g., application/vnd.amazon.eventstream) or the normal protocol content type
@@ -279,7 +286,7 @@ class RequestSerializerGenerator(
279286
let mut request = protocol.serialize_request(
280287
&input, $schemaRef, "", _cfg,
281288
).map_err(#{BoxError}::from)?;
282-
*request.body_mut() = #{SdkBody}::from(#{event_stream_body});
289+
*request.body_mut() = #{event_stream_body};
283290
// The protocol may have set Content-Length based on the initial empty body.
284291
// Remove it since the event stream body has unknown length.
285292
request.headers_mut().remove("Content-Length");

codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/protocol/ResponseDeserializerGenerator.kt

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -195,11 +195,12 @@ class ResponseDeserializerGenerator(
195195

196196
/** Schema-serde path for event stream responses (hybrid).
197197
*
198-
* Mirrors the legacy builder-based pattern: create the output builder, set
199-
* the event stream member via `set_<memberName>(Some(receiver))`, then
200-
* build. The legacy `EventStreamUnmarshallerGenerator` handles frame-level
201-
* unmarshalling, including initial-response data for RPC protocols (which
202-
* arrives via the first event frame, not the HTTP body).
198+
* Creates the output builder, sets the event stream member via
199+
* `set_<memberName>(Some(receiver))`, then builds. `EventStreamUnmarshallerGenerator`
200+
* (invoked here with `useSchemaSerde = true`) emits a schema-serde unmarshaller
201+
* that uses `self.protocol.payload_codec()` to decode each event frame, and also
202+
* handles initial-response data for RPC protocols (which arrives via the first
203+
* event frame, not the HTTP body).
203204
*
204205
* Unlike the non-streaming schema path, `deserialize_with_response` is not
205206
* used because it would call `builder.build()` internally — and for

codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/protocols/HttpBoundProtocolGenerator.kt

Lines changed: 115 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -27,49 +27,7 @@ class ClientHttpBoundProtocolPayloadGenerator(
2727
codegenContext, protocol, HttpMessageType.REQUEST,
2828
eventStreamUseSchemaSerde = SchemaSerdeAllowlist.usesSchemaSerdeExclusively(codegenContext),
2929
renderEventStreamBody = { writer, params ->
30-
val useSchemaSerde = SchemaSerdeAllowlist.usesSchemaSerdeExclusively(codegenContext)
31-
val marshallerNew =
32-
if (useSchemaSerde) {
33-
"""
34-
let protocol = _cfg.load::<#{SharedClientProtocol}>()
35-
.expect("a SharedClientProtocol is required")
36-
.clone();
37-
let error_marshaller = #{errorMarshallerConstructorFn}(protocol.clone());
38-
let marshaller = #{marshallerConstructorFn}(protocol.clone());
39-
"""
40-
} else {
41-
"""
42-
let error_marshaller = #{errorMarshallerConstructorFn}();
43-
let marshaller = #{marshallerConstructorFn}();
44-
"""
45-
}
46-
writer.rustTemplate(
47-
"""
48-
{
49-
$marshallerNew
50-
let (signer, signer_sender) = #{DeferredSigner}::new();
51-
_cfg.interceptor_state().store_put(signer_sender);
52-
#{SdkBody}::from_body_1_x(#{http_body_util}::StreamBody::new(#{event_stream:W}))
53-
}
54-
""",
55-
"http_body_util" to CargoDependency.HttpBodyUtil01x.toType(),
56-
"SdkBody" to
57-
CargoDependency.smithyTypes(codegenContext.runtimeConfig).withFeature("http-body-1-x")
58-
.toType().resolve("body::SdkBody"),
59-
"aws_smithy_http" to RuntimeType.smithyHttp(codegenContext.runtimeConfig),
60-
"DeferredSigner" to
61-
RuntimeType.smithyEventStream(codegenContext.runtimeConfig)
62-
.resolve("frame::DeferredSigner"),
63-
"SharedClientProtocol" to
64-
RuntimeType.smithySchema(codegenContext.runtimeConfig)
65-
.resolve("protocol::SharedClientProtocol"),
66-
"marshallerConstructorFn" to params.eventStreamMarshallerGenerator.render(),
67-
"errorMarshallerConstructorFn" to params.errorMarshallerConstructorFn,
68-
"event_stream" to (
69-
eventStreamWithInitialRequest(codegenContext, protocol, params)
70-
?: messageStreamAdaptor(params.outerName, params.memberName)
71-
),
72-
)
30+
renderEventStreamBodyInline(writer, codegenContext, protocol, params)
7331
},
7432
)
7533

@@ -168,3 +126,117 @@ private fun messageStreamAdaptor(
168126
) = writable {
169127
rust("$outerName.$memberName.into_body_stream(marshaller, error_marshaller, signer)")
170128
}
129+
130+
/**
131+
* Renders an event stream request body directly, without going through the legacy
132+
* [HttpBoundProtocolPayloadGenerator]/[ProtocolPayloadGenerator] plumbing. Used by
133+
* the schema-serde [RequestSerializerGenerator] path so the event stream branch
134+
* does not depend on the legacy `bodyGenerator` parameter.
135+
*/
136+
fun renderClientEventStreamBody(
137+
writer: software.amazon.smithy.rust.codegen.core.rustlang.RustWriter,
138+
codegenContext: ClientCodegenContext,
139+
protocol: Protocol,
140+
operationShape: software.amazon.smithy.model.shapes.OperationShape,
141+
memberShape: software.amazon.smithy.model.shapes.MemberShape,
142+
outerName: String,
143+
) {
144+
val model = codegenContext.model
145+
val symbolProvider = codegenContext.symbolProvider
146+
val httpBindingResolver = protocol.httpBindingResolver
147+
val memberName = symbolProvider.toMemberName(memberShape)
148+
val unionShape =
149+
model.expectShape(
150+
memberShape.target,
151+
software.amazon.smithy.model.shapes.UnionShape::class.java,
152+
)
153+
val payloadContentType =
154+
httpBindingResolver.eventStreamMessageContentType(memberShape)
155+
?: throw software.amazon.smithy.codegen.core.CodegenException("event streams must set a content type")
156+
val useSchemaSerde = SchemaSerdeAllowlist.usesSchemaSerdeExclusively(codegenContext)
157+
val serializerGenerator = protocol.structuredDataSerializer()
158+
val errorMarshallerConstructorFn =
159+
software.amazon.smithy.rust.codegen.core.smithy.protocols.serialize.EventStreamErrorMarshallerGenerator(
160+
model,
161+
software.amazon.smithy.rust.codegen.core.smithy.CodegenTarget.CLIENT,
162+
codegenContext.runtimeConfig,
163+
symbolProvider,
164+
unionShape,
165+
serializerGenerator,
166+
payloadContentType,
167+
useSchemaSerde = useSchemaSerde,
168+
).render()
169+
val eventStreamMarshallerGenerator =
170+
software.amazon.smithy.rust.codegen.core.smithy.protocols.serialize.EventStreamMarshallerGenerator(
171+
model,
172+
software.amazon.smithy.rust.codegen.core.smithy.CodegenTarget.CLIENT,
173+
codegenContext.runtimeConfig,
174+
symbolProvider,
175+
unionShape,
176+
serializerGenerator,
177+
payloadContentType,
178+
useSchemaSerde = useSchemaSerde,
179+
)
180+
val params =
181+
EventStreamBodyParams(
182+
outerName,
183+
memberName,
184+
operationShape,
185+
eventStreamMarshallerGenerator,
186+
errorMarshallerConstructorFn,
187+
payloadContentType,
188+
object : software.amazon.smithy.rust.codegen.core.smithy.generators.protocol.AdditionalPayloadContext {},
189+
)
190+
renderEventStreamBodyInline(writer, codegenContext, protocol, params)
191+
}
192+
193+
/** Shared event-stream-body emission used by both the legacy lambda and the schema-serde helper. */
194+
private fun renderEventStreamBodyInline(
195+
writer: software.amazon.smithy.rust.codegen.core.rustlang.RustWriter,
196+
codegenContext: ClientCodegenContext,
197+
protocol: Protocol,
198+
params: EventStreamBodyParams,
199+
) {
200+
val useSchemaSerde = SchemaSerdeAllowlist.usesSchemaSerdeExclusively(codegenContext)
201+
val marshallerNew =
202+
if (useSchemaSerde) {
203+
"""
204+
let protocol = _cfg.load::<#{SharedClientProtocol}>()
205+
.expect("a SharedClientProtocol is required")
206+
.clone();
207+
let error_marshaller = #{errorMarshallerConstructorFn}(protocol.clone());
208+
let marshaller = #{marshallerConstructorFn}(protocol.clone());
209+
"""
210+
} else {
211+
"""
212+
let error_marshaller = #{errorMarshallerConstructorFn}();
213+
let marshaller = #{marshallerConstructorFn}();
214+
"""
215+
}
216+
writer.rustTemplate(
217+
"""
218+
{
219+
$marshallerNew
220+
let (signer, signer_sender) = #{DeferredSigner}::new();
221+
_cfg.interceptor_state().store_put(signer_sender);
222+
#{SdkBody}::from_body_1_x(#{http_body_util}::StreamBody::new(#{event_stream:W}))
223+
}
224+
""",
225+
"http_body_util" to CargoDependency.HttpBodyUtil01x.toType(),
226+
"SdkBody" to
227+
CargoDependency.smithyTypes(codegenContext.runtimeConfig).withFeature("http-body-1-x")
228+
.toType().resolve("body::SdkBody"),
229+
"DeferredSigner" to
230+
RuntimeType.smithyEventStream(codegenContext.runtimeConfig)
231+
.resolve("frame::DeferredSigner"),
232+
"SharedClientProtocol" to
233+
RuntimeType.smithySchema(codegenContext.runtimeConfig)
234+
.resolve("protocol::SharedClientProtocol"),
235+
"marshallerConstructorFn" to params.eventStreamMarshallerGenerator.render(),
236+
"errorMarshallerConstructorFn" to params.errorMarshallerConstructorFn,
237+
"event_stream" to (
238+
eventStreamWithInitialRequest(codegenContext, protocol, params)
239+
?: messageStreamAdaptor(params.outerName, params.memberName)
240+
),
241+
)
242+
}

rust-runtime/aws-smithy-schema/src/schema/codec.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,22 @@ use crate::serde::{ShapeDeserializer, ShapeSerializer};
1515
/// Trait for serializers that can produce a final byte output.
1616
///
1717
/// This is separate from [`ShapeSerializer`] to preserve object safety on
18-
/// `ShapeSerializer` (which is used as `&mut dyn ShapeSerializer` in generated code).
18+
/// [`ShapeSerializer`] (which is used as `&mut dyn ShapeSerializer` in generated code).
19+
///
20+
/// # Why isn't `FinishSerializer` itself object-safe?
21+
///
22+
/// [`FinishSerializer::finish`] takes `self` by value so it can consume and tear down the
23+
/// serializer (e.g., return an owned `Vec<u8>` without a leftover borrow on the serializer's
24+
/// internal buffer). Methods that receive `self` by value are not dispatchable through a
25+
/// trait object: `dyn FinishSerializer` doesn't know the concrete size of `Self`, so it
26+
/// cannot move it. This is the standard Rust object-safety restriction.
27+
///
28+
/// The consequence is that `FinishSerializer` can only be used with a statically-known
29+
/// serializer type, which is fine for generated code that knows the concrete [`Codec`].
30+
/// For call sites that need dynamic dispatch (e.g., event stream marshallers that receive
31+
/// a `Box<dyn PayloadSerializer>` from `ClientProtocol::payload_codec`), use
32+
/// [`PayloadSerializer::finish_boxed`] instead — it takes `self: Box<Self>`, which *is*
33+
/// object-safe because the `Box` owns the value and knows how to drop it.
1934
pub trait FinishSerializer {
2035
/// Consumes the serializer and returns the serialized bytes.
2136
fn finish(self) -> Vec<u8>;

0 commit comments

Comments
 (0)