Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 12 additions & 11 deletions engine/pkg/promutil/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ package promutil
import (
"sync"

libModel "github.com/pingcap/tiflow/engine/lib/model"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
dto "github.com/prometheus/client_model/go"

libModel "github.com/pingcap/tiflow/engine/lib/model"
)

var _ prometheus.Gatherer = globalMetricGatherer
Expand All @@ -39,8 +40,8 @@ func init() {

// Registry is used for registering metric
type Registry struct {
sync.Mutex
*prometheus.Registry
mu sync.Mutex
registry *prometheus.Registry

// collectorByWorker is for cleaning all collectors for specific worker(jobmaster/worker)
collectorByWorker map[libModel.WorkerID][]prometheus.Collector
Expand All @@ -49,7 +50,7 @@ type Registry struct {
// NewRegistry return a new Registry
func NewRegistry() *Registry {
return &Registry{
Registry: prometheus.NewRegistry(),
registry: prometheus.NewRegistry(),
collectorByWorker: make(map[libModel.WorkerID][]prometheus.Collector),
}
}
Expand All @@ -59,10 +60,10 @@ func (r *Registry) MustRegister(workerID libModel.WorkerID, c prometheus.Collect
if c == nil {
return
}
r.Lock()
defer r.Unlock()
r.mu.Lock()
defer r.mu.Unlock()

r.Registry.MustRegister(c)
r.registry.MustRegister(c)

var (
cls []prometheus.Collector
Expand All @@ -78,13 +79,13 @@ func (r *Registry) MustRegister(workerID libModel.WorkerID, c prometheus.Collect

// Unregister unregisters all Collectors of the specified worker
func (r *Registry) Unregister(workerID libModel.WorkerID) {
r.Lock()
defer r.Unlock()
r.mu.Lock()
defer r.mu.Unlock()

cls, exists := r.collectorByWorker[workerID]
if exists {
for _, collector := range cls {
r.Registry.Unregister(collector)
r.registry.Unregister(collector)
}
delete(r.collectorByWorker, workerID)
}
Expand All @@ -93,7 +94,7 @@ func (r *Registry) Unregister(workerID libModel.WorkerID) {
// Gather implements Gatherer interface
func (r *Registry) Gather() ([]*dto.MetricFamily, error) {
// NOT NEED lock here. prometheus.Registry has thread-safe methods
return r.Registry.Gather()
return r.registry.Gather()
}

// AutoRegisterFactory uses inner Factory to create metrics and register metrics
Expand Down
14 changes: 7 additions & 7 deletions engine/pkg/promutil/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestNewRegistry(t *testing.T) {
t.Parallel()

reg := NewRegistry()
require.NotNil(t, reg.Registry)
require.NotNil(t, reg.registry)
require.Len(t, reg.collectorByWorker, 0)
}

Expand All @@ -42,7 +42,7 @@ func TestMustRegister(t *testing.T) {
t.Parallel()

reg := NewRegistry()
require.NotNil(t, reg.Registry)
require.NotNil(t, reg.registry)

// normal case, register successfully
reg.MustRegister("worker0", prometheus.NewCounter(prometheus.CounterOpts{
Expand Down Expand Up @@ -112,7 +112,7 @@ func TestMustRegisterNotValidName(t *testing.T) {
}()

reg := NewRegistry()
require.NotNil(t, reg.Registry)
require.NotNil(t, reg.registry)

// not a valid metric name
reg.MustRegister("worker", prometheus.NewCounter(prometheus.CounterOpts{
Expand All @@ -130,7 +130,7 @@ func TestMustRegisterNotValidLableName(t *testing.T) {
}()

reg := NewRegistry()
require.NotNil(t, reg.Registry)
require.NotNil(t, reg.registry)

// not a valid metric name
reg.MustRegister("worker", prometheus.NewCounter(prometheus.CounterOpts{
Expand All @@ -152,7 +152,7 @@ func TestMustRegisterFailDuplicateNameLabels(t *testing.T) {
}()

reg := NewRegistry()
require.NotNil(t, reg.Registry)
require.NotNil(t, reg.registry)

reg.MustRegister("worker", prometheus.NewCounter(prometheus.CounterOpts{
Name: "counter5",
Expand All @@ -179,7 +179,7 @@ func TestMustRegisterFailInconsistent(t *testing.T) {
}()

reg := NewRegistry()
require.NotNil(t, reg.Registry)
require.NotNil(t, reg.registry)

reg.MustRegister("worker", prometheus.NewCounter(prometheus.CounterOpts{
Name: "counter5",
Expand All @@ -199,7 +199,7 @@ func TestUnregister(t *testing.T) {
t.Parallel()

reg := NewRegistry()
require.NotNil(t, reg.Registry)
require.NotNil(t, reg.registry)
reg.MustRegister("worker0", prometheus.NewCounter(prometheus.CounterOpts{
Name: "counter5",
ConstLabels: prometheus.Labels{
Expand Down