@@ -612,7 +612,7 @@ static int snappy_decompress(void *buf, size_t n, KafkaBlock *block)
612612 };
613613 static const size_t snappy_java_hdrlen = 8 + 4 + 4 ;
614614
615- if (!memcmp (buf, snappy_java_magic, 8 ))
615+ if (inlen >= snappy_java_hdrlen && !memcmp (buf, snappy_java_magic, 8 ))
616616 {
617617 inbuf = inbuf + snappy_java_hdrlen;
618618 inlen -= snappy_java_hdrlen;
@@ -1444,19 +1444,26 @@ int KafkaMessage::parse_record_batch(void **buf, size_t *size,
14441444 if (parse_i32 (buf, size, &hdr.record_count ) < 0 )
14451445 return -1 ;
14461446
1447- if (*size < (size_t )(hdr.length - 61 + 12 ))
1447+ if (hdr.length < 49 )
1448+ {
1449+ errno = EBADMSG;
1450+ return -1 ;
1451+ }
1452+
1453+ size_t record_body_size = (size_t )(hdr.length - 49 );
1454+ if (*size < record_body_size)
14481455 return 1 ;
14491456
14501457 KafkaBlock block;
14511458 int compress_type = hdr.attributes & 7 ;
14521459
14531460 if (compress_type != 0 )
14541461 {
1455- if (uncompress_buf (*buf, hdr. length - 61 + 12 , &block, compress_type) < 0 )
1462+ if (uncompress_buf (*buf, record_body_size , &block, compress_type) < 0 )
14561463 return -1 ;
14571464
1458- *buf = (char *)*buf + hdr. length - 61 + 12 ;
1459- *size -= hdr. length - 61 + 12 ;
1465+ *buf = (char *)*buf + record_body_size ;
1466+ *size -= record_body_size ;
14601467 }
14611468
14621469 void *p = *buf;
@@ -3156,8 +3163,8 @@ int KafkaResponse::parse_fetch(void **buf, size_t *size)
31563163 {
31573164 int16_t error;
31583165 int32_t sessionid;
3159- parse_i16 (buf, size, &error);
3160- parse_i32 (buf, size, &sessionid);
3166+ CHECK_RET ( parse_i16 (buf, size, &error) );
3167+ CHECK_RET ( parse_i32 (buf, size, &sessionid) );
31613168 }
31623169
31633170 int32_t topic_cnt;
0 commit comments