From 402fc2e41e4765330bbda7bd8dc5111a600ee305 Mon Sep 17 00:00:00 2001 From: Bulat Aykaev Date: Fri, 13 Sep 2019 17:27:36 +0300 Subject: [PATCH 1/2] fix(canal): Preserve binlog filename received in the fake rotate event #406 --- canal/sync.go | 33 ++++++++++++++++++++++++++++----- 1 file changed, 28 insertions(+), 5 deletions(-) diff --git a/canal/sync.go b/canal/sync.go index b0bb6dae6..d087edd5e 100644 --- a/canal/sync.go +++ b/canal/sync.go @@ -43,6 +43,11 @@ func (c *Canal) runSyncBinlog() error { savePos := false force := false + + // The name of the binlog file received in the fake rotate event. + // It must be preserved until the new position is saved. + fakeRotateLogName := "" + for { ev, err := s.GetEvent(c.ctx) if err != nil { @@ -51,19 +56,35 @@ func (c *Canal) runSyncBinlog() error { // Update the delay between the Canal and the Master before the handler hooks are called c.updateReplicationDelay(ev) - // if log pos equal zero ,it is a fake rotate event,ignore it. - // see https://github.com/mysql/mysql-server/blob/8cc757da3d87bf4a1f07dcfb2d3c96fed3806870/sql/rpl_binlog_sender.cc#L899 - if ev.Header.LogPos == 0 { - continue - } + savePos = false force = false pos := c.master.Position() curPos := pos.Pos + + // If log pos equals zero then the received event is a fake rotate event and + // contains only a name of the next binlog file + // See https://github.com/mysql/mysql-server/blob/8e797a5d6eb3a87f16498edcb7261a75897babae/sql/rpl_binlog_sender.h#L235 + // and https://github.com/mysql/mysql-server/blob/8cc757da3d87bf4a1f07dcfb2d3c96fed3806870/sql/rpl_binlog_sender.cc#L899 + if ev.Header.LogPos == 0 { + switch e := ev.Event.(type) { + case *replication.RotateEvent: + fakeRotateLogName = string(e.NextLogName) + log.Infof("received fake rotate event, next log name is %s", e.NextLogName) + } + + continue + } + // next binlog pos pos.Pos = ev.Header.LogPos + // new file name received in the fake rotate event + if fakeRotateLogName != "" { + pos.Name = fakeRotateLogName + } + // We only save position with RotateEvent and XIDEvent. // For RowsEvent, we can't save the position until meeting XIDEvent // which tells the whole transaction is over. @@ -154,6 +175,8 @@ func (c *Canal) runSyncBinlog() error { if savePos { c.master.Update(pos) c.master.UpdateTimestamp(ev.Header.Timestamp) + fakeRotateLogName = "" + if err := c.eventHandler.OnPosSynced(pos, c.master.GTIDSet(), force); err != nil { return errors.Trace(err) } From e03ad54d6bacb3e42a4a840e816f9769d2b9f022 Mon Sep 17 00:00:00 2001 From: Bulat Aykaev Date: Thu, 19 Sep 2019 19:34:06 +0300 Subject: [PATCH 2/2] style: Move check of the header pos to the initial place --- canal/sync.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/canal/sync.go b/canal/sync.go index d087edd5e..89bc0159d 100644 --- a/canal/sync.go +++ b/canal/sync.go @@ -57,12 +57,6 @@ func (c *Canal) runSyncBinlog() error { // Update the delay between the Canal and the Master before the handler hooks are called c.updateReplicationDelay(ev) - savePos = false - force = false - pos := c.master.Position() - - curPos := pos.Pos - // If log pos equals zero then the received event is a fake rotate event and // contains only a name of the next binlog file // See https://github.com/mysql/mysql-server/blob/8e797a5d6eb3a87f16498edcb7261a75897babae/sql/rpl_binlog_sender.h#L235 @@ -77,6 +71,11 @@ func (c *Canal) runSyncBinlog() error { continue } + savePos = false + force = false + pos := c.master.Position() + + curPos := pos.Pos // next binlog pos pos.Pos = ev.Header.LogPos