Skip to content

Commit 93243a8

Browse files
start xray daemon if request is sampled
1 parent ae1c506 commit 93243a8

File tree

9 files changed

+437
-64
lines changed

9 files changed

+437
-64
lines changed

cmd/localstack/awsutil.go

Lines changed: 5 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ package main
88
import (
99
"context"
1010
"fmt"
11-
"github.com/jessevdk/go-flags"
1211
log "github.com/sirupsen/logrus"
1312
"go.amzn.com/lambda/interop"
1413
"go.amzn.com/lambda/rapidcore"
@@ -27,29 +26,12 @@ const (
2726
runtimeBootstrap = "/var/runtime/bootstrap"
2827
)
2928

30-
type options struct {
31-
LogLevel string `long:"log-level" default:"info" description:"log level"`
32-
InitCachingEnabled bool `long:"enable-init-caching" description:"Enable support for Init Caching"`
33-
}
34-
35-
func getCLIArgs() (options, []string) {
36-
var opts options
37-
parser := flags.NewParser(&opts, flags.IgnoreUnknown)
38-
args, err := parser.ParseArgs(os.Args)
39-
40-
if err != nil {
41-
log.WithError(err).Fatal("Failed to parse command line arguments:", os.Args)
42-
}
43-
44-
return opts, args
45-
}
46-
4729
func isBootstrapFileExist(filePath string) bool {
4830
file, err := os.Stat(filePath)
4931
return !os.IsNotExist(err) && !file.IsDir()
5032
}
5133

52-
func getBootstrap(args []string, opts options) (*rapidcore.Bootstrap, string) {
34+
func getBootstrap(args []string) (*rapidcore.Bootstrap, string) {
5335
var bootstrapLookupCmd []string
5436
var handler string
5537
currentWorkingDir := "/var/task" // default value
@@ -148,7 +130,7 @@ func resetListener(changeChannel <-chan bool, server *CustomInteropServer) {
148130

149131
func RunDNSRewriter(opts *LsOpts, ctx context.Context) {
150132
if opts.EnableDnsServer != "1" {
151-
log.Debugln("Dns server disabled")
133+
log.Debugln("DNS server disabled. S")
152134
return
153135
}
154136
dnsForwarder, err := NewDnsForwarder(opts.LocalstackIP)
@@ -160,7 +142,7 @@ func RunDNSRewriter(opts *LsOpts, ctx context.Context) {
160142
dnsForwarder.Start()
161143

162144
<-ctx.Done()
163-
log.Debugln("Shutting down dns server")
145+
log.Debugln("DNS server stopped")
164146
}
165147

166148
func RunHotReloadingListener(server *CustomInteropServer, targetPaths []string, ctx context.Context) {
@@ -234,11 +216,11 @@ func InitHandler(sandbox Sandbox, functionVersion string, timeout int64) (time.T
234216
// pass to rapid
235217
sandbox.Init(&interop.Init{
236218
Handler: GetenvWithDefault("AWS_LAMBDA_FUNCTION_HANDLER", os.Getenv("_HANDLER")),
237-
CorrelationID: "initCorrelationID",
219+
CorrelationID: "initCorrelationID", // TODO
238220
AwsKey: os.Getenv("AWS_ACCESS_KEY_ID"),
239221
AwsSecret: os.Getenv("AWS_SECRET_ACCESS_KEY"),
240222
AwsSession: os.Getenv("AWS_SESSION_TOKEN"),
241-
XRayDaemonAddress: "0.0.0.0:0", // TODO
223+
XRayDaemonAddress: GetenvWithDefault("AWS_XRAY_DAEMON_ADDRESS", "127.0.0.1:2000"),
242224
FunctionName: GetenvWithDefault("AWS_LAMBDA_FUNCTION_NAME", "test_function"),
243225
FunctionVersion: functionVersion,
244226

cmd/localstack/custom_interop.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,14 @@ func (l *LocalStackAdapter) SendStatus(status LocalStackStatus) error {
4444
return nil
4545
}
4646

47+
// The InvokeRequest is sent by LocalStack to trigger an invocation
4748
type InvokeRequest struct {
4849
InvokeId string `json:"invoke-id"`
4950
InvokedFunctionArn string `json:"invoked-function-arn"`
5051
Payload string `json:"payload"`
5152
}
5253

54+
// The ErrorResponse is sent TO LocalStack when encountering an error
5355
type ErrorResponse struct {
5456
ErrorMessage string `json:"errorMessage"`
5557
ErrorType string `json:"errorType,omitempty"`
@@ -95,10 +97,9 @@ func NewCustomInteropServer(lsOpts *LsOpts, delegate rapidcore.InteropServer, lo
9597
Payload: strings.NewReader(invokeR.Payload), // r.Body,
9698
NeedDebugLogs: true,
9799
CorrelationID: "invokeCorrelationID",
98-
// TODO: should we use the env _X_AMZN_TRACE_ID here or get the value from the request headers from the direct invoke?
99-
// for now we just set a "real" static value
100-
TraceID: "Root=1-53cfd31b-192638fa13e39d2c2bcea001;Parent=365fb4b15f2e3987;Sampled=0", // r.Header.Get("X-Amzn-Trace-Id"),
101-
//TraceID: GetEnvOrDie("_X_AMZN_TRACE_ID"), // r.Header.Get("X-Amzn-Trace-Id"),
100+
101+
// TODO: unclear how this would behave for non-managed runtimes
102+
TraceID: GetEnvOrDie("_X_AMZN_TRACE_ID"), // r.Header.Get("X-Amzn-Trace-Id"),
102103
// TODO: set correct segment ID from request
103104
//LambdaSegmentID: "LambdaSegmentID", // r.Header.Get("X-Amzn-Segment-Id"),
104105
//CognitoIdentityID: "",

cmd/localstack/main.go

Lines changed: 54 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ type LsOpts struct {
2424
HotReloadingPaths []string
2525
EnableDnsServer string
2626
LocalstackIP string
27+
InitLogLevel string
2728
}
2829

2930
func GetEnvOrDie(env string) string {
@@ -36,12 +37,14 @@ func GetEnvOrDie(env string) string {
3637

3738
func InitLsOpts() *LsOpts {
3839
return &LsOpts{
40+
// required
3941
RuntimeEndpoint: GetEnvOrDie("LOCALSTACK_RUNTIME_ENDPOINT"),
4042
RuntimeId: GetEnvOrDie("LOCALSTACK_RUNTIME_ID"),
4143
// optional with default
4244
InteropPort: GetenvWithDefault("LOCALSTACK_INTEROP_PORT", "9563"),
4345
InitTracingPort: GetenvWithDefault("LOCALSTACK_RUNTIME_TRACING_PORT", "9564"),
4446
User: GetenvWithDefault("LOCALSTACK_USER", "sbx_user1051"),
47+
InitLogLevel: GetenvWithDefault("LOCALSTACK_INIT_LOG_LEVEL", "debug"),
4548
// optional or empty
4649
CodeArchives: os.Getenv("LOCALSTACK_CODE_ARCHIVES"),
4750
HotReloadingPaths: strings.Split(GetenvWithDefault("LOCALSTACK_HOT_RELOADING_PATHS", ""), ","),
@@ -78,22 +81,33 @@ func main() {
7881
// we're setting this to the same value as in the official RIE
7982
debug.SetGCPercent(33)
8083

84+
// configuration parsing
8185
lsOpts := InitLsOpts()
8286
UnsetLsEnvs()
8387

84-
// set up logging (logrus)
85-
//log.SetFormatter(&log.JSONFormatter{})
86-
//log.SetLevel(log.TraceLevel)
87-
log.SetLevel(log.DebugLevel)
88+
// set up logging
8889
log.SetReportCaller(true)
90+
switch lsOpts.InitLogLevel {
91+
case "debug":
92+
log.SetLevel(log.DebugLevel)
93+
case "trace":
94+
log.SetFormatter(&log.JSONFormatter{})
95+
log.SetLevel(log.TraceLevel)
96+
default:
97+
log.Fatal("Invalid value for LOCALSTACK_INIT_LOG_LEVEL")
98+
}
99+
100+
// enable dns server
101+
dnsServerContext, stopDnsServer := context.WithCancel(context.Background())
102+
go RunDNSRewriter(lsOpts, dnsServerContext)
89103

90104
// download code archive if env variable is set
91105
if err := DownloadCodeArchives(lsOpts.CodeArchives); err != nil {
92106
log.Fatal("Failed to download code archives")
93107
}
94-
// enable dns server
95-
dnsServerContext, stopDnsServer := context.WithCancel(context.Background())
96-
go RunDNSRewriter(lsOpts, dnsServerContext)
108+
109+
// parse CLI args
110+
bootstrap, handler := getBootstrap(os.Args)
97111

98112
// Switch to non-root user and drop root privileges
99113
if IsRootUser() && lsOpts.User != "" {
@@ -108,23 +122,40 @@ func main() {
108122
UserLogger().Debugln("Process running as non-root user.")
109123
}
110124

111-
// parse CLI args
112-
opts, args := getCLIArgs()
113-
bootstrap, handler := getBootstrap(args, opts)
114125
logCollector := NewLogCollector()
126+
127+
// file watcher for hot-reloading
115128
fileWatcherContext, cancelFileWatcher := context.WithCancel(context.Background())
129+
130+
// build sandbox
116131
sandbox := rapidcore.
117132
NewSandboxBuilder(bootstrap).
133+
//SetTracer(tracer).
118134
AddShutdownFunc(func() {
119-
log.Debugln("Closing contexts")
135+
log.Debugln("Stopping file watcher")
120136
cancelFileWatcher()
137+
log.Debugln("Stopping DNS server")
121138
stopDnsServer()
122139
}).
123-
AddShutdownFunc(func() { os.Exit(0) }).
124140
SetExtensionsFlag(true).
125141
SetInitCachingFlag(true).
126142
SetTailLogOutput(logCollector)
127143

144+
// xray daemon
145+
if shouldRunXrayDaemon() {
146+
xrayConfig := initConfig(
147+
"http://"+GetEnvOrDie("LOCALSTACK_HOSTNAME")+":4566",
148+
GetEnvOrDie("LOCALSTACK_LAMBDA_FUNCTION_ARN"),
149+
)
150+
d := initDaemon(xrayConfig)
151+
sandbox.AddShutdownFunc(func() {
152+
d.stop()
153+
})
154+
runDaemon(d) // async
155+
156+
defer d.close() // synchronous wait for all receivers to be finished
157+
}
158+
128159
defaultInterop := sandbox.InteropServer()
129160
interopServer := NewCustomInteropServer(lsOpts, defaultInterop, logCollector)
130161
sandbox.SetInteropServer(interopServer)
@@ -136,7 +167,7 @@ func main() {
136167
go sandbox.Create()
137168

138169
// get timeout
139-
invokeTimeoutEnv := GetEnvOrDie("AWS_LAMBDA_FUNCTION_TIMEOUT")
170+
invokeTimeoutEnv := GetEnvOrDie("AWS_LAMBDA_FUNCTION_TIMEOUT") // TODO: collect all AWS_* env parsing
140171
invokeTimeoutSeconds, err := strconv.Atoi(invokeTimeoutEnv)
141172
if err != nil {
142173
log.Fatalln(err)
@@ -153,3 +184,13 @@ func main() {
153184
log.Fatal("Failed to start debug server")
154185
}
155186
}
187+
188+
func shouldRunXrayDaemon() bool {
189+
xrayAddress := os.Getenv("AWS_XRAY_DAEMON_ADDRESS")
190+
traceHeaer := os.Getenv("_X_AMZN_TRACE_ID")
191+
if xrayAddress != "" && traceHeaer != "" {
192+
// no point in running the daemon if we don't actually sample the request
193+
return strings.Contains(traceHeaer, "Sampled=1")
194+
}
195+
return false
196+
}

0 commit comments

Comments
 (0)