@@ -98,100 +98,103 @@ impl SinkFormatter for DebeziumJsonFormatter {
9898 & self ,
9999 chunk : & StreamChunk ,
100100 ) -> impl Iterator < Item = Result < ( Option < Value > , Option < Value > ) > > {
101- std:: iter:: from_coroutine ( || {
102- let DebeziumJsonFormatter {
103- schema,
104- pk_indices,
105- db_name,
106- sink_from_name,
107- opts,
108- key_encoder,
109- val_encoder,
110- } = self ;
111- let ts_ms = SystemTime :: now ( )
112- . duration_since ( UNIX_EPOCH )
113- . unwrap ( )
114- . as_millis ( ) as u64 ;
115- let source_field = json ! ( {
116- // todo: still some missing fields in source field
117- // ref https://debezium.io/documentation/reference/2.4/connectors/postgresql.html#postgresql-create-events
118- "db" : db_name,
119- "table" : sink_from_name,
120- "ts_ms" : ts_ms,
121- } ) ;
122-
123- let mut update_cache: Option < Map < String , Value > > = None ;
124-
125- for ( op, row) in chunk. rows ( ) {
126- let event_key_object: Option < Value > = Some ( json ! ( {
127- "schema" : json!( {
128- "type" : "struct" ,
129- "fields" : fields_pk_to_json( & schema. fields, pk_indices) ,
130- "optional" : false ,
131- "name" : concat_debezium_name_field( db_name, sink_from_name, "Key" ) ,
132- } ) ,
133- "payload" : tri!( key_encoder. encode( row) ) ,
134- } ) ) ;
135- let event_object: Option < Value > = match op {
136- Op :: Insert => Some ( json ! ( {
137- "schema" : schema_to_json( schema, db_name, sink_from_name) ,
138- "payload" : {
139- "before" : null,
140- "after" : tri!( val_encoder. encode( row) ) ,
141- "op" : "c" ,
142- "ts_ms" : ts_ms,
143- "source" : source_field,
144- }
145- } ) ) ,
146- Op :: Delete => {
147- let value_obj = Some ( json ! ( {
101+ std:: iter:: from_coroutine (
102+ #[ coroutine]
103+ || {
104+ let DebeziumJsonFormatter {
105+ schema,
106+ pk_indices,
107+ db_name,
108+ sink_from_name,
109+ opts,
110+ key_encoder,
111+ val_encoder,
112+ } = self ;
113+ let ts_ms = SystemTime :: now ( )
114+ . duration_since ( UNIX_EPOCH )
115+ . unwrap ( )
116+ . as_millis ( ) as u64 ;
117+ let source_field = json ! ( {
118+ // todo: still some missing fields in source field
119+ // ref https://debezium.io/documentation/reference/2.4/connectors/postgresql.html#postgresql-create-events
120+ "db" : db_name,
121+ "table" : sink_from_name,
122+ "ts_ms" : ts_ms,
123+ } ) ;
124+
125+ let mut update_cache: Option < Map < String , Value > > = None ;
126+
127+ for ( op, row) in chunk. rows ( ) {
128+ let event_key_object: Option < Value > = Some ( json ! ( {
129+ "schema" : json!( {
130+ "type" : "struct" ,
131+ "fields" : fields_pk_to_json( & schema. fields, pk_indices) ,
132+ "optional" : false ,
133+ "name" : concat_debezium_name_field( db_name, sink_from_name, "Key" ) ,
134+ } ) ,
135+ "payload" : tri!( key_encoder. encode( row) ) ,
136+ } ) ) ;
137+ let event_object: Option < Value > = match op {
138+ Op :: Insert => Some ( json ! ( {
148139 "schema" : schema_to_json( schema, db_name, sink_from_name) ,
149140 "payload" : {
150- "before" : tri! ( val_encoder . encode ( row ) ) ,
151- "after" : null ,
152- "op" : "d " ,
141+ "before" : null ,
142+ "after" : tri! ( val_encoder . encode ( row ) ) ,
143+ "op" : "c " ,
153144 "ts_ms" : ts_ms,
154145 "source" : source_field,
155146 }
156- } ) ) ;
157- yield Ok ( ( event_key_object. clone ( ) , value_obj) ) ;
158-
159- if opts. gen_tombstone {
160- // Tomestone event
161- // https://debezium.io/documentation/reference/2.1/connectors/postgresql.html#postgresql-delete-events
162- yield Ok ( ( event_key_object, None ) ) ;
163- }
164-
165- continue ;
166- }
167- Op :: UpdateDelete => {
168- update_cache = Some ( tri ! ( val_encoder. encode( row) ) ) ;
169- continue ;
170- }
171- Op :: UpdateInsert => {
172- if let Some ( before) = update_cache. take ( ) {
173- Some ( json ! ( {
147+ } ) ) ,
148+ Op :: Delete => {
149+ let value_obj = Some ( json ! ( {
174150 "schema" : schema_to_json( schema, db_name, sink_from_name) ,
175151 "payload" : {
176- "before" : before ,
177- "after" : tri! ( val_encoder . encode ( row ) ) ,
178- "op" : "u " ,
152+ "before" : tri! ( val_encoder . encode ( row ) ) ,
153+ "after" : null ,
154+ "op" : "d " ,
179155 "ts_ms" : ts_ms,
180156 "source" : source_field,
181157 }
182- } ) )
183- } else {
184- warn ! (
185- "not found UpdateDelete in prev row, skipping, row index {:?}" ,
186- row. index( )
187- ) ;
158+ } ) ) ;
159+ yield Ok ( ( event_key_object. clone ( ) , value_obj) ) ;
160+
161+ if opts. gen_tombstone {
162+ // Tomestone event
163+ // https://debezium.io/documentation/reference/2.1/connectors/postgresql.html#postgresql-delete-events
164+ yield Ok ( ( event_key_object, None ) ) ;
165+ }
166+
188167 continue ;
189168 }
190- }
191- } ;
192- yield Ok ( ( event_key_object, event_object) ) ;
193- }
194- } )
169+ Op :: UpdateDelete => {
170+ update_cache = Some ( tri ! ( val_encoder. encode( row) ) ) ;
171+ continue ;
172+ }
173+ Op :: UpdateInsert => {
174+ if let Some ( before) = update_cache. take ( ) {
175+ Some ( json ! ( {
176+ "schema" : schema_to_json( schema, db_name, sink_from_name) ,
177+ "payload" : {
178+ "before" : before,
179+ "after" : tri!( val_encoder. encode( row) ) ,
180+ "op" : "u" ,
181+ "ts_ms" : ts_ms,
182+ "source" : source_field,
183+ }
184+ } ) )
185+ } else {
186+ warn ! (
187+ "not found UpdateDelete in prev row, skipping, row index {:?}" ,
188+ row. index( )
189+ ) ;
190+ continue ;
191+ }
192+ }
193+ } ;
194+ yield Ok ( ( event_key_object, event_object) ) ;
195+ }
196+ } ,
197+ )
195198 }
196199}
197200
0 commit comments