Skip to content

Commit dfc2756

Browse files
committed
Use node Addresses when updating Loadbalancer
1 parent 26508a7 commit dfc2756

File tree

4 files changed

+219
-195
lines changed

4 files changed

+219
-195
lines changed

internal/core/events.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ type Event struct {
1717
Type EventType
1818
Service *v1.Service
1919
PreviousService *v1.Service
20+
NodeIps []string
2021
}
2122

2223
type ServerUpdateEvent struct {
@@ -27,11 +28,12 @@ type ServerUpdateEvent struct {
2728

2829
type ServerUpdateEvents = []*ServerUpdateEvent
2930

30-
func NewEvent(eventType EventType, service *v1.Service, previousService *v1.Service) Event {
31+
func NewEvent(eventType EventType, service *v1.Service, previousService *v1.Service, nodeIps []string) Event {
3132
return Event{
3233
Type: eventType,
3334
Service: service,
3435
PreviousService: previousService,
36+
NodeIps: nodeIps,
3537
}
3638
}
3739

internal/observation/watcher.go

Lines changed: 34 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"k8s.io/client-go/kubernetes"
1717
"k8s.io/client-go/rest"
1818
"k8s.io/client-go/tools/cache"
19+
"time"
1920
)
2021

2122
const NginxIngressNamespace = "nginx-ingress"
@@ -63,13 +64,6 @@ func (w *Watcher) Watch() error {
6364
defer utilruntime.HandleCrash()
6465
defer w.handler.ShutDown()
6566

66-
nodeIps, err := w.retrieveNodeIps()
67-
if err != nil {
68-
return fmt.Errorf(`error occurred retrieving node ips: %w`, err)
69-
}
70-
71-
logrus.Infof("Watcher::Watch::nodeIps: %v", nodeIps)
72-
7367
go w.informer.Run(w.ctx.Done())
7468

7569
if !cache.WaitForNamedCacheSync(WatcherQueueName, w.ctx.Done(), w.informer.HasSynced) {
@@ -83,29 +77,44 @@ func (w *Watcher) Watch() error {
8377
func (w *Watcher) buildEventHandlerForAdd() func(interface{}) {
8478
logrus.Info("Watcher::buildEventHandlerForAdd")
8579
return func(obj interface{}) {
80+
nodeIps, err := w.retrieveNodeIps()
81+
if err != nil {
82+
logrus.Errorf(`error occurred retrieving node ips: %v`, err)
83+
return
84+
}
8685
service := obj.(*v1.Service)
8786
var previousService *v1.Service
88-
e := core.NewEvent(core.Created, service, previousService)
87+
e := core.NewEvent(core.Created, service, previousService, nodeIps)
8988
w.handler.AddRateLimitedEvent(&e)
9089
}
9190
}
9291

9392
func (w *Watcher) buildEventHandlerForDelete() func(interface{}) {
9493
logrus.Info("Watcher::buildEventHandlerForDelete")
9594
return func(obj interface{}) {
95+
nodeIps, err := w.retrieveNodeIps()
96+
if err != nil {
97+
logrus.Errorf(`error occurred retrieving node ips: %v`, err)
98+
return
99+
}
96100
service := obj.(*v1.Service)
97101
var previousService *v1.Service
98-
e := core.NewEvent(core.Deleted, service, previousService)
102+
e := core.NewEvent(core.Deleted, service, previousService, nodeIps)
99103
w.handler.AddRateLimitedEvent(&e)
100104
}
101105
}
102106

103107
func (w *Watcher) buildEventHandlerForUpdate() func(interface{}, interface{}) {
104108
logrus.Info("Watcher::buildEventHandlerForUpdate")
105109
return func(previous, updated interface{}) {
110+
nodeIps, err := w.retrieveNodeIps()
111+
if err != nil {
112+
logrus.Errorf(`error occurred retrieving node ips: %v`, err)
113+
return
114+
}
106115
service := updated.(*v1.Service)
107116
previousService := previous.(*v1.Service)
108-
e := core.NewEvent(core.Updated, service, previousService)
117+
e := core.NewEvent(core.Updated, service, previousService, nodeIps)
109118
w.handler.AddRateLimitedEvent(&e)
110119
}
111120
}
@@ -156,6 +165,7 @@ func (w *Watcher) initializeEventListeners() error {
156165
}
157166

158167
func (w *Watcher) retrieveNodeIps() ([]string, error) {
168+
started := time.Now()
159169
logrus.Debug("Watcher::retrieveNodeIps")
160170

161171
var nodeIps []string
@@ -167,12 +177,23 @@ func (w *Watcher) retrieveNodeIps() ([]string, error) {
167177
}
168178

169179
for _, node := range nodes.Items {
170-
for _, address := range node.Status.Addresses {
171-
if address.Type == v1.NodeInternalIP {
172-
nodeIps = append(nodeIps, address.Address)
180+
if w.notMasterNode(node) {
181+
for _, address := range node.Status.Addresses {
182+
if address.Type == v1.NodeInternalIP {
183+
nodeIps = append(nodeIps, address.Address)
184+
}
173185
}
174186
}
175187
}
176188

189+
logrus.Infof("Watcher::retrieveNodeIps duration: %d", time.Since(started).Nanoseconds())
177190
return nodeIps, nil
178191
}
192+
193+
func (w *Watcher) notMasterNode(node v1.Node) bool {
194+
logrus.Debug("Watcher::notMasterNode")
195+
196+
_, found := node.Labels["node-role.kubernetes.io/master"]
197+
198+
return !found
199+
}

0 commit comments

Comments
 (0)