@@ -46,14 +46,21 @@ type Pooler interface {
46
46
Close () error
47
47
}
48
48
49
- type dialer func () (net.Conn , error )
49
+ type Options struct {
50
+ Dialer func () (net.Conn , error )
51
+ OnClose func (* Conn ) error
52
+
53
+ PoolSize int
54
+ PoolTimeout time.Duration
55
+ IdleTimeout time.Duration
56
+ IdleCheckFrequency time.Duration
57
+ }
50
58
51
59
type ConnPool struct {
52
- dial dialer
53
- OnClose func (* Conn ) error
60
+ opt * Options
54
61
55
- poolTimeout time. Duration
56
- idleTimeout time. Duration
62
+ dialErrorsNum uint32 // atomic
63
+ _lastDialError atomic. Value
57
64
58
65
queue chan struct {}
59
66
@@ -65,24 +72,21 @@ type ConnPool struct {
65
72
66
73
stats Stats
67
74
68
- _closed int32 // atomic
75
+ _closed uint32 // atomic
69
76
}
70
77
71
78
var _ Pooler = (* ConnPool )(nil )
72
79
73
- func NewConnPool (dial dialer , poolSize int , poolTimeout , idleTimeout , idleCheckFrequency time. Duration ) * ConnPool {
80
+ func NewConnPool (opt * Options ) * ConnPool {
74
81
p := & ConnPool {
75
- dial : dial ,
76
-
77
- poolTimeout : poolTimeout ,
78
- idleTimeout : idleTimeout ,
82
+ opt : opt ,
79
83
80
- queue : make (chan struct {}, poolSize ),
81
- conns : make ([]* Conn , 0 , poolSize ),
82
- freeConns : make ([]* Conn , 0 , poolSize ),
84
+ queue : make (chan struct {}, opt . PoolSize ),
85
+ conns : make ([]* Conn , 0 , opt . PoolSize ),
86
+ freeConns : make ([]* Conn , 0 , opt . PoolSize ),
83
87
}
84
- if idleTimeout > 0 && idleCheckFrequency > 0 {
85
- go p .reaper (idleCheckFrequency )
88
+ if opt . IdleTimeout > 0 && opt . IdleCheckFrequency > 0 {
89
+ go p .reaper (opt . IdleCheckFrequency )
86
90
}
87
91
return p
88
92
}
@@ -92,8 +96,16 @@ func (p *ConnPool) NewConn() (*Conn, error) {
92
96
return nil , ErrClosed
93
97
}
94
98
95
- netConn , err := p .dial ()
99
+ if atomic .LoadUint32 (& p .dialErrorsNum ) >= uint32 (p .opt .PoolSize ) {
100
+ return nil , p .lastDialError ()
101
+ }
102
+
103
+ netConn , err := p .opt .Dialer ()
96
104
if err != nil {
105
+ p .setLastDialError (err )
106
+ if atomic .AddUint32 (& p .dialErrorsNum , 1 ) == uint32 (p .opt .PoolSize ) {
107
+ go p .tryDial ()
108
+ }
97
109
return nil , err
98
110
}
99
111
@@ -105,12 +117,35 @@ func (p *ConnPool) NewConn() (*Conn, error) {
105
117
return cn , nil
106
118
}
107
119
120
+ func (p * ConnPool ) tryDial () {
121
+ for {
122
+ conn , err := p .opt .Dialer ()
123
+ if err != nil {
124
+ p .setLastDialError (err )
125
+ time .Sleep (time .Second )
126
+ continue
127
+ }
128
+
129
+ atomic .StoreUint32 (& p .dialErrorsNum , 0 )
130
+ _ = conn .Close ()
131
+ return
132
+ }
133
+ }
134
+
135
+ func (p * ConnPool ) setLastDialError (err error ) {
136
+ p ._lastDialError .Store (err )
137
+ }
138
+
139
+ func (p * ConnPool ) lastDialError () error {
140
+ return p ._lastDialError .Load ().(error )
141
+ }
142
+
108
143
func (p * ConnPool ) PopFree () * Conn {
109
144
select {
110
145
case p .queue <- struct {}{}:
111
146
default :
112
147
timer := timers .Get ().(* time.Timer )
113
- timer .Reset (p .poolTimeout )
148
+ timer .Reset (p .opt . PoolTimeout )
114
149
115
150
select {
116
151
case p .queue <- struct {}{}:
@@ -158,7 +193,7 @@ func (p *ConnPool) Get() (*Conn, bool, error) {
158
193
case p .queue <- struct {}{}:
159
194
default :
160
195
timer := timers .Get ().(* time.Timer )
161
- timer .Reset (p .poolTimeout )
196
+ timer .Reset (p .opt . PoolTimeout )
162
197
163
198
select {
164
199
case p .queue <- struct {}{}:
@@ -182,7 +217,7 @@ func (p *ConnPool) Get() (*Conn, bool, error) {
182
217
break
183
218
}
184
219
185
- if cn .IsStale (p .idleTimeout ) {
220
+ if cn .IsStale (p .opt . IdleTimeout ) {
186
221
p .CloseConn (cn )
187
222
continue
188
223
}
@@ -232,8 +267,8 @@ func (p *ConnPool) CloseConn(cn *Conn) error {
232
267
}
233
268
234
269
func (p * ConnPool ) closeConn (cn * Conn ) error {
235
- if p .OnClose != nil {
236
- _ = p .OnClose (cn )
270
+ if p .opt . OnClose != nil {
271
+ _ = p .opt . OnClose (cn )
237
272
}
238
273
return cn .Close ()
239
274
}
@@ -265,11 +300,11 @@ func (p *ConnPool) Stats() *Stats {
265
300
}
266
301
267
302
func (p * ConnPool ) closed () bool {
268
- return atomic .LoadInt32 (& p ._closed ) == 1
303
+ return atomic .LoadUint32 (& p ._closed ) == 1
269
304
}
270
305
271
306
func (p * ConnPool ) Close () error {
272
- if ! atomic .CompareAndSwapInt32 (& p ._closed , 0 , 1 ) {
307
+ if ! atomic .CompareAndSwapUint32 (& p ._closed , 0 , 1 ) {
273
308
return ErrClosed
274
309
}
275
310
@@ -299,7 +334,7 @@ func (p *ConnPool) reapStaleConn() bool {
299
334
}
300
335
301
336
cn := p .freeConns [0 ]
302
- if ! cn .IsStale (p .idleTimeout ) {
337
+ if ! cn .IsStale (p .opt . IdleTimeout ) {
303
338
return false
304
339
}
305
340
0 commit comments