@@ -203,6 +203,27 @@ type (
203203 IOTotal int `json:"io_total"`
204204 }
205205
206+ OssGStreamEvent struct {
207+ Event string `json:"event"`
208+ }
209+ OssS3CacheGs struct {
210+ Event string `json:"event"`
211+ HitB int `json:"hit_b"` // Bytes that were served to the client directly from the cache (a "cache hit")
212+ MissB int `json:"miss_b"` // Bytes that were served to the client that were a cache miss (not immediately available).
213+ FullHit int `json:"full_hit"` // Count of read requests from the client that were completely served from the cache.
214+ PartHit int `json:"part_hit"` // Count of read requests from the client that were partially served from the cache.
215+ Miss int `json:"miss"` // Count of read requests that were entirely a miss.
216+ BypassB int `json:"bypass_b"` // Bytes that were read in "bypass mode", skipping the cache. Typically, this is from read requests that are larger than the cache size.
217+ Bypass int `json:"bypass"` // Count of read requests that were served by bypassing the cache.
218+ FetchB int `json:"fetch_b"` // Bytes fetched from S3 triggered by cache misses. Note the cache may read in more bytes than requested for the miss.
219+ Fetch int `json:"fetch"` // Count of GET requests sent to S3.
220+ UnusedB int `json:"unused_b"` // Count of bytes that were fetched from S3 but not sent to the client.
221+ PrefetchB int `json:"prefetch_b"` // Bytes prefetched from S3 independent of a client request.
222+ Prefetch int `json:"prefetch"` // Count of prefetch requests sent to S3.
223+ Errors int `json:"errors"` // Count of errors encountered.
224+ BypassS float64 `json:"bypass_s"` // Seconds spent in GET requests serving bypass requests.
225+ FetchS float64 `json:"fetch_s"` // Seconds spent in GET requests serving cache misses.
226+ }
206227 OSSStatsGs struct {
207228 Event string `json:"event"`
208229 Reads int `json:"reads"`
@@ -364,6 +385,12 @@ const (
364385 ProcStat SummaryStatType = "proc" // https://xrootd.web.cern.ch/doc/dev57/xrd_monitoring.htm#_Toc138968507
365386)
366387
388+ // These are the names of the events that are sent by XRootD over the g-stream from the OSS layer
389+ const (
390+ OssStatsEvent = "oss_stats"
391+ S3FileStatsEvent = "s3file_stats"
392+ )
393+
367394var (
368395 // TODO: Remove this metric (the line directly below)
369396 // The renamed metric was added in v7.16
@@ -758,8 +785,34 @@ var (
758785 Help : "CPU utilization of the XRootD server" ,
759786 })
760787
761- lastStats SummaryStat
762- lastOssStats OSSStatsGs
788+ S3CacheBytes = promauto .NewCounterVec (prometheus.CounterOpts {
789+ Name : "xrootd_s3_cache_bytes_total" ,
790+ Help : "Bytes transferred by the S3 cache plugin." ,
791+ }, []string {"type" })
792+
793+ S3CacheHits = promauto .NewCounterVec (prometheus.CounterOpts {
794+ Name : "xrootd_s3_cache_hits_total" ,
795+ Help : "Number of cache hits, partial hits, or misses." ,
796+ }, []string {"type" })
797+
798+ S3CacheRequests = promauto .NewCounterVec (prometheus.CounterOpts {
799+ Name : "xrootd_s3_cache_requests_total" ,
800+ Help : "Number of cache requests." ,
801+ }, []string {"type" })
802+
803+ S3CacheErrors = promauto .NewCounter (prometheus.CounterOpts {
804+ Name : "xrootd_s3_cache_errors_total" ,
805+ Help : "Number of errors encountered by the S3 cache plugin." ,
806+ })
807+
808+ S3CacheRequestSeconds = promauto .NewCounterVec (prometheus.CounterOpts {
809+ Name : "xrootd_s3_cache_request_seconds_total" ,
810+ Help : "Total time spent in S3 requests." ,
811+ }, []string {"type" })
812+
813+ lastStats SummaryStat
814+ lastOssStats OSSStatsGs
815+ lastS3CacheStats OssS3CacheGs
763816
764817 procState = processUtilizationState {}
765818
@@ -776,7 +829,7 @@ var (
776829 monitorPaths []PathList
777830)
778831
779- var allowedEvents = map [string ]bool {"oss_stats" : true }
832+ var allowedEvents = map [string ]bool {OssStatsEvent : true , S3FileStatsEvent : true }
780833
781834// Set up listening and parsing xrootd monitoring UDP packets into prometheus
782835//
@@ -1740,7 +1793,81 @@ func HandleSummaryPacket(packet []byte) error {
17401793 return nil
17411794}
17421795
1743- // handleOSSPacket processes the OSS plugin stats
1796+ func updateCounter [T int | uint | float32 | float64 ](new T , old T , counter prometheus.Counter ) T {
1797+ incBy := math .Max (0 , float64 (new - old ))
1798+ counter .Add (incBy )
1799+ if new < old {
1800+ return old
1801+ }
1802+
1803+ return new
1804+ }
1805+
1806+ // handleS3CacheStats processes the S3 cache stats
1807+ // It expects the blobs to be in JSON format and will update the metrics accordingly
1808+ // It returns an error if the blobs are empty or if there is an error in unmarshalling
1809+ // the JSON data
1810+ // It also handles the case where the total IO or wait time is less than the previous value
1811+ // by resetting the increment to 0.
1812+ // The s3file_stats event schema is as follows:
1813+ //
1814+ // {
1815+ // "event": "s3file_stats",
1816+ // "hit_b": uint,
1817+ // "miss_b": uint,
1818+ // "full_hit": uint,
1819+ // "part_hit": uint,
1820+ // "miss": uint,
1821+ // "bypass_b": uint,
1822+ // "bypass": uint,
1823+ // "fetch_b": uint,
1824+ // "fetch": uint,
1825+ // "unused_b": uint,
1826+ // "prefetch_b": uint,
1827+ // "prefetch": uint,
1828+ // "errors": uint,
1829+ // "bypass_s": float,
1830+ // "fetch_s": float
1831+ // }
1832+ func handleS3CacheStats (blobs [][]byte ) error {
1833+
1834+ // we need to process the blobs backwards to ensure that we are processing the last valid event
1835+ // the most relevant data is the last valid event in the sequence of blobs
1836+ for i := len (blobs ) - 1 ; i >= 0 ; i -- {
1837+ blob := blobs [i ]
1838+ s3fileStats := OssS3CacheGs {}
1839+ if err := json .Unmarshal (blob , & s3fileStats ); err != nil {
1840+ log .Warningf ("Failed to unmarshal S3 file stats json: %s" , string (blob ))
1841+ continue
1842+ }
1843+ if ! allowedEvents [s3fileStats .Event ] {
1844+ log .Warningf ("handleS3CacheStats received an S3 file stats packet with an unrecognized event type (%s)" , s3fileStats .Event )
1845+ continue
1846+ }
1847+ lastS3CacheStats .HitB = updateCounter (s3fileStats .HitB , lastS3CacheStats .HitB , S3CacheBytes .WithLabelValues ("hit" ))
1848+ lastS3CacheStats .MissB = updateCounter (s3fileStats .MissB , lastS3CacheStats .MissB , S3CacheBytes .WithLabelValues ("miss" ))
1849+ lastS3CacheStats .BypassB = updateCounter (s3fileStats .BypassB , lastS3CacheStats .BypassB , S3CacheBytes .WithLabelValues ("bypass" ))
1850+ lastS3CacheStats .FetchB = updateCounter (s3fileStats .FetchB , lastS3CacheStats .FetchB , S3CacheBytes .WithLabelValues ("fetch" ))
1851+ lastS3CacheStats .UnusedB = updateCounter (s3fileStats .UnusedB , lastS3CacheStats .UnusedB , S3CacheBytes .WithLabelValues ("unused" ))
1852+ lastS3CacheStats .PrefetchB = updateCounter (s3fileStats .PrefetchB , lastS3CacheStats .PrefetchB , S3CacheBytes .WithLabelValues ("prefetch" ))
1853+
1854+ lastS3CacheStats .FullHit = updateCounter (s3fileStats .FullHit , lastS3CacheStats .FullHit , S3CacheHits .WithLabelValues ("full" ))
1855+ lastS3CacheStats .PartHit = updateCounter (s3fileStats .PartHit , lastS3CacheStats .PartHit , S3CacheHits .WithLabelValues ("partial" ))
1856+ lastS3CacheStats .Miss = updateCounter (s3fileStats .Miss , lastS3CacheStats .Miss , S3CacheHits .WithLabelValues ("miss" ))
1857+
1858+ lastS3CacheStats .Bypass = updateCounter (s3fileStats .Bypass , lastS3CacheStats .Bypass , S3CacheRequests .WithLabelValues ("bypass" ))
1859+ lastS3CacheStats .Fetch = updateCounter (s3fileStats .Fetch , lastS3CacheStats .Fetch , S3CacheRequests .WithLabelValues ("fetch" ))
1860+ lastS3CacheStats .Prefetch = updateCounter (s3fileStats .Prefetch , lastS3CacheStats .Prefetch , S3CacheRequests .WithLabelValues ("prefetch" ))
1861+
1862+ lastS3CacheStats .Errors = updateCounter (s3fileStats .Errors , lastS3CacheStats .Errors , S3CacheErrors )
1863+
1864+ lastS3CacheStats .BypassS = updateCounter (s3fileStats .BypassS , lastS3CacheStats .BypassS , S3CacheRequestSeconds .WithLabelValues ("bypass" ))
1865+ lastS3CacheStats .FetchS = updateCounter (s3fileStats .FetchS , lastS3CacheStats .FetchS , S3CacheRequestSeconds .WithLabelValues ("fetch" ))
1866+ }
1867+ return nil
1868+ }
1869+
1870+ // handleOSSStats processes the OSS plugin stats
17441871// It expects the blobs to be in JSON format and will update the metrics accordingly
17451872// It returns an error if the blobs are empty or if there is an error in unmarshalling
17461873// the JSON data
@@ -1815,18 +1942,7 @@ func HandleSummaryPacket(packet []byte) error {
18151942// The other fields are used to update the metrics accordingly.
18161943// The slow_ prefix fields are used to update the slow operation histograms.
18171944// The other fields are used to update the counters.
1818- func handleOSSPacket (blobs [][]byte ) error {
1819- updateCounter := func (new int , old int , counter prometheus.Counter ) int {
1820- // if the new value is less than the old value, we know that that the counter shouldnt be incremented
1821- incBy := math .Max (0 , float64 (new - old ))
1822- counter .Add (incBy )
1823-
1824- if new < old {
1825- return old
1826- }
1827- return new
1828- }
1829-
1945+ func handleOSSStats (blobs [][]byte ) error {
18301946 // updateHistogram updates the histogram with the average latency per operation for the given delta.
18311947 // newTotalTime and oldTotalTime are the cumulative times (in seconds).
18321948 // newCount and oldCount are the cumulative counts.
@@ -1843,20 +1959,17 @@ func handleOSSPacket(blobs [][]byte) error {
18431959
18441960 }
18451961 }
1846- if len (blobs ) == 0 {
1847- return errors .New ("no blobs in the OSS packet" )
1848- }
1849-
18501962 // we need to process the blobs backwards to ensure that we are processing the last valid event
18511963 // the most relevant data is the last valid event in the sequence of blobs
18521964 for i := len (blobs ) - 1 ; i >= 0 ; i -- {
18531965 blob := blobs [i ]
18541966 ossStats := OSSStatsGs {}
18551967 if err := json .Unmarshal (blob , & ossStats ); err != nil {
1856- return errors .Wrap (err , "failed to parse OSS stat json" )
1968+ log .Debugf ("Failed to unmarshal S3 file stats json: %s" , string (blob ))
1969+ continue
18571970 }
18581971 if ! allowedEvents [ossStats .Event ] {
1859- log .Debugf ( "handleOSSPacket received an OSS packet with an unrecognized event type (%s)" , ossStats .Event )
1972+ log .Warningf ( "handleOSSStats received an OSS packet with an unrecognized event type (%s)" , ossStats .Event )
18601973 continue
18611974 }
18621975 updateHistogram (ossStats .ReadT , lastOssStats .ReadT , ossStats .Reads , lastOssStats .Reads , OssReadTime )
@@ -1943,6 +2056,46 @@ func handleOSSPacket(blobs [][]byte) error {
19432056 return nil
19442057}
19452058
2059+ // handlesOSSPacket handles the OSS g-stream packets
2060+ // It expects the blobs to be in JSON format
2061+ // The blobs can be of different event types and come intermixed
2062+ // When grouping we preserve the order of the blobs in the original packet
2063+ // We will dispatch the blobs to the appropriate handler based on the event type
2064+ func handleOSSPacket (blobs [][]byte ) error {
2065+ if len (blobs ) == 0 {
2066+ return errors .New ("no blobs in the OSS g-stream packet" )
2067+ }
2068+
2069+ // This map will map the event type to the list of blobs that have that event type
2070+ groupedEvents := make (map [string ][][]byte )
2071+ for _ , blob := range blobs {
2072+ ossEvent := OssGStreamEvent {}
2073+ if err := json .Unmarshal (blob , & ossEvent ); err != nil {
2074+ return errors .Wrap (err , "failed to parse OSS event json" )
2075+ }
2076+ if ! allowedEvents [ossEvent .Event ] {
2077+ log .Warningf ("handleOSSPacket received an OSS packet with an unrecognized event type (%s)" , ossEvent .Event )
2078+ continue
2079+ }
2080+ groupedEvents [ossEvent .Event ] = append (groupedEvents [ossEvent .Event ], blob )
2081+ }
2082+
2083+ for eventType , eventBlobs := range groupedEvents {
2084+ switch eventType {
2085+ case OssStatsEvent :
2086+ if err := handleOSSStats (eventBlobs ); err != nil {
2087+ return errors .Wrap (err , "failed to handle OSS stats" )
2088+ }
2089+ case S3FileStatsEvent :
2090+ if err := handleS3CacheStats (eventBlobs ); err != nil {
2091+ return errors .Wrap (err , "failed to handle S3 file stats" )
2092+ }
2093+ }
2094+ }
2095+
2096+ return nil
2097+ }
2098+
19462099// handleThrottlePacket processes the throttle plugin stats
19472100// It expects the blobs to be in JSON format and will update the metrics accordingly
19482101// It also handles the case where the total IO or wait time is less than the previous value
0 commit comments