Skip to content

Commit c1c8fcd

Browse files
crobert-1mx-psi
andauthored
[service] Fix memory leaks and enable goleak check in tests (#9241)
**Description:** <Describe what has changed.> <!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> This change adds `goleak` to check for memory leaks. Originally there were 3 failing tests in the `service` package, so I'll describe changes in relation to resolving each test's failing goleak check. 1. `TestServiceTelemetryRestart`: Simplest fix, close the response body to make sure goroutines aren't leaked by reopening a server on the same port. This was just a test issue 2. `TestTelemetryInit.UseOTelWithSDKConfiguration`: The [meter provider](https://github.com/open-telemetry/opentelemetry-collector/blob/fb3ed1b0d65b91e49209a7e60d40ef4b607c6b10/service/telemetry.go#L57-L58) was being started in the initialization process ([metrics reference](https://github.com/open-telemetry/opentelemetry-collector/blob/fb3ed1b0d65b91e49209a7e60d40ef4b607c6b10/service/internal/proctelemetry/config.go#L135)), but never shutdown. The type originally being used (`meter.MetricProvider`) was the base interface which didn't provide a `Shutdown` method. I changed this to use the `sdk` interfaces that provide the required `Shutdown` method. The actual functionality of starting the providers was already using and returning the `sdk` interface, so the actual underlying type remains the same. Since `mp` is a private member and `sdkmetric` and implement the original type, I don't believe changing the type is a breaking change. 3. `TestServiceTelemetryCleanupOnError`: This test starts a server using a sub-goroutine, cancels it, starts again in a subroutine, and cancels again in the main goroutine. This test showed the racing behavior of the subroutine running [`server.ListenAndServe`](https://github.com/open-telemetry/opentelemetry-collector/blob/fb3ed1b0d65b91e49209a7e60d40ef4b607c6b10/service/internal/proctelemetry/config.go#L148) and the main goroutine's functionality of [calling close](https://github.com/open-telemetry/opentelemetry-collector/blob/fb3ed1b0d65b91e49209a7e60d40ef4b607c6b10/service/telemetry.go#L219) and then starting the server again [right away](https://github.com/open-telemetry/opentelemetry-collector/blob/fb3ed1b0d65b91e49209a7e60d40ef4b607c6b10/service/service_test.go#L244). The solution here is to add a `sync.WaitGroup` variable that can properly block until all servers are closed before returning from `shutdown`. This will allow us to ensure it's safe to proceed knowing the ports are free, and server is fully closed. The first test fix was just a test issue, but 2 and 3 were real bugs. I realize it's a bit hard to read with them all together, but I assumed adding PR dependency notes would be more complicated. **Link to tracking Issue:** <Issue number if applicable> #9165 **Testing:** <Describe what testing was performed and which tests were added.> All tests are passing as well as goleak check. --------- Co-authored-by: Pablo Baeyens <[email protected]>
1 parent aacddac commit c1c8fcd

File tree

7 files changed

+64
-26
lines changed

7 files changed

+64
-26
lines changed

.chloggen/goleak_service.yaml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: bug_fix
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: service
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Fix memory leaks during service package shutdown
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [9165]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# Optional: The change log or logs in which this entry should be included.
21+
# e.g. '[user]' or '[user, api]'
22+
# Include 'user' if the change is relevant to end users.
23+
# Include 'api' if there is a change to a library API.
24+
# Default: '[user]'
25+
change_logs: []

service/generated_package_test.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

service/internal/proctelemetry/config.go

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"net/url"
1414
"os"
1515
"strings"
16+
"sync"
1617
"time"
1718

1819
"github.com/prometheus/client_golang/prometheus"
@@ -63,9 +64,9 @@ var (
6364
errNoValidMetricExporter = errors.New("no valid metric exporter")
6465
)
6566

66-
func InitMetricReader(ctx context.Context, reader config.MetricReader, asyncErrorChannel chan error) (sdkmetric.Reader, *http.Server, error) {
67+
func InitMetricReader(ctx context.Context, reader config.MetricReader, asyncErrorChannel chan error, serverWG *sync.WaitGroup) (sdkmetric.Reader, *http.Server, error) {
6768
if reader.Pull != nil {
68-
return initPullExporter(reader.Pull.Exporter, asyncErrorChannel)
69+
return initPullExporter(reader.Pull.Exporter, asyncErrorChannel, serverWG)
6970
}
7071
if reader.Periodic != nil {
7172
var opts []sdkmetric.PeriodicReaderOption
@@ -93,17 +94,23 @@ func InitOpenTelemetry(res *resource.Resource, options []sdkmetric.Option, disab
9394
), nil
9495
}
9596

96-
func InitPrometheusServer(registry *prometheus.Registry, address string, asyncErrorChannel chan error) *http.Server {
97+
func InitPrometheusServer(registry *prometheus.Registry, address string, asyncErrorChannel chan error, serverWG *sync.WaitGroup) *http.Server {
9798
mux := http.NewServeMux()
9899
mux.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))
99100
server := &http.Server{
100101
Addr: address,
101102
Handler: mux,
102103
ReadHeaderTimeout: defaultReadHeaderTimeout,
103104
}
105+
106+
serverWG.Add(1)
104107
go func() {
108+
defer serverWG.Done()
105109
if serveErr := server.ListenAndServe(); serveErr != nil && !errors.Is(serveErr, http.ErrServerClosed) {
106-
asyncErrorChannel <- serveErr
110+
select {
111+
case asyncErrorChannel <- serveErr:
112+
case <-time.After(1 * time.Second):
113+
}
107114
}
108115
}()
109116
return server
@@ -152,7 +159,7 @@ func cardinalityFilter(kvs ...attribute.KeyValue) attribute.Filter {
152159
}
153160
}
154161

155-
func initPrometheusExporter(prometheusConfig *config.Prometheus, asyncErrorChannel chan error) (sdkmetric.Reader, *http.Server, error) {
162+
func initPrometheusExporter(prometheusConfig *config.Prometheus, asyncErrorChannel chan error, serverWG *sync.WaitGroup) (sdkmetric.Reader, *http.Server, error) {
156163
promRegistry := prometheus.NewRegistry()
157164
if prometheusConfig.Host == nil {
158165
return nil, nil, fmt.Errorf("host must be specified")
@@ -176,12 +183,12 @@ func initPrometheusExporter(prometheusConfig *config.Prometheus, asyncErrorChann
176183
return nil, nil, fmt.Errorf("error creating otel prometheus exporter: %w", err)
177184
}
178185

179-
return exporter, InitPrometheusServer(promRegistry, net.JoinHostPort(*prometheusConfig.Host, fmt.Sprintf("%d", *prometheusConfig.Port)), asyncErrorChannel), nil
186+
return exporter, InitPrometheusServer(promRegistry, net.JoinHostPort(*prometheusConfig.Host, fmt.Sprintf("%d", *prometheusConfig.Port)), asyncErrorChannel, serverWG), nil
180187
}
181188

182-
func initPullExporter(exporter config.MetricExporter, asyncErrorChannel chan error) (sdkmetric.Reader, *http.Server, error) {
189+
func initPullExporter(exporter config.MetricExporter, asyncErrorChannel chan error, serverWG *sync.WaitGroup) (sdkmetric.Reader, *http.Server, error) {
183190
if exporter.Prometheus != nil {
184-
return initPrometheusExporter(exporter.Prometheus, asyncErrorChannel)
191+
return initPrometheusExporter(exporter.Prometheus, asyncErrorChannel, serverWG)
185192
}
186193
return nil, nil, errNoValidMetricExporter
187194
}

service/internal/proctelemetry/config_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"errors"
99
"net/url"
1010
"reflect"
11+
"sync"
1112
"testing"
1213

1314
"github.com/stretchr/testify/assert"
@@ -529,7 +530,7 @@ func TestMetricReader(t *testing.T) {
529530
}
530531
for _, tt := range testCases {
531532
t.Run(tt.name, func(t *testing.T) {
532-
gotReader, server, err := InitMetricReader(context.Background(), tt.reader, make(chan error))
533+
gotReader, server, err := InitMetricReader(context.Background(), tt.reader, make(chan error), &sync.WaitGroup{})
533534

534535
defer func() {
535536
if gotReader != nil {

service/metadata.yaml

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,6 @@ status:
66
development: [traces, metrics, logs]
77
distributions: [core, contrib]
88

9-
tests:
10-
goleak:
11-
ignore:
12-
top:
13-
- "go.opentelemetry.io/collector/service/internal/proctelemetry.InitPrometheusServer.func1"
14-
159
telemetry:
1610
metrics:
1711
process_uptime:

service/service_test.go

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,10 @@ func TestServiceTelemetryRestart(t *testing.T) {
340340
assert.NoError(t, err)
341341
assert.NoError(t, resp.Body.Close())
342342
assert.Equal(t, http.StatusOK, resp.StatusCode)
343+
// Response body must be closed now instead of defer as the test
344+
// restarts the server on the same port. Leaving response open
345+
// leaks a goroutine.
346+
resp.Body.Close()
343347

344348
// Shutdown the service
345349
require.NoError(t, srvOne.Shutdown(context.Background()))
@@ -362,6 +366,7 @@ func TestServiceTelemetryRestart(t *testing.T) {
362366
100*time.Millisecond,
363367
"Must get a valid response from the service",
364368
)
369+
defer resp.Body.Close()
365370
assert.Equal(t, http.StatusOK, resp.StatusCode)
366371

367372
// Shutdown the new service
@@ -536,13 +541,14 @@ func assertZPages(t *testing.T, zpagesAddr string) {
536541

537542
func newNopSettings() Settings {
538543
return Settings{
539-
BuildInfo: component.NewDefaultBuildInfo(),
540-
CollectorConf: confmap.New(),
541-
Receivers: receivertest.NewNopBuilder(),
542-
Processors: processortest.NewNopBuilder(),
543-
Exporters: exportertest.NewNopBuilder(),
544-
Connectors: connectortest.NewNopBuilder(),
545-
Extensions: extensiontest.NewNopBuilder(),
544+
BuildInfo: component.NewDefaultBuildInfo(),
545+
CollectorConf: confmap.New(),
546+
Receivers: receivertest.NewNopBuilder(),
547+
Processors: processortest.NewNopBuilder(),
548+
Exporters: exportertest.NewNopBuilder(),
549+
Connectors: connectortest.NewNopBuilder(),
550+
Extensions: extensiontest.NewNopBuilder(),
551+
AsyncErrorChannel: make(chan error),
546552
}
547553
}
548554

service/telemetry.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"net"
99
"net/http"
1010
"strconv"
11+
"sync"
1112

1213
"go.opentelemetry.io/contrib/config"
1314
"go.opentelemetry.io/otel/metric"
@@ -29,7 +30,8 @@ const (
2930

3031
type meterProvider struct {
3132
*sdkmetric.MeterProvider
32-
servers []*http.Server
33+
servers []*http.Server
34+
serverWG sync.WaitGroup
3335
}
3436

3537
type meterProviderSettings struct {
@@ -71,7 +73,7 @@ func newMeterProvider(set meterProviderSettings, disableHighCardinality bool) (m
7173
var opts []sdkmetric.Option
7274
for _, reader := range set.cfg.Readers {
7375
// https://github.com/open-telemetry/opentelemetry-collector/issues/8045
74-
r, server, err := proctelemetry.InitMetricReader(context.Background(), reader, set.asyncErrorChannel)
76+
r, server, err := proctelemetry.InitMetricReader(context.Background(), reader, set.asyncErrorChannel, &mp.serverWG)
7577
if err != nil {
7678
return nil, err
7779
}
@@ -110,5 +112,8 @@ func (mp *meterProvider) Shutdown(ctx context.Context) error {
110112
errs = multierr.Append(errs, server.Close())
111113
}
112114
}
113-
return multierr.Append(errs, mp.MeterProvider.Shutdown(ctx))
115+
errs = multierr.Append(errs, mp.MeterProvider.Shutdown(ctx))
116+
mp.serverWG.Wait()
117+
118+
return errs
114119
}

0 commit comments

Comments
 (0)