From 2a32680c8a3100f3cd0650d49bd77c235b002a4c Mon Sep 17 00:00:00 2001 From: Nedyalko Dyakov Date: Thu, 19 Jun 2025 11:40:45 +0300 Subject: [PATCH 1/9] fix(txpipeline): keyless commands should take the slot of the keyed commands --- osscluster.go | 42 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 41 insertions(+), 1 deletion(-) diff --git a/osscluster.go b/osscluster.go index 0dce50a4a..b56edfd74 100644 --- a/osscluster.go +++ b/osscluster.go @@ -1507,8 +1507,36 @@ func (c *ClusterClient) processTxPipeline(ctx context.Context, cmds []Cmder) err return err } - cmdsMap := c.mapCmdsBySlot(cmds) + cmdsMap := map[int][]Cmder{} + slot := -1 + // split keyed and keyless commands + keyedCmds, _ := c.keyedAndKeyessCmds(cmds) + if len(keyedCmds) == 0 { + // no keyed commands try random slot + slot = hashtag.RandomSlot() + } else { + // keyed commands, get slot from them + // if more than one slot, return cross slot error + cmdsBySlot := c.mapCmdsBySlot(keyedCmds) + if len(cmdsBySlot) > 1 { + // cross slot error, we have more than one slot for keyed commands + setCmdsErr(cmds, ErrCrossSlot) + return ErrCrossSlot + } + // get the slot, should be only one + for sl := range cmdsBySlot { + slot = sl + break + } + } + // slot was not determined, try random one + if slot == -1 { + slot = hashtag.RandomSlot() + } + cmdsMap[slot] = cmds + // TxPipeline does not support cross slot transaction. + // double check the commands are in the same slot if len(cmdsMap) > 1 { setCmdsErr(cmds, ErrCrossSlot) return ErrCrossSlot @@ -1560,6 +1588,18 @@ func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder { } return cmdsMap } +func (c *ClusterClient) keyedAndKeyessCmds(cmds []Cmder) ([]Cmder, []Cmder) { + keyedCmds := make([]Cmder, 0, len(cmds)) + keylessCmds := make([]Cmder, 0, len(cmds)) + for _, cmd := range cmds { + if cmdFirstKeyPos(cmd) == 0 { + keylessCmds = append(keylessCmds, cmd) + } else { + keyedCmds = append(keyedCmds, cmd) + } + } + return keyedCmds, keylessCmds +} func (c *ClusterClient) processTxPipelineNode( ctx context.Context, node *clusterNode, cmds []Cmder, failedCmds *cmdsMap, From fffa489c049ab17db11bcacf4b0e74142b32a282 Mon Sep 17 00:00:00 2001 From: Nedyalko Dyakov Date: Thu, 19 Jun 2025 11:52:09 +0300 Subject: [PATCH 2/9] fix(txpipeline): extract only keyed cmds from all cmds --- osscluster.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/osscluster.go b/osscluster.go index b3665cc28..486309293 100644 --- a/osscluster.go +++ b/osscluster.go @@ -1521,8 +1521,8 @@ func (c *ClusterClient) processTxPipeline(ctx context.Context, cmds []Cmder) err cmdsMap := map[int][]Cmder{} slot := -1 - // split keyed and keyless commands - keyedCmds, _ := c.keyedAndKeyessCmds(cmds) + // get only the keyed commands + keyedCmds := c.keyedCmds(cmds) if len(keyedCmds) == 0 { // no keyed commands try random slot slot = hashtag.RandomSlot() @@ -1600,17 +1600,17 @@ func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder { } return cmdsMap } -func (c *ClusterClient) keyedAndKeyessCmds(cmds []Cmder) ([]Cmder, []Cmder) { + +// keyedCmds returns all the keyed commands from the cmds slice +// it determines keyed commands by checking if the command has a first key position +func (c *ClusterClient) keyedCmds(cmds []Cmder) []Cmder { keyedCmds := make([]Cmder, 0, len(cmds)) - keylessCmds := make([]Cmder, 0, len(cmds)) for _, cmd := range cmds { - if cmdFirstKeyPos(cmd) == 0 { - keylessCmds = append(keylessCmds, cmd) - } else { + if cmdFirstKeyPos(cmd) != 0 { keyedCmds = append(keyedCmds, cmd) } } - return keyedCmds, keylessCmds + return keyedCmds } func (c *ClusterClient) processTxPipelineNode( From 4320079ad70996801a529f6fe9dcc7d8df921f38 Mon Sep 17 00:00:00 2001 From: Nedyalko Dyakov Date: Thu, 19 Jun 2025 11:55:57 +0300 Subject: [PATCH 3/9] chore(test): Add tests for keyless cmds and txpipeline --- osscluster_test.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/osscluster_test.go b/osscluster_test.go index 10023218d..2c7f40a5f 100644 --- a/osscluster_test.go +++ b/osscluster_test.go @@ -603,6 +603,15 @@ var _ = Describe("ClusterClient", func() { Expect(err).To(MatchError(redis.ErrCrossSlot)) }) + It("works normally with keyless commands and no CrossSlot error", func() { + pipe.Set(ctx, "A{s}", "A_value", 0) + pipe.Ping(ctx) + pipe.Set(ctx, "B{s}", "B_value", 0) + pipe.Ping(ctx) + _, err := pipe.Exec(ctx) + Expect(err).To(Not(HaveOccurred())) + }) + // doesn't fail when no commands are queued It("returns no error when there are no commands", func() { _, err := pipe.Exec(ctx) From 0f66cd002d49a7b37c6677344958c4b2be032c92 Mon Sep 17 00:00:00 2001 From: Nedyalko Dyakov Date: Thu, 19 Jun 2025 12:07:36 +0300 Subject: [PATCH 4/9] fix(cmdSlot): Add preferred random slot --- internal_test.go | 11 +++++++++-- osscluster.go | 28 +++++++++++++++++++++------- 2 files changed, 30 insertions(+), 9 deletions(-) diff --git a/internal_test.go b/internal_test.go index 8f1f1f312..4a655cff0 100644 --- a/internal_test.go +++ b/internal_test.go @@ -364,15 +364,22 @@ var _ = Describe("ClusterClient", func() { It("select slot from args for GETKEYSINSLOT command", func() { cmd := NewStringSliceCmd(ctx, "cluster", "getkeysinslot", 100, 200) - slot := client.cmdSlot(cmd) + slot := client.cmdSlot(cmd, -1) Expect(slot).To(Equal(100)) }) It("select slot from args for COUNTKEYSINSLOT command", func() { cmd := NewStringSliceCmd(ctx, "cluster", "countkeysinslot", 100) - slot := client.cmdSlot(cmd) + slot := client.cmdSlot(cmd, -1) Expect(slot).To(Equal(100)) }) + + It("follows preferred random slot", func() { + cmd := NewStatusCmd(ctx, "ping") + + slot := client.cmdSlot(cmd, 101) + Expect(slot).To(Equal(101)) + }) }) }) diff --git a/osscluster.go b/osscluster.go index 486309293..d92a26b5d 100644 --- a/osscluster.go +++ b/osscluster.go @@ -998,7 +998,7 @@ func (c *ClusterClient) Process(ctx context.Context, cmd Cmder) error { } func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error { - slot := c.cmdSlot(cmd) + slot := c.cmdSlot(cmd, -1) var node *clusterNode var moved bool var ask bool @@ -1344,9 +1344,13 @@ func (c *ClusterClient) mapCmdsByNode(ctx context.Context, cmdsMap *cmdsMap, cmd return err } + preferredRandomSlot := -1 if c.opt.ReadOnly && c.cmdsAreReadOnly(ctx, cmds) { for _, cmd := range cmds { - slot := c.cmdSlot(cmd) + slot := c.cmdSlot(cmd, preferredRandomSlot) + if preferredRandomSlot == -1 { + preferredRandomSlot = slot + } node, err := c.slotReadOnlyNode(state, slot) if err != nil { return err @@ -1357,7 +1361,10 @@ func (c *ClusterClient) mapCmdsByNode(ctx context.Context, cmdsMap *cmdsMap, cmd } for _, cmd := range cmds { - slot := c.cmdSlot(cmd) + slot := c.cmdSlot(cmd, preferredRandomSlot) + if preferredRandomSlot == -1 { + preferredRandomSlot = slot + } node, err := state.slotMasterNode(slot) if err != nil { return err @@ -1594,8 +1601,12 @@ func (c *ClusterClient) processTxPipeline(ctx context.Context, cmds []Cmder) err func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder { cmdsMap := make(map[int][]Cmder) + preferredRandomSlot := -1 for _, cmd := range cmds { - slot := c.cmdSlot(cmd) + slot := c.cmdSlot(cmd, preferredRandomSlot) + if preferredRandomSlot == -1 { + preferredRandomSlot = slot + } cmdsMap[slot] = append(cmdsMap[slot], cmd) } return cmdsMap @@ -1925,17 +1936,20 @@ func (c *ClusterClient) cmdInfo(ctx context.Context, name string) *CommandInfo { return info } -func (c *ClusterClient) cmdSlot(cmd Cmder) int { +func (c *ClusterClient) cmdSlot(cmd Cmder, preferredRandomSlot int) int { args := cmd.Args() if args[0] == "cluster" && (args[1] == "getkeysinslot" || args[1] == "countkeysinslot") { return args[2].(int) } - return cmdSlot(cmd, cmdFirstKeyPos(cmd)) + return cmdSlot(cmd, cmdFirstKeyPos(cmd), preferredRandomSlot) } -func cmdSlot(cmd Cmder, pos int) int { +func cmdSlot(cmd Cmder, pos int, preferredRandomSlot int) int { if pos == 0 { + if preferredRandomSlot != -1 { + return preferredRandomSlot + } return hashtag.RandomSlot() } firstKey := cmd.stringArg(pos) From 185ec0146171f08bee70638eb3eb0a74e71f1349 Mon Sep 17 00:00:00 2001 From: Nedyalko Dyakov Date: Thu, 19 Jun 2025 12:26:06 +0300 Subject: [PATCH 5/9] fix(cmdSlot): Add shortlist of keyless cmds --- command.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/command.go b/command.go index 652e241be..17d609e5c 100644 --- a/command.go +++ b/command.go @@ -75,12 +75,17 @@ func writeCmd(wr *proto.Writer, cmd Cmder) error { return wr.WriteArgs(cmd.Args()) } +// cmdFirstKeyPos returns the position of the first key in the command's arguments. +// If the command does not have a key, it returns 0. +// TODO: Use the data in CommandInfo to determine the first key position. func cmdFirstKeyPos(cmd Cmder) int { if pos := cmd.firstKeyPos(); pos != 0 { return int(pos) } switch cmd.Name() { + case "echo", "ping", "command": + return 0 case "eval", "evalsha", "eval_ro", "evalsha_ro": if cmd.stringArg(2) != "0" { return 3 From c368e7e01f2ddf45a687fbaa6636a2c6df5ced5d Mon Sep 17 00:00:00 2001 From: Nedyalko Dyakov Date: Thu, 19 Jun 2025 14:12:14 +0300 Subject: [PATCH 6/9] chore(test): Fix ring test --- ring_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/ring_test.go b/ring_test.go index ef95e9805..5fd7d9823 100644 --- a/ring_test.go +++ b/ring_test.go @@ -304,7 +304,7 @@ var _ = Describe("Redis Ring", func() { ring = redis.NewRing(opt) }) It("supports Process hook", func() { - err := ring.Ping(ctx).Err() + err := ring.Set(ctx, "key", "test", 0).Err() Expect(err).NotTo(HaveOccurred()) var stack []string @@ -312,12 +312,12 @@ var _ = Describe("Redis Ring", func() { ring.AddHook(&hook{ processHook: func(hook redis.ProcessHook) redis.ProcessHook { return func(ctx context.Context, cmd redis.Cmder) error { - Expect(cmd.String()).To(Equal("ping: ")) + Expect(cmd.String()).To(Equal("get key: ")) stack = append(stack, "ring.BeforeProcess") err := hook(ctx, cmd) - Expect(cmd.String()).To(Equal("ping: PONG")) + Expect(cmd.String()).To(Equal("get key: test")) stack = append(stack, "ring.AfterProcess") return err @@ -329,12 +329,12 @@ var _ = Describe("Redis Ring", func() { shard.AddHook(&hook{ processHook: func(hook redis.ProcessHook) redis.ProcessHook { return func(ctx context.Context, cmd redis.Cmder) error { - Expect(cmd.String()).To(Equal("ping: ")) + Expect(cmd.String()).To(Equal("get key: ")) stack = append(stack, "shard.BeforeProcess") err := hook(ctx, cmd) - Expect(cmd.String()).To(Equal("ping: PONG")) + Expect(cmd.String()).To(Equal("get key: test")) stack = append(stack, "shard.AfterProcess") return err @@ -344,7 +344,7 @@ var _ = Describe("Redis Ring", func() { return nil }) - err = ring.Ping(ctx).Err() + err = ring.Get(ctx, "key").Err() Expect(err).NotTo(HaveOccurred()) Expect(stack).To(Equal([]string{ "ring.BeforeProcess", From 49906ee0fe9e5fa8eb26312d0a9b0f41d1c90c99 Mon Sep 17 00:00:00 2001 From: Nedyalko Dyakov Date: Thu, 19 Jun 2025 14:36:26 +0300 Subject: [PATCH 7/9] fix(keylessCommands): Add list of keyless commands Add list of keyless Commands based on the Commands output for redis 8 --- command.go | 58 ++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 56 insertions(+), 2 deletions(-) diff --git a/command.go b/command.go index 17d609e5c..b79338cb9 100644 --- a/command.go +++ b/command.go @@ -17,6 +17,55 @@ import ( "github.com/redis/go-redis/v9/internal/util" ) +// keylessCommands contains Redis commands that have empty key specifications (9th slot empty) +// Only includes core Redis commands, excludes FT.*, ts.*, timeseries.*, search.* and subcommands +var keylessCommands = map[string]struct{}{ + "acl": {}, + "asking": {}, + "auth": {}, + "bgrewriteaof": {}, + "bgsave": {}, + "client": {}, + "cluster": {}, + "config": {}, + "debug": {}, + "discard": {}, + "echo": {}, + "exec": {}, + "failover": {}, + "function": {}, + "hello": {}, + "latency": {}, + "lolwut": {}, + "module": {}, + "monitor": {}, + "multi": {}, + "pfselftest": {}, + "ping": {}, + "psubscribe": {}, + "psync": {}, + "publish": {}, + "pubsub": {}, + "punsubscribe": {}, + "quit": {}, + "readonly": {}, + "readwrite": {}, + "replconf": {}, + "replicaof": {}, + "role": {}, + "save": {}, + "script": {}, + "select": {}, + "shutdown": {}, + "slaveof": {}, + "slowlog": {}, + "subscribe": {}, + "swapdb": {}, + "sync": {}, + "unsubscribe": {}, + "unwatch": {}, +} + type Cmder interface { // command name. // e.g. "set k v ex 10" -> "set", "cluster info" -> "cluster". @@ -83,9 +132,14 @@ func cmdFirstKeyPos(cmd Cmder) int { return int(pos) } - switch cmd.Name() { - case "echo", "ping", "command": + name := cmd.Name() + + // first check if the command is keyless + if _, ok := keylessCommands[name]; ok { return 0 + } + + switch name { case "eval", "evalsha", "eval_ro", "evalsha_ro": if cmd.stringArg(2) != "0" { return 3 From 2bbcdaa32b89e098ea1274c1ba9663b0a42bb424 Mon Sep 17 00:00:00 2001 From: Nedyalko Dyakov Date: Mon, 23 Jun 2025 17:41:14 +0300 Subject: [PATCH 8/9] chore(txPipeline): refactor slottedCommands impl --- osscluster.go | 118 ++++++++++++++++++++------------------------------ 1 file changed, 47 insertions(+), 71 deletions(-) diff --git a/osscluster.go b/osscluster.go index d92a26b5d..d68eff4f1 100644 --- a/osscluster.go +++ b/osscluster.go @@ -1526,102 +1526,78 @@ func (c *ClusterClient) processTxPipeline(ctx context.Context, cmds []Cmder) err return err } - cmdsMap := map[int][]Cmder{} + keyedCmdsBySlot := c.slottedKeyedCommands(cmds) slot := -1 - // get only the keyed commands - keyedCmds := c.keyedCmds(cmds) - if len(keyedCmds) == 0 { - // no keyed commands try random slot + switch len(keyedCmdsBySlot) { + case 0: slot = hashtag.RandomSlot() - } else { - // keyed commands, get slot from them - // if more than one slot, return cross slot error - cmdsBySlot := c.mapCmdsBySlot(keyedCmds) - if len(cmdsBySlot) > 1 { - // cross slot error, we have more than one slot for keyed commands - setCmdsErr(cmds, ErrCrossSlot) - return ErrCrossSlot - } - // get the slot, should be only one - for sl := range cmdsBySlot { + case 1: + for sl := range keyedCmdsBySlot { slot = sl break } - } - // slot was not determined, try random one - if slot == -1 { - slot = hashtag.RandomSlot() - } - cmdsMap[slot] = cmds - - // TxPipeline does not support cross slot transaction. - // double check the commands are in the same slot - if len(cmdsMap) > 1 { + default: + // TxPipeline does not support cross slot transaction. setCmdsErr(cmds, ErrCrossSlot) return ErrCrossSlot } - for slot, cmds := range cmdsMap { - node, err := state.slotMasterNode(slot) - if err != nil { - setCmdsErr(cmds, err) - continue - } + node, err := state.slotMasterNode(slot) + if err != nil { + setCmdsErr(cmds, err) + return err + } - cmdsMap := map[*clusterNode][]Cmder{node: cmds} - for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ { - if attempt > 0 { - if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil { - setCmdsErr(cmds, err) - return err - } + cmdsMap := map[*clusterNode][]Cmder{node: cmds} + for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ { + if attempt > 0 { + if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil { + setCmdsErr(cmds, err) + return err } + } - failedCmds := newCmdsMap() - var wg sync.WaitGroup + failedCmds := newCmdsMap() + var wg sync.WaitGroup - for node, cmds := range cmdsMap { - wg.Add(1) - go func(node *clusterNode, cmds []Cmder) { - defer wg.Done() - c.processTxPipelineNode(ctx, node, cmds, failedCmds) - }(node, cmds) - } + for node, cmds := range cmdsMap { + wg.Add(1) + go func(node *clusterNode, cmds []Cmder) { + defer wg.Done() + c.processTxPipelineNode(ctx, node, cmds, failedCmds) + }(node, cmds) + } - wg.Wait() - if len(failedCmds.m) == 0 { - break - } - cmdsMap = failedCmds.m + wg.Wait() + if len(failedCmds.m) == 0 { + break } + cmdsMap = failedCmds.m } return cmdsFirstErr(cmds) } -func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder { - cmdsMap := make(map[int][]Cmder) - preferredRandomSlot := -1 +// slottedKeyedCommands returns a map of slot to commands taking into account +// only commands that have keys. +func (c *ClusterClient) slottedKeyedCommands(cmds []Cmder) map[int][]Cmder { + cmdsSlots := map[int][]Cmder{} + + prefferedRandomSlot := -1 for _, cmd := range cmds { - slot := c.cmdSlot(cmd, preferredRandomSlot) - if preferredRandomSlot == -1 { - preferredRandomSlot = slot + if cmdFirstKeyPos(cmd) == 0 { + continue } - cmdsMap[slot] = append(cmdsMap[slot], cmd) - } - return cmdsMap -} -// keyedCmds returns all the keyed commands from the cmds slice -// it determines keyed commands by checking if the command has a first key position -func (c *ClusterClient) keyedCmds(cmds []Cmder) []Cmder { - keyedCmds := make([]Cmder, 0, len(cmds)) - for _, cmd := range cmds { - if cmdFirstKeyPos(cmd) != 0 { - keyedCmds = append(keyedCmds, cmd) + slot := c.cmdSlot(cmd, prefferedRandomSlot) + if prefferedRandomSlot == -1 { + prefferedRandomSlot = slot } + + cmdsSlots[slot] = append(cmdsSlots[slot], cmd) } - return keyedCmds + + return cmdsSlots } func (c *ClusterClient) processTxPipelineNode( From ecc985b75b692dc7881c6c47fc8ac0012e89181c Mon Sep 17 00:00:00 2001 From: Nedyalko Dyakov Date: Tue, 24 Jun 2025 09:30:25 +0300 Subject: [PATCH 9/9] fix(osscluster): typo --- osscluster.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/osscluster.go b/osscluster.go index d68eff4f1..0526022ba 100644 --- a/osscluster.go +++ b/osscluster.go @@ -1583,15 +1583,15 @@ func (c *ClusterClient) processTxPipeline(ctx context.Context, cmds []Cmder) err func (c *ClusterClient) slottedKeyedCommands(cmds []Cmder) map[int][]Cmder { cmdsSlots := map[int][]Cmder{} - prefferedRandomSlot := -1 + preferredRandomSlot := -1 for _, cmd := range cmds { if cmdFirstKeyPos(cmd) == 0 { continue } - slot := c.cmdSlot(cmd, prefferedRandomSlot) - if prefferedRandomSlot == -1 { - prefferedRandomSlot = slot + slot := c.cmdSlot(cmd, preferredRandomSlot) + if preferredRandomSlot == -1 { + preferredRandomSlot = slot } cmdsSlots[slot] = append(cmdsSlots[slot], cmd)