Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions pkg/member/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ type EmbeddedEtcdMember struct {
// etcd leader key when the PD node is successfully elected as the PD leader
// of the cluster. Every write will use it to check PD leadership.
memberValue string
// lastLeaderUpdatedTime is the last time when the leader is updated.
lastLeaderUpdatedTime atomic.Value
}

// NewMember create a new Member.
Expand Down Expand Up @@ -140,11 +142,13 @@ func (m *EmbeddedEtcdMember) GetLeader() *pdpb.Member {
// setLeader sets the member's PD leader.
func (m *EmbeddedEtcdMember) setLeader(member *pdpb.Member) {
m.leader.Store(member)
m.lastLeaderUpdatedTime.Store(time.Now())
}

// unsetLeader unsets the member's PD leader.
func (m *EmbeddedEtcdMember) unsetLeader() {
m.leader.Store(&pdpb.Member{})
m.lastLeaderUpdatedTime.Store(time.Now())
}

// EnableLeader sets the member itself to a PD leader.
Expand All @@ -162,6 +166,15 @@ func (m *EmbeddedEtcdMember) GetLeadership() *election.Leadership {
return m.leadership
}

// GetLastLeaderUpdatedTime returns the last time when the leader is updated.
func (m *EmbeddedEtcdMember) GetLastLeaderUpdatedTime() time.Time {
lastLeaderUpdatedTime := m.lastLeaderUpdatedTime.Load()
if lastLeaderUpdatedTime == nil {
return time.Time{}
}
return lastLeaderUpdatedTime.(time.Time)
}

// CampaignLeader is used to campaign a PD member's leadership
// and make it become a PD leader.
func (m *EmbeddedEtcdMember) CampaignLeader(leaseTimeout int64) error {
Expand Down
14 changes: 14 additions & 0 deletions pkg/member/participant.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ type Participant struct {
// campaignChecker is used to check whether the additional constraints for a
// campaign are satisfied. If it returns false, the campaign will fail.
campaignChecker atomic.Value // Store as leadershipCheckFunc
// lastLeaderUpdatedTime is the last time when the leader is updated.
lastLeaderUpdatedTime atomic.Value
}

// NewParticipant create a new Participant.
Expand Down Expand Up @@ -78,6 +80,7 @@ func (m *Participant) InitInfo(name string, id uint64, rootPath string, leaderNa
m.rootPath = rootPath
m.leaderPath = path.Join(rootPath, leaderName)
m.leadership = election.NewLeadership(m.client, m.GetLeaderPath(), purpose)
m.lastLeaderUpdatedTime.Store(time.Now())
log.Info("participant joining election", zap.Stringer("participant-info", m.member), zap.String("leader-path", m.leaderPath))
}

Expand Down Expand Up @@ -143,11 +146,13 @@ func (m *Participant) GetLeader() *tsopb.Participant {
// setLeader sets the member's leader.
func (m *Participant) setLeader(member *tsopb.Participant) {
m.leader.Store(member)
m.lastLeaderUpdatedTime.Store(time.Now())
}

// unsetLeader unsets the member's leader.
func (m *Participant) unsetLeader() {
m.leader.Store(&tsopb.Participant{})
m.lastLeaderUpdatedTime.Store(time.Now())
}

// EnableLeader declares the member itself to be the leader.
Expand All @@ -160,6 +165,15 @@ func (m *Participant) GetLeaderPath() string {
return m.leaderPath
}

// GetLastLeaderUpdatedTime returns the last time when the leader is updated.
func (m *Participant) GetLastLeaderUpdatedTime() time.Time {
lastLeaderUpdatedTime := m.lastLeaderUpdatedTime.Load()
if lastLeaderUpdatedTime == nil {
return time.Time{}
}
return lastLeaderUpdatedTime.(time.Time)
}

// GetLeadership returns the leadership of the member.
func (m *Participant) GetLeadership() *election.Leadership {
return m.leadership
Expand Down
2 changes: 2 additions & 0 deletions pkg/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ type ElectionMember interface {
GetLeaderPath() string
// GetLeadership returns the leadership of the election member.
GetLeadership() *election.Leadership
// GetLastLeaderUpdatedTime returns the last time when the leader is updated.
GetLastLeaderUpdatedTime() time.Time
// GetDCLocationPathPrefix returns the dc-location path prefix of the cluster.
GetDCLocationPathPrefix() string
// GetDCLocationPath returns the dc-location path of a member with the given member ID.
Expand Down
41 changes: 41 additions & 0 deletions server/server.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bytes"
"context"
"fmt"
"math/rand"
"net/http"
"os"
"path"
Expand Down Expand Up @@ -105,6 +106,9 @@ const (
maxRetryTimesGetServicePrimary = 25
// retryIntervalGetServicePrimary is the retry interval for getting primary addr.
retryIntervalGetServicePrimary = 100 * time.Millisecond

lostPDLeaderMaxTimeoutSecs = 10
lostPDLeaderReElectionFactor = 10
)

// EtcdStartTimeout the timeout of the startup etcd.
Expand Down Expand Up @@ -1432,6 +1436,14 @@ func (s *Server) leaderLoop() {
}

leader, checkAgain := s.member.CheckLeader()
// add failpoint to test leader check go to stuck.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The file permission has been changed.

failpoint.Inject("leaderLoopCheckAgain", func(val failpoint.Value) {
memberString := val.(string)
memberID, _ := strconv.ParseUint(memberString, 10, 64)
if s.member.ID() == memberID {
checkAgain = true
}
})
if checkAgain {
continue
}
Expand Down Expand Up @@ -1459,6 +1471,25 @@ func (s *Server) leaderLoop() {
// To make sure the etcd leader and PD leader are on the same server.
etcdLeader := s.member.GetEtcdLeader()
if etcdLeader != s.member.ID() {
if s.member.GetLeader() == nil {
lastUpdated := s.member.GetLastLeaderUpdatedTime()
// use random timeout to avoid leader campaigning storm.
randomTimeout := time.Duration(rand.Intn(int(lostPDLeaderMaxTimeoutSecs)))*time.Second + lostPDLeaderMaxTimeoutSecs*time.Second + lostPDLeaderReElectionFactor*s.cfg.ElectionInterval.Duration
// add failpoint to test the campaign leader logic.
failpoint.Inject("timeoutWaitPDLeader", func() {
log.Info("timeoutWaitPDLeader is injected, skip wait other etcd leader be etcd leader")
randomTimeout = time.Duration(rand.Intn(10))*time.Millisecond + 100*time.Millisecond
})
if lastUpdated.Add(randomTimeout).Before(time.Now()) && !lastUpdated.IsZero() && etcdLeader != 0 {
log.Info("the pd leader is lost for a long time, try to re-campaign a pd leader with resign etcd leader",
zap.Duration("timeout", randomTimeout),
zap.Time("last-updated", lastUpdated),
zap.String("current-leader-member-id", types.ID(etcdLeader).String()),
zap.String("transferee-member-id", types.ID(s.member.ID()).String()),
)
s.member.MoveEtcdLeader(s.ctx, etcdLeader, s.member.ID())
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we consider the leader's priority?

Copy link
Copy Markdown
Contributor Author

@nolouch nolouch May 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In most cases, there are no priority. if there exists priority, it also not affect the priority because the higher priority will do move the leader again. so, I think we can keep simple with this implementation.

}
}
log.Info("skip campaigning of pd leader and check later",
zap.String("server-name", s.Name()),
zap.Uint64("etcd-leader-id", etcdLeader),
Expand Down Expand Up @@ -1575,6 +1606,16 @@ func (s *Server) campaignLeader() {
log.Info("no longer a leader because lease has expired, pd leader will step down")
return
}
// add failpoint to test exit leader, failpoint judge the member is the give value, then break
failpoint.Inject("exitCampaignLeader", func(val failpoint.Value) {
memberString := val.(string)
memberID, _ := strconv.ParseUint(memberString, 10, 64)
if s.member.ID() == memberID {
log.Info("exit PD leader")
failpoint.Return()
}
})

etcdLeader := s.member.GetEtcdLeader()
if etcdLeader != s.member.ID() {
log.Info("etcd leader changed, resigns pd leadership", zap.String("old-pd-leader-name", s.Name()))
Expand Down
24 changes: 24 additions & 0 deletions tests/server/member/member_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,30 @@ func TestLeaderResignWithBlock(t *testing.T) {
re.NoError(failpoint.Disable("github.com/tikv/pd/server/raftclusterIsBusy"))
}

func TestPDLeaderLostWhileEtcdLeaderIntact(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cluster, err := tests.NewTestCluster(ctx, 2)
defer cluster.Destroy()
re.NoError(err)

err = cluster.RunInitialServers()
re.NoError(err)

leader1 := cluster.WaitLeader()
memberID := cluster.GetServer(leader1).GetLeader().GetMemberId()

re.NoError(failpoint.Enable("github.com/tikv/pd/server/leaderLoopCheckAgain", fmt.Sprintf("return(\"%d\")", memberID)))
re.NoError(failpoint.Enable("github.com/tikv/pd/server/exitCampaignLeader", fmt.Sprintf("return(\"%d\")", memberID)))
re.NoError(failpoint.Enable("github.com/tikv/pd/server/timeoutWaitPDLeader", `return(true)`))
leader2 := waitLeaderChange(re, cluster, leader1)
re.NotEqual(leader1, leader2)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to check the duration that leader changing costs won't be too long?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

without this fix, this test will be failed after timeout after 30s in waitLeaderChange.

re.NoError(failpoint.Disable("github.com/tikv/pd/server/leaderLoopCheckAgain"))
re.NoError(failpoint.Disable("github.com/tikv/pd/server/exitCampaignLeader"))
re.NoError(failpoint.Disable("github.com/tikv/pd/server/timeoutWaitPDLeader"))
}

func waitLeaderChange(re *require.Assertions, cluster *tests.TestCluster, old string) string {
var leader string
testutil.Eventually(re, func() bool {
Expand Down