Skip to content

Commit 57a57be

Browse files
committed
chore(txPipeline): refactor slottedCommands impl
1 parent 50d1484 commit 57a57be

File tree

1 file changed

+48
-71
lines changed

1 file changed

+48
-71
lines changed

osscluster.go

Lines changed: 48 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -1526,102 +1526,79 @@ func (c *ClusterClient) processTxPipeline(ctx context.Context, cmds []Cmder) err
15261526
return err
15271527
}
15281528

1529-
cmdsMap := map[int][]Cmder{}
1529+
keyedCmdsBySlot := c.slottedKeyedCommands(cmds)
15301530
slot := -1
1531-
// get only the keyed commands
1532-
keyedCmds := c.keyedCmds(cmds)
1533-
if len(keyedCmds) == 0 {
1534-
// no keyed commands try random slot
1531+
switch len(keyedCmdsBySlot) {
1532+
case 0:
15351533
slot = hashtag.RandomSlot()
1536-
} else {
1537-
// keyed commands, get slot from them
1538-
// if more than one slot, return cross slot error
1539-
cmdsBySlot := c.mapCmdsBySlot(keyedCmds)
1540-
if len(cmdsBySlot) > 1 {
1541-
// cross slot error, we have more than one slot for keyed commands
1542-
setCmdsErr(cmds, ErrCrossSlot)
1543-
return ErrCrossSlot
1544-
}
1545-
// get the slot, should be only one
1546-
for sl := range cmdsBySlot {
1534+
case 1:
1535+
for sl := range keyedCmdsBySlot {
15471536
slot = sl
15481537
break
15491538
}
1550-
}
1551-
// slot was not determined, try random one
1552-
if slot == -1 {
1553-
slot = hashtag.RandomSlot()
1554-
}
1555-
cmdsMap[slot] = cmds
1556-
1557-
// TxPipeline does not support cross slot transaction.
1558-
// double check the commands are in the same slot
1559-
if len(cmdsMap) > 1 {
1539+
default:
1540+
// TxPipeline does not support cross slot transaction.
15601541
setCmdsErr(cmds, ErrCrossSlot)
15611542
return ErrCrossSlot
15621543
}
15631544

1564-
for slot, cmds := range cmdsMap {
1565-
node, err := state.slotMasterNode(slot)
1566-
if err != nil {
1567-
setCmdsErr(cmds, err)
1568-
continue
1569-
}
1545+
node, err := state.slotMasterNode(slot)
1546+
if err != nil {
1547+
setCmdsErr(cmds, err)
1548+
return err
1549+
}
15701550

1571-
cmdsMap := map[*clusterNode][]Cmder{node: cmds}
1572-
for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
1573-
if attempt > 0 {
1574-
if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
1575-
setCmdsErr(cmds, err)
1576-
return err
1577-
}
1551+
cmdsMap := map[*clusterNode][]Cmder{node: cmds}
1552+
for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
1553+
if attempt > 0 {
1554+
if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
1555+
setCmdsErr(cmds, err)
1556+
return err
15781557
}
1558+
}
15791559

1580-
failedCmds := newCmdsMap()
1581-
var wg sync.WaitGroup
1560+
failedCmds := newCmdsMap()
1561+
c.processTxPipelineNode(ctx, node, cmds, failedCmds)
1562+
var wg sync.WaitGroup
15821563

1583-
for node, cmds := range cmdsMap {
1584-
wg.Add(1)
1585-
go func(node *clusterNode, cmds []Cmder) {
1586-
defer wg.Done()
1587-
c.processTxPipelineNode(ctx, node, cmds, failedCmds)
1588-
}(node, cmds)
1589-
}
1564+
for node, cmds := range cmdsMap {
1565+
wg.Add(1)
1566+
go func(node *clusterNode, cmds []Cmder) {
1567+
defer wg.Done()
1568+
c.processTxPipelineNode(ctx, node, cmds, failedCmds)
1569+
}(node, cmds)
1570+
}
15901571

1591-
wg.Wait()
1592-
if len(failedCmds.m) == 0 {
1593-
break
1594-
}
1595-
cmdsMap = failedCmds.m
1572+
wg.Wait()
1573+
if len(failedCmds.m) == 0 {
1574+
break
15961575
}
1576+
cmdsMap = failedCmds.m
15971577
}
15981578

15991579
return cmdsFirstErr(cmds)
16001580
}
16011581

1602-
func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder {
1603-
cmdsMap := make(map[int][]Cmder)
1604-
preferredRandomSlot := -1
1582+
// slottedKeyedCommands returns a map of slot to commands taking into account
1583+
// only commands that have keys.
1584+
func (c *ClusterClient) slottedKeyedCommands(cmds []Cmder) map[int][]Cmder {
1585+
cmdsSlots := map[int][]Cmder{}
1586+
1587+
prefferedRandomSlot := -1
16051588
for _, cmd := range cmds {
1606-
slot := c.cmdSlot(cmd, preferredRandomSlot)
1607-
if preferredRandomSlot == -1 {
1608-
preferredRandomSlot = slot
1589+
if cmdFirstKeyPos(cmd) == 0 {
1590+
continue
16091591
}
1610-
cmdsMap[slot] = append(cmdsMap[slot], cmd)
1611-
}
1612-
return cmdsMap
1613-
}
16141592

1615-
// keyedCmds returns all the keyed commands from the cmds slice
1616-
// it determines keyed commands by checking if the command has a first key position
1617-
func (c *ClusterClient) keyedCmds(cmds []Cmder) []Cmder {
1618-
keyedCmds := make([]Cmder, 0, len(cmds))
1619-
for _, cmd := range cmds {
1620-
if cmdFirstKeyPos(cmd) != 0 {
1621-
keyedCmds = append(keyedCmds, cmd)
1593+
slot := c.cmdSlot(cmd, prefferedRandomSlot)
1594+
if prefferedRandomSlot == -1 {
1595+
prefferedRandomSlot = slot
16221596
}
1597+
1598+
cmdsSlots[slot] = append(cmdsSlots[slot], cmd)
16231599
}
1624-
return keyedCmds
1600+
1601+
return cmdsSlots
16251602
}
16261603

16271604
func (c *ClusterClient) processTxPipelineNode(

0 commit comments

Comments
 (0)