@@ -657,22 +657,27 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) {
657657 consumergroupCurrentOffset , prometheus .GaugeValue , float64 (currentOffset ), group .GroupId , topic , strconv .FormatInt (int64 (partition ), 10 ),
658658 )
659659 e .mu .Lock ()
660- if offset , ok := offset [topic ][partition ]; ok {
661- // If the topic is consumed by that consumer group, but no offset associated with the partition
662- // forcing lag to -1 to be able to alert on that
660+ currentPartitionOffset , currentPartitionOffsetError := e .client .GetOffset (topic , partition , sarama .OffsetNewest )
661+ if currentPartitionOffsetError != nil {
662+ klog .Errorf ("Cannot get current offset of topic %s partition %d: %v" , topic , partition , currentPartitionOffsetError )
663+ } else {
663664 var lag int64
664665 if offsetFetchResponseBlock .Offset == - 1 {
665666 lag = - 1
666667 } else {
667- lag = offset - offsetFetchResponseBlock .Offset
668+ if offset , ok := offset [topic ][partition ]; ok {
669+ if currentPartitionOffset == - 1 {
670+ currentPartitionOffset = offset
671+ }
672+ }
673+ lag = currentPartitionOffset - offsetFetchResponseBlock .Offset
668674 lagSum += lag
669675 }
676+
670677 ch <- prometheus .MustNewConstMetric (
671678 consumergroupLag , prometheus .GaugeValue , float64 (lag ), group .GroupId , topic , strconv .FormatInt (int64 (partition ), 10 ),
672679 )
673- } else {
674- klog .Errorf ("No offset of topic %s partition %d, cannot get consumer group lag" , topic , partition )
675- }
680+ }
676681 e .mu .Unlock ()
677682 }
678683 ch <- prometheus .MustNewConstMetric (
0 commit comments