detect binlog purged and report to Drainer #1017
base: master
Changes from all commits: 42558d2, b92f923, 2a82ef8, 1e59d8e, c2d13f9, c0053e8
drainer/pump.go

@@ -24,6 +24,7 @@ import (
 	"github.com/pingcap/log"
 	"github.com/pingcap/tidb-binlog/pkg/util"
 	"github.com/pingcap/tidb-binlog/pump"
+	"github.com/pingcap/tidb-binlog/pump/storage"
 	"github.com/pingcap/tidb/store/tikv/oracle"
 	pb "github.com/pingcap/tipb/go-binlog"
 	"go.uber.org/zap"
@@ -101,9 +102,11 @@ func (p *Pump) PullBinlog(pctx context.Context, last int64) chan MergeItem {
 	labelReceive := "receive binlog"
 	labelCreateConn := "create conn"
 	labelPaused := "pump paused"
+	labelBinlogGCed := "binlog purged"
 	pLog.Add(labelReceive, 10*time.Second)
 	pLog.Add(labelCreateConn, 10*time.Second)
 	pLog.Add(labelPaused, 30*time.Second)
+	pLog.Add(labelBinlogGCed, 30*time.Second)

 	ret := make(chan MergeItem, binlogChanSize)
@@ -119,6 +122,7 @@ func (p *Pump) PullBinlog(pctx context.Context, last int64) chan MergeItem {
 	}()

 	needReCreateConn := false
+	isBinlogPurged := false
 	for {
 		if atomic.LoadInt32(&p.isClosed) == 1 {
 			return
@@ -134,6 +138,16 @@ func (p *Pump) PullBinlog(pctx context.Context, last int64) chan MergeItem {
 			continue
 		}

+		if isBinlogPurged {
+			// some binlogs have been purged in pump, just print the log and wait to exit by user.
+			pLog.Print(labelBinlogGCed, func() {
+				p.logger.Error("some binlogs have been purged in pump")
+			})
+
+			time.Sleep(time.Second)
Contributor: why not exit directly? since …

Author: If we exit directly, then systemd will restart it again (but that's meaningless). If it does not exit, the log and metrics can still keep working. For …
+			continue
+		}
+
 		if p.grpcConn == nil || needReCreateConn {
 			p.logger.Info("pump create pull binlogs client")
 			if err := p.createPullBinlogsClient(pctx, last); err != nil {
@@ -153,7 +167,13 @@ func (p *Pump) PullBinlog(pctx context.Context, last int64) chan MergeItem {
 			})
 		}

-		needReCreateConn = true
+		// Pump return binlog GCed error via gRPC response error.
+		if strings.Contains(err.Error(), storage.ErrRequestGCedBinlog.Error()) {
+			needReCreateConn = false // re-create connection will have no effect.
+			isBinlogPurged = true
+		} else {
+			needReCreateConn = true
+		}

 		time.Sleep(time.Second)
 		// TODO: add metric here
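The drainer matches the purged-binlog error by its text (strings.Contains) because the error crosses a gRPC boundary and arrives flattened into the stream's response error. A minimal sketch of how the pump side could drain the new errs channel and surface it as the stream error — the storageLike interface and forwardBinlogs helper are hypothetical names for illustration, not the PR's actual handler:

```go
package sketch

import "context"

// storageLike captures the part of pump/storage.Storage this sketch needs.
type storageLike interface {
	PullCommitBinlog(ctx context.Context, last int64) (<-chan []byte, <-chan error)
}

// forwardBinlogs streams payloads via send until the storage reports an
// error. Returning the error terminates the gRPC stream, so the drainer
// receives it as the response error and can match ErrRequestGCedBinlog
// by substring.
func forwardBinlogs(ctx context.Context, s storageLike, last int64, send func([]byte) error) error {
	values, errs := s.PullCommitBinlog(ctx, last)
	for {
		select {
		case value, ok := <-values:
			if !ok {
				return nil // channel closed: pulling finished cleanly
			}
			if err := send(value); err != nil {
				return err
			}
		case err := <-errs:
			return err // e.g. an error wrapping ErrRequestGCedBinlog
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}
```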
pump/storage/storage.go

@@ -43,13 +43,17 @@ import (
 const (
 	maxTxnTimeoutSecond int64 = 600
+	gcMaxBlockTime            = 30 * time.Minute // we run GC at every 1 hour, but may block GC when reading and sending binlog at most in this duration.
 	chanCapacity = 1 << 20
 	// if pump takes a long time to write binlog, pump will display the binlog meta information (unit: Second)
 	slowWriteThreshold               = 1.0
 	defaultStopWriteAtAvailableSpace = 10 * (1 << 30)
 )

 var (
+	// ErrRequestGCedBinlog indicates a Drainer is requesting some purged binlogs.
+	ErrRequestGCedBinlog = errors.New("request a purged binlog")
+
 	// save gcTS, the max TS we have gc, for binlog not greater than gcTS, we can delete it from storage
 	gcTSKey = []byte("!binlog!gcts")
 	// save maxCommitTS, we can get binlog in range [gcTS, maxCommitTS] from PullCommitBinlog
@@ -83,7 +87,7 @@ type Storage interface {
 	GetBinlog(ts int64) (binlog *pb.Binlog, err error)

 	// PullCommitBinlog return the chan to consume the binlog
-	PullCommitBinlog(ctx context.Context, last int64) <-chan []byte
+	PullCommitBinlog(ctx context.Context, last int64) (<-chan []byte, <-chan error)

 	Close() error
 }
@@ -104,7 +108,7 @@ type Append struct {
 	latestTS int64

 	gcWorking     int32
-	gcTS          int64
+	gcTS          GCTS
 	maxCommitTS   int64
 	headPointer   valuePointer
 	handlePointer valuePointer
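The diff never shows the definition of the new GCTS type, only its methods (Load, Store, LoadAndLock, ReleaseLoadLock, used later in this diff). Those names suggest an int64 guarded by a read-write lock, so a puller can pin gcTS and block GC while it streams binlogs. A speculative reconstruction under exactly that assumption — not the PR's actual code:

```go
package sketch

import "sync"

// GCTS sketch: an int64 gcTS plus an RWMutex. A puller calls LoadAndLock
// to read gcTS and keep a read lock, which blocks GC's Store until the
// puller calls ReleaseLoadLock.
type GCTS struct {
	mu sync.RWMutex
	ts int64
}

// Load returns the current gcTS without holding any lock afterwards.
func (g *GCTS) Load() int64 {
	g.mu.RLock()
	defer g.mu.RUnlock()
	return g.ts
}

// Store advances gcTS; it waits until no reader holds the load lock.
func (g *GCTS) Store(ts int64) {
	g.mu.Lock()
	g.ts = ts
	g.mu.Unlock()
}

// LoadAndLock returns gcTS and keeps the read lock held, blocking GC.
func (g *GCTS) LoadAndLock() int64 {
	g.mu.RLock()
	return g.ts
}

// ReleaseLoadLock releases the read lock taken by LoadAndLock.
func (g *GCTS) ReleaseLoadLock() {
	g.mu.RUnlock()
}
```

Under this reading, the gcMaxBlockTime timeout added to the send select further down bounds how long one slow drainer can hold the read lock, which is consistent with the comment on that constant ("may block GC ... at most in this duration").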
@@ -166,11 +170,12 @@ func NewAppendWithResolver(dir string, options *Options, tiStore kv.Storage, tiL
 		sortItems: make(chan sortItem, 1024),
 	}

-	append.gcTS, err = append.readGCTSFromDB()
+	gcTS, err := append.readGCTSFromDB()
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
-	gcTSGauge.Set(float64(oracle.ExtractPhysical(uint64(append.gcTS))))
+	append.gcTS.Store(gcTS)
+	gcTSGauge.Set(float64(oracle.ExtractPhysical(uint64(gcTS))))

 	append.maxCommitTS, err = append.readInt64(maxCommitTSKey)
 	if err != nil {
@@ -214,7 +219,7 @@ func NewAppendWithResolver(dir string, options *Options, tiStore kv.Storage, tiL
 		minPointer = append.handlePointer
 	}

-	log.Info("Append info", zap.Int64("gcTS", append.gcTS),
+	log.Info("Append info", zap.Int64("gcTS", gcTS),
 		zap.Int64("maxCommitTS", append.maxCommitTS),
 		zap.Reflect("headPointer", append.headPointer),
 		zap.Reflect("handlePointer", append.handlePointer))
@@ -654,12 +659,12 @@ func (a *Append) Close() error {

 // GetGCTS implement Storage.GetGCTS
 func (a *Append) GetGCTS() int64 {
-	return atomic.LoadInt64(&a.gcTS)
+	return a.gcTS.Load()
 }

 // GC implement Storage.GC
 func (a *Append) GC(ts int64) {
-	lastTS := atomic.LoadInt64(&a.gcTS)
+	lastTS := a.gcTS.Load()
 	if ts <= lastTS {
 		log.Info("ignore gc request", zap.Int64("ts", ts), zap.Int64("lastTS", lastTS))
 		return
@@ -673,7 +678,7 @@ func (a *Append) GC(ts int64) {
 		return
 	}

-	atomic.StoreInt64(&a.gcTS, ts)
+	a.gcTS.Store(ts) // once `Store` returned, no guarantee for metadata or vlog.
 	if err := a.saveGCTSToDB(ts); err != nil {
 		log.Error("Failed to save GCTS", zap.Int64("ts", ts), zap.Error(err))
 	}
@@ -1109,7 +1114,7 @@ func (a *Append) feedPreWriteValue(cbinlog *pb.Binlog) error {
 }

 // PullCommitBinlog return commit binlog > last
-func (a *Append) PullCommitBinlog(ctx context.Context, last int64) <-chan []byte {
+func (a *Append) PullCommitBinlog(ctx context.Context, last int64) (<-chan []byte, <-chan error) {
 	log.Debug("new PullCommitBinlog", zap.Int64("last ts", last))

 	ctx, cancel := context.WithCancel(ctx)
@@ -1121,14 +1126,21 @@ func (a *Append) PullCommitBinlog(ctx context.Context, last int64) <-chan []byte
 		}
 	}()

-	gcTS := atomic.LoadInt64(&a.gcTS)
+	values := make(chan []byte, 5)
+	errs := make(chan error, 5) // we `return` after sending an error now, so it should never block on this chan.
+
+	gcTS := a.gcTS.Load()
 	if last < gcTS {
-		log.Warn("last ts less than gcTS", zap.Int64("last ts", last), zap.Int64("gcTS", gcTS))
-		last = gcTS
+		if last == 0 {
+			log.Warn("last TS is 0, will send binlog from gcTS", zap.Int64("gcTS", gcTS))
+			last = gcTS
+		} else {
+			log.Error("last TS less than gcTS, some binlog events may be loss", zap.Int64("lastTS", last), zap.Int64("gcTS", gcTS))
+			errs <- errors.Annotatef(ErrRequestGCedBinlog, "requested TS %d, GC TS %d", last, gcTS)
+			return values, errs
+		}
 	}

-	values := make(chan []byte, 5)
-
 	irange := &util.Range{
 		Start: encodeTSKey(0),
 		Limit: encodeTSKey(math.MaxInt64),
@@ -1153,12 +1165,23 @@ func (a *Append) PullCommitBinlog(ctx context.Context, last int64) <-chan []byte
 			continue
 		}

+		// acquire the lock to block GC.
+		// NOTE: do not forget to release the lock carefully.
+		gcTS = a.gcTS.LoadAndLock()
+		if last < gcTS {
+			a.gcTS.ReleaseLoadLock()
+			log.Error("last TS less than gcTS, some binlog events may be loss", zap.Int64("lastTS", last), zap.Int64("gcTS", gcTS))
+			errs <- errors.Annotatef(ErrRequestGCedBinlog, "requested TS %d, GC TS %d", last, gcTS)
+			return
+		}
+
 		irange.Start = encodeTSKey(startTS)
 		irange.Limit = encodeTSKey(limitTS)
 		iter := a.metadata.NewIterator(irange, nil)

 		// log.Debugf("try to get range [%d,%d)", startTS, atomic.LoadInt64(&a.maxCommitTS)+1)

+	readForLoop:
 		for ok := iter.Seek(encodeTSKey(startTS)); ok; ok = iter.Next() {
 			var vp valuePointer
 			err := vp.UnmarshalBinary(iter.Value())
@@ -1171,17 +1194,21 @@ func (a *Append) PullCommitBinlog(ctx context.Context, last int64) <-chan []byte

 			value, err := a.vlog.readValue(vp)
 			if err != nil {
-				log.Error("read value failed", zap.Error(err))
 				iter.Release()
 				errorCount.WithLabelValues("read_value").Add(1.0)
+				a.gcTS.ReleaseLoadLock()
+				log.Error("read value failed", zap.Int64("TS", decodeTSKey(iter.Key())), zap.Error(err))
+				errs <- errors.Errorf("read value failed, TS %d", decodeTSKey(iter.Key()))
 				return
 			}

 			binlog := new(pb.Binlog)
 			err = binlog.Unmarshal(value)
 			if err != nil {
-				log.Error("Unmarshal Binlog failed", zap.Error(err))
 				iter.Release()
+				a.gcTS.ReleaseLoadLock()
+				log.Error("Unmarshal Binlog failed", zap.Int64("TS", decodeTSKey(iter.Key())), zap.Error(err))
+				errs <- errors.Errorf("Unmarshal Binlog failed, TS %d", decodeTSKey(iter.Key()))
 				return
 			}
@@ -1205,30 +1232,40 @@ func (a *Append) PullCommitBinlog(ctx context.Context, last int64) <-chan []byte
 				}

 					errorCount.WithLabelValues("feed_pre_write_value").Add(1.0)
-					log.Error("feed pre write value failed", zap.Error(err))
 					iter.Release()
+					a.gcTS.ReleaseLoadLock()
+					log.Error("feed pre write value failed", zap.Int64("TS", decodeTSKey(iter.Key())), zap.Error(err))
+					errs <- errors.Errorf("feed pre write value failed, TS %d", decodeTSKey(iter.Key()))
 					return
 				}
 			}

 			value, err = binlog.Marshal()
 			if err != nil {
-				log.Error("marshal failed", zap.Error(err))
 				iter.Release()
+				a.gcTS.ReleaseLoadLock()
+				log.Error("marshal failed", zap.Int64("TS", decodeTSKey(iter.Key())), zap.Error(err))
+				errs <- errors.Errorf("marshal failed, TS %d", decodeTSKey(iter.Key()))
 				return
 			}

 			select {
 			case values <- value:
 				log.Debug("send value success")
+			case <-time.After(gcMaxBlockTime):
+				// do not update `last` anymore.
+				log.Warn("can not send the binlog for a long time, will try to read again", zap.Duration("duration", gcMaxBlockTime), zap.Int64("current TS", decodeTSKey(iter.Key())))
Contributor: should we send error to …?

Author: …

Contributor: So if drainer encounters the purge error, all pumps will reach this warning 🤔️, but both drainer and pump keep running until the user manually fixes it.

Author: ? Not all pumps, but only this one. And pump should still be running, because other drainers may still work normally.

Contributor: …

Author: Oh, I know what you mean. Any advice?
+				break readForLoop
 			case <-ctx.Done():
 				iter.Release()
+				a.gcTS.ReleaseLoadLock()
 				return
 			}

 			last = decodeTSKey(iter.Key())
 		}
 		iter.Release()
+		a.gcTS.ReleaseLoadLock()
 		err := iter.Error()
 		if err != nil {
 			log.Error("encounter iterator error", zap.Error(err))
@@ -1243,7 +1280,7 @@ func (a *Append) PullCommitBinlog(ctx context.Context, last int64) <-chan []byte
 		}
 	}()

-	return values
+	return values, errs
 }

 type storageSize struct {
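To make the new contract concrete, here is a hypothetical test-style walkthrough of the purged-TS path from inside the pump/storage package. The demoPurgedPull name is invented for illustration; it assumes `a` is an initialized *Append whose gcTS is already positive:

```go
// Illustrative only: requesting a commit TS strictly below gcTS. The old
// code silently reset `last` to gcTS; the new code reports
// ErrRequestGCedBinlog on the error channel instead.
func demoPurgedPull(a *Append) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	values, errs := a.PullCommitBinlog(ctx, a.GetGCTS()-1)
	select {
	case err := <-errs:
		// errors.Cause unwraps the errors.Annotatef wrapper added by the PR.
		if errors.Cause(err) == ErrRequestGCedBinlog {
			log.Error("requested binlogs were already purged", zap.Error(err))
		}
	case value := <-values:
		_ = value // normal path: consume binlog payloads
	}
}
```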