Skip to content

Commit 2bbcdaa

Browse files
committed
chore(txPipeline): refactor slottedCommands impl
1 parent 50d1484 commit 2bbcdaa

File tree

1 file changed

+47
-71
lines changed

1 file changed

+47
-71
lines changed

osscluster.go

Lines changed: 47 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -1526,102 +1526,78 @@ func (c *ClusterClient) processTxPipeline(ctx context.Context, cmds []Cmder) err
15261526
return err
15271527
}
15281528

1529-
cmdsMap := map[int][]Cmder{}
1529+
keyedCmdsBySlot := c.slottedKeyedCommands(cmds)
15301530
slot := -1
1531-
// get only the keyed commands
1532-
keyedCmds := c.keyedCmds(cmds)
1533-
if len(keyedCmds) == 0 {
1534-
// no keyed commands try random slot
1531+
switch len(keyedCmdsBySlot) {
1532+
case 0:
15351533
slot = hashtag.RandomSlot()
1536-
} else {
1537-
// keyed commands, get slot from them
1538-
// if more than one slot, return cross slot error
1539-
cmdsBySlot := c.mapCmdsBySlot(keyedCmds)
1540-
if len(cmdsBySlot) > 1 {
1541-
// cross slot error, we have more than one slot for keyed commands
1542-
setCmdsErr(cmds, ErrCrossSlot)
1543-
return ErrCrossSlot
1544-
}
1545-
// get the slot, should be only one
1546-
for sl := range cmdsBySlot {
1534+
case 1:
1535+
for sl := range keyedCmdsBySlot {
15471536
slot = sl
15481537
break
15491538
}
1550-
}
1551-
// slot was not determined, try random one
1552-
if slot == -1 {
1553-
slot = hashtag.RandomSlot()
1554-
}
1555-
cmdsMap[slot] = cmds
1556-
1557-
// TxPipeline does not support cross slot transaction.
1558-
// double check the commands are in the same slot
1559-
if len(cmdsMap) > 1 {
1539+
default:
1540+
// TxPipeline does not support cross slot transaction.
15601541
setCmdsErr(cmds, ErrCrossSlot)
15611542
return ErrCrossSlot
15621543
}
15631544

1564-
for slot, cmds := range cmdsMap {
1565-
node, err := state.slotMasterNode(slot)
1566-
if err != nil {
1567-
setCmdsErr(cmds, err)
1568-
continue
1569-
}
1545+
node, err := state.slotMasterNode(slot)
1546+
if err != nil {
1547+
setCmdsErr(cmds, err)
1548+
return err
1549+
}
15701550

1571-
cmdsMap := map[*clusterNode][]Cmder{node: cmds}
1572-
for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
1573-
if attempt > 0 {
1574-
if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
1575-
setCmdsErr(cmds, err)
1576-
return err
1577-
}
1551+
cmdsMap := map[*clusterNode][]Cmder{node: cmds}
1552+
for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
1553+
if attempt > 0 {
1554+
if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
1555+
setCmdsErr(cmds, err)
1556+
return err
15781557
}
1558+
}
15791559

1580-
failedCmds := newCmdsMap()
1581-
var wg sync.WaitGroup
1560+
failedCmds := newCmdsMap()
1561+
var wg sync.WaitGroup
15821562

1583-
for node, cmds := range cmdsMap {
1584-
wg.Add(1)
1585-
go func(node *clusterNode, cmds []Cmder) {
1586-
defer wg.Done()
1587-
c.processTxPipelineNode(ctx, node, cmds, failedCmds)
1588-
}(node, cmds)
1589-
}
1563+
for node, cmds := range cmdsMap {
1564+
wg.Add(1)
1565+
go func(node *clusterNode, cmds []Cmder) {
1566+
defer wg.Done()
1567+
c.processTxPipelineNode(ctx, node, cmds, failedCmds)
1568+
}(node, cmds)
1569+
}
15901570

1591-
wg.Wait()
1592-
if len(failedCmds.m) == 0 {
1593-
break
1594-
}
1595-
cmdsMap = failedCmds.m
1571+
wg.Wait()
1572+
if len(failedCmds.m) == 0 {
1573+
break
15961574
}
1575+
cmdsMap = failedCmds.m
15971576
}
15981577

15991578
return cmdsFirstErr(cmds)
16001579
}
16011580

1602-
func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder {
1603-
cmdsMap := make(map[int][]Cmder)
1604-
preferredRandomSlot := -1
1581+
// slottedKeyedCommands returns a map of slot to commands taking into account
1582+
// only commands that have keys.
1583+
func (c *ClusterClient) slottedKeyedCommands(cmds []Cmder) map[int][]Cmder {
1584+
cmdsSlots := map[int][]Cmder{}
1585+
1586+
prefferedRandomSlot := -1
16051587
for _, cmd := range cmds {
1606-
slot := c.cmdSlot(cmd, preferredRandomSlot)
1607-
if preferredRandomSlot == -1 {
1608-
preferredRandomSlot = slot
1588+
if cmdFirstKeyPos(cmd) == 0 {
1589+
continue
16091590
}
1610-
cmdsMap[slot] = append(cmdsMap[slot], cmd)
1611-
}
1612-
return cmdsMap
1613-
}
16141591

1615-
// keyedCmds returns all the keyed commands from the cmds slice
1616-
// it determines keyed commands by checking if the command has a first key position
1617-
func (c *ClusterClient) keyedCmds(cmds []Cmder) []Cmder {
1618-
keyedCmds := make([]Cmder, 0, len(cmds))
1619-
for _, cmd := range cmds {
1620-
if cmdFirstKeyPos(cmd) != 0 {
1621-
keyedCmds = append(keyedCmds, cmd)
1592+
slot := c.cmdSlot(cmd, prefferedRandomSlot)
1593+
if prefferedRandomSlot == -1 {
1594+
prefferedRandomSlot = slot
16221595
}
1596+
1597+
cmdsSlots[slot] = append(cmdsSlots[slot], cmd)
16231598
}
1624-
return keyedCmds
1599+
1600+
return cmdsSlots
16251601
}
16261602

16271603
func (c *ClusterClient) processTxPipelineNode(

0 commit comments

Comments
 (0)