Skip to content

fix(canal): Preserve binlog filename received in the fake rotate event #428

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Sep 23, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 24 additions & 2 deletions canal/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ func (c *Canal) runSyncBinlog() error {

savePos := false
force := false

// The name of the binlog file received in the fake rotate event.
// It must be preserved until the new position is saved.
fakeRotateLogName := ""

for {
ev, err := s.GetEvent(c.ctx)
if err != nil {
Expand All @@ -51,11 +56,21 @@ func (c *Canal) runSyncBinlog() error {

// Update the delay between the Canal and the Master before the handler hooks are called
c.updateReplicationDelay(ev)
// if log pos equal zero ,it is a fake rotate event,ignore it.
// see https://github.com/mysql/mysql-server/blob/8cc757da3d87bf4a1f07dcfb2d3c96fed3806870/sql/rpl_binlog_sender.cc#L899

// If log pos equals zero then the received event is a fake rotate event and
// contains only a name of the next binlog file
// See https://github.com/mysql/mysql-server/blob/8e797a5d6eb3a87f16498edcb7261a75897babae/sql/rpl_binlog_sender.h#L235
// and https://github.com/mysql/mysql-server/blob/8cc757da3d87bf4a1f07dcfb2d3c96fed3806870/sql/rpl_binlog_sender.cc#L899
if ev.Header.LogPos == 0 {
switch e := ev.Event.(type) {
case *replication.RotateEvent:
fakeRotateLogName = string(e.NextLogName)
log.Infof("received fake rotate event, next log name is %s", e.NextLogName)
}

continue
}

savePos = false
force = false
pos := c.master.Position()
Expand All @@ -64,6 +79,11 @@ func (c *Canal) runSyncBinlog() error {
// next binlog pos
pos.Pos = ev.Header.LogPos

// new file name received in the fake rotate event
if fakeRotateLogName != "" {
pos.Name = fakeRotateLogName
}

// We only save position with RotateEvent and XIDEvent.
// For RowsEvent, we can't save the position until meeting XIDEvent
// which tells the whole transaction is over.
Expand Down Expand Up @@ -154,6 +174,8 @@ func (c *Canal) runSyncBinlog() error {
if savePos {
c.master.Update(pos)
c.master.UpdateTimestamp(ev.Header.Timestamp)
fakeRotateLogName = ""

if err := c.eventHandler.OnPosSynced(pos, c.master.GTIDSet(), force); err != nil {
return errors.Trace(err)
}
Expand Down