From b0cbe4806b3a5e28c5d002c173b2c18d58bf36a2 Mon Sep 17 00:00:00 2001 From: Daniel Fangl Date: Mon, 5 Dec 2022 16:39:03 +0100 Subject: [PATCH 1/6] add hot reloading entrypoints --- cmd/localstack/awsutil.go | 7 +++++++ cmd/localstack/main.go | 7 ++++++- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/cmd/localstack/awsutil.go b/cmd/localstack/awsutil.go index 3fd0c54..7a0c59d 100644 --- a/cmd/localstack/awsutil.go +++ b/cmd/localstack/awsutil.go @@ -196,6 +196,13 @@ func DownloadCodeArchive(url string) { } +func RunFileWatcher(server *CustomInteropServer, targetPaths []string, opts *LsOpts) { + if !opts.HotReloading { + return + } + +} + func InitHandler(sandbox Sandbox, functionVersion string, timeout int64) (time.Time, time.Time) { additionalFunctionEnvironmentVariables := map[string]string{} diff --git a/cmd/localstack/main.go b/cmd/localstack/main.go index 9da81ad..28fb05b 100644 --- a/cmd/localstack/main.go +++ b/cmd/localstack/main.go @@ -18,6 +18,7 @@ type LsOpts struct { RuntimeId string InitTracingPort string CodeDownloadUrl string + HotReloading bool } func GetEnvOrDie(env string) string { @@ -37,6 +38,7 @@ func InitLsOpts() *LsOpts { InitTracingPort: GetenvWithDefault("LOCALSTACK_RUNTIME_TRACING_PORT", "9564"), // optional or empty CodeDownloadUrl: os.Getenv("LOCALSTACK_CODE_ARCHIVE_DOWNLOAD_URL"), + HotReloading: os.Getenv("LOCALSTACK_HOT_RELOADING_ENABLED") != "", } } @@ -65,7 +67,8 @@ func main() { SetTailLogOutput(logCollector) defaultInterop := sandbox.InteropServer() - sandbox.SetInteropServer(NewCustomInteropServer(lsOpts, defaultInterop, logCollector)) + interopServer := NewCustomInteropServer(lsOpts, defaultInterop, logCollector) + sandbox.SetInteropServer(interopServer) if len(handler) > 0 { sandbox.SetHandler(handler) } @@ -82,6 +85,8 @@ func main() { // start runtime init go InitHandler(sandbox, GetEnvOrDie("AWS_LAMBDA_FUNCTION_VERSION"), int64(invokeTimeoutSeconds)) // TODO: replace this with a custom init + RunFileWatcher(interopServer, []string{"/var/task"}, lsOpts) + // TODO: make the tracing server optional // start blocking with the tracing server err = http.ListenAndServe("0.0.0.0:"+lsOpts.InitTracingPort, http.DefaultServeMux) From 805d1b3ef72368c17570b6af6302a347a0be1199 Mon Sep 17 00:00:00 2001 From: Daniel Fangl Date: Tue, 6 Dec 2022 19:55:53 +0100 Subject: [PATCH 2/6] add filewatcher implementation --- cmd/localstack/awsutil.go | 122 +++++++++++++++++++++++++++++++++++++- cmd/localstack/main.go | 10 +++- go.mod | 1 + go.sum | 3 + 4 files changed, 132 insertions(+), 4 deletions(-) diff --git a/cmd/localstack/awsutil.go b/cmd/localstack/awsutil.go index 7a0c59d..43eeaf0 100644 --- a/cmd/localstack/awsutil.go +++ b/cmd/localstack/awsutil.go @@ -13,6 +13,7 @@ import ( "go.amzn.com/lambda/interop" "go.amzn.com/lambda/rapidcore" "io" + "io/fs" "math" "net/http" "os" @@ -20,6 +21,8 @@ import ( "path/filepath" "strings" "time" + + "github.com/fsnotify/fsnotify" ) const ( @@ -196,11 +199,128 @@ func DownloadCodeArchive(url string) { } -func RunFileWatcher(server *CustomInteropServer, targetPaths []string, opts *LsOpts) { +func RunFileWatcher(server *CustomInteropServer, targetPaths []string, opts *LsOpts, done <-chan bool) { if !opts.HotReloading { return } + defaultDuration := 500 * time.Millisecond + log.Infoln("Hot reloading enabled, starting filewatcher.", targetPaths) + watcher, err := fsnotify.NewWatcher() + if err != nil { + log.Errorln("Hot reloading disabled due to filewatcher error.") + log.Errorln(err) + return + } + defer watcher.Close() + + changeChannel := make(chan string, 10) + defer close(changeChannel) + // Start listening for events. + go func(channel chan<- string) { + var watchedFolders []string + for { + select { + case event, ok := <-watcher.Events: + if !ok { + return + } + log.Debugln("Filewatcher event: ", event) + if event.Has(fsnotify.Create) { + stat, err := os.Stat(event.Name) + if err != nil { + log.Errorln("Error stating event file: ", event.Name, err) + } else if stat.IsDir() { + subfolders := getSubFolders(event.Name) + for _, folder := range subfolders { + err = watcher.Add(folder) + watchedFolders = append(watchedFolders, folder) + if err != nil { + log.Errorln("Error watching folder: ", folder, err) + } + } + } + // remove in case of remove / rename (rename within the folder will trigger a separate create event) + } else if event.Has(fsnotify.Remove) || event.Has(fsnotify.Rename) { + // remove all file watchers if it is in our folders list + toBeRemovedDirs, newWatchedFolders := getSubFoldersInList(event.Name, watchedFolders) + watchedFolders = newWatchedFolders + for _, dir := range toBeRemovedDirs { + err = watcher.Remove(dir) + if err != nil { + log.Warnln("Error removing path: ", event.Name, err) + } + } + } + channel <- event.Name + case err, ok := <-watcher.Errors: + if !ok { + log.Println("error:", err) + return + } + log.Println("error:", err) + } + } + }(changeChannel) + + // debouncer to limit restarts + go func(channel <-chan string, duration time.Duration) { + timer := time.NewTimer(duration) + for { + select { + case _, more := <-channel: + if !more { + timer.Stop() + return + } + timer.Reset(duration) + case <-timer.C: + log.Println("Resetting environment...") + server.Reset("HotReload", 2000) + } + } + + }(changeChannel, defaultDuration) + + // Add all target paths and subfolders + for _, targetPath := range targetPaths { + subfolders := getSubFolders(targetPath) + log.Infoln("Subfolders: ", subfolders) + for _, target := range subfolders { + err = watcher.Add(target) + if err != nil { + log.Fatal(err) + } + } + } + <-done + log.Infoln("Closing down filewatcher.") + +} + +func getSubFolders(dirPath string) []string { + var subfolders []string + err := filepath.WalkDir(dirPath, func(path string, d fs.DirEntry, err error) error { + if err == nil && d.IsDir() { + subfolders = append(subfolders, path) + } + return err + }) + if err != nil { + log.Errorln("Error listing directory contents: ", err) + return subfolders + } + return subfolders +} +func getSubFoldersInList(prefix string, pathList []string) (old_folders []string, new_folders []string) { + for _, item := range pathList { + if strings.HasPrefix(item, prefix) { + old_folders = append(old_folders, item) + } else { + new_folders = append(new_folders, item) + } + } + return } func InitHandler(sandbox Sandbox, functionVersion string, timeout int64) (time.Time, time.Time) { diff --git a/cmd/localstack/main.go b/cmd/localstack/main.go index 28fb05b..11ee4fe 100644 --- a/cmd/localstack/main.go +++ b/cmd/localstack/main.go @@ -59,8 +59,13 @@ func main() { opts, args := getCLIArgs() bootstrap, handler := getBootstrap(args, opts) logCollector := NewLogCollector() + closeFileWatcher := make(chan bool) sandbox := rapidcore. NewSandboxBuilder(bootstrap). + AddShutdownFunc(func() { + log.Debugln("Closing file watcher") + closeFileWatcher <- true + }). AddShutdownFunc(func() { os.Exit(0) }). SetExtensionsFlag(true). SetInitCachingFlag(true). @@ -82,16 +87,15 @@ func main() { if err != nil { log.Fatalln(err) } + go RunFileWatcher(interopServer, []string{"/var/task"}, lsOpts, closeFileWatcher) + // start runtime init go InitHandler(sandbox, GetEnvOrDie("AWS_LAMBDA_FUNCTION_VERSION"), int64(invokeTimeoutSeconds)) // TODO: replace this with a custom init - RunFileWatcher(interopServer, []string{"/var/task"}, lsOpts) - // TODO: make the tracing server optional // start blocking with the tracing server err = http.ListenAndServe("0.0.0.0:"+lsOpts.InitTracingPort, http.DefaultServeMux) if err != nil { log.Fatal("Failed to start debug server") } - } diff --git a/go.mod b/go.mod index 871b812..ed53279 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.18 require ( github.com/aws/aws-lambda-go v1.20.0 + github.com/fsnotify/fsnotify v1.6.0 github.com/go-chi/chi v4.1.2+incompatible github.com/go-chi/render v1.0.1 github.com/google/uuid v1.1.2 diff --git a/go.sum b/go.sum index daa8fe3..576d027 100644 --- a/go.sum +++ b/go.sum @@ -6,6 +6,8 @@ github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsr github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= +github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= github.com/go-chi/chi v4.1.2+incompatible h1:fGFk2Gmi/YKXk0OmGfBh0WgmN3XB8lVnEyNz34tQRec= github.com/go-chi/chi v4.1.2+incompatible/go.mod h1:eB3wogJHnLi3x/kFX2A+IbTBlXxmMeXJVKy9tTv1XzQ= github.com/go-chi/render v1.0.1 h1:4/5tis2cKaNdnv9zFLfXzcquC9HbeZgCnxGnKrltBS8= @@ -33,6 +35,7 @@ golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 h1:SQFwaSi55rU7vdNs9Yr0Z324VNlrF+0wMqRXT4St8ck= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.1.0 h1:kunALQeHf1/185U1i0GOB/fy1IPRDDpuoOOqRReG57U= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/text v0.4.0 h1:BrVqGRd7+k1DiOgtnFvAkoQEWQvBc25ouMJM6429SFg= From 82b1b620807f852b9a58f2cabab153988f4a7c39 Mon Sep 17 00:00:00 2001 From: Daniel Fangl Date: Wed, 7 Dec 2022 19:43:35 +0100 Subject: [PATCH 3/6] add poller based approach from hugo, to be tested --- cmd/localstack/awsutil.go | 14 +- cmd/localstack/filenotify/filenotify.go | 64 +++++ cmd/localstack/filenotify/fsnotify.go | 22 ++ cmd/localstack/filenotify/poller.go | 328 ++++++++++++++++++++++++ 4 files changed, 424 insertions(+), 4 deletions(-) create mode 100644 cmd/localstack/filenotify/filenotify.go create mode 100644 cmd/localstack/filenotify/fsnotify.go create mode 100644 cmd/localstack/filenotify/poller.go diff --git a/cmd/localstack/awsutil.go b/cmd/localstack/awsutil.go index 43eeaf0..a214066 100644 --- a/cmd/localstack/awsutil.go +++ b/cmd/localstack/awsutil.go @@ -10,6 +10,7 @@ import ( "fmt" "github.com/jessevdk/go-flags" log "github.com/sirupsen/logrus" + "go.amzn.com/cmd/localstack/filenotify" "go.amzn.com/lambda/interop" "go.amzn.com/lambda/rapidcore" "io" @@ -205,7 +206,7 @@ func RunFileWatcher(server *CustomInteropServer, targetPaths []string, opts *LsO } defaultDuration := 500 * time.Millisecond log.Infoln("Hot reloading enabled, starting filewatcher.", targetPaths) - watcher, err := fsnotify.NewWatcher() + watcher, err := filenotify.New(200 * time.Millisecond) if err != nil { log.Errorln("Hot reloading disabled due to filewatcher error.") log.Errorln(err) @@ -220,11 +221,11 @@ func RunFileWatcher(server *CustomInteropServer, targetPaths []string, opts *LsO var watchedFolders []string for { select { - case event, ok := <-watcher.Events: + case event, ok := <-watcher.Events(): if !ok { return } - log.Debugln("Filewatcher event: ", event) + log.Debugln("FileWatcher got event: ", event) if event.Has(fsnotify.Create) { stat, err := os.Stat(event.Name) if err != nil { @@ -252,7 +253,7 @@ func RunFileWatcher(server *CustomInteropServer, targetPaths []string, opts *LsO } } channel <- event.Name - case err, ok := <-watcher.Errors: + case err, ok := <-watcher.Errors(): if !ok { log.Println("error:", err) return @@ -265,6 +266,11 @@ func RunFileWatcher(server *CustomInteropServer, targetPaths []string, opts *LsO // debouncer to limit restarts go func(channel <-chan string, duration time.Duration) { timer := time.NewTimer(duration) + // immediately stop the timer, since we do not want to reload right at the startup + if !timer.Stop() { + // we have to drain the channel in case the timer already fired + <-timer.C + } for { select { case _, more := <-channel: diff --git a/cmd/localstack/filenotify/filenotify.go b/cmd/localstack/filenotify/filenotify.go new file mode 100644 index 0000000..0628fce --- /dev/null +++ b/cmd/localstack/filenotify/filenotify.go @@ -0,0 +1,64 @@ +// This package is adapted from https://github.com/gohugoio/hugo/tree/master/watcher/filenotify, Apache-2.0 License. + +// Package filenotify provides a mechanism for watching file(s) for changes. +// Generally leans on fsnotify, but provides a poll-based notifier which fsnotify does not support. +// These are wrapped up in a common interface so that either can be used interchangeably in your code. +// +// This package is adapted from https://github.com/moby/moby/tree/master/pkg/filenotify, Apache-2.0 License. +// Hopefully this can be replaced with an external package sometime in the future, see https://github.com/fsnotify/fsnotify/issues/9 +package filenotify + +import ( + log "github.com/sirupsen/logrus" + "golang.org/x/sys/unix" + "strings" + "time" + + "github.com/fsnotify/fsnotify" +) + +// FileWatcher is an interface for implementing file notification watchers +type FileWatcher interface { + Events() <-chan fsnotify.Event + Errors() <-chan error + Add(name string) error + Remove(name string) error + Close() error +} + +// New tries to use a fs-event watcher, and falls back to the poller if there is an error +func New(interval time.Duration) (FileWatcher, error) { + // cheap check if we are in Docker desktop or not. + // We could also inspect the mounts, but that would be more complicated and needs more parsing + var utsname unix.Utsname + err := unix.Uname(&utsname) + release := strings.TrimRight(string(utsname.Release[:]), "\x00") + log.Println("Release detected: ", release) + if err == nil && !(strings.Contains(release, "linuxkit") || strings.Contains(release, "WSL2")) { + if watcher, err := NewEventWatcher(); err == nil { + log.Debugln("Using event based filewatcher") + return watcher, nil + } + } + log.Debugln("Using polling based filewatcher") + return NewPollingWatcher(interval), nil +} + +// NewPollingWatcher returns a poll-based file watcher +func NewPollingWatcher(interval time.Duration) FileWatcher { + return &filePoller{ + interval: interval, + done: make(chan struct{}), + events: make(chan fsnotify.Event), + errors: make(chan error), + } +} + +// NewEventWatcher returns a fs-event based file watcher +func NewEventWatcher() (FileWatcher, error) { + watcher, err := fsnotify.NewWatcher() + if err != nil { + return nil, err + } + return &fsNotifyWatcher{watcher}, nil +} diff --git a/cmd/localstack/filenotify/fsnotify.go b/cmd/localstack/filenotify/fsnotify.go new file mode 100644 index 0000000..4b0502e --- /dev/null +++ b/cmd/localstack/filenotify/fsnotify.go @@ -0,0 +1,22 @@ +// This package is adapted from https://github.com/gohugoio/hugo/tree/master/watcher/filenotify, Apache-2.0 License. + +// Package filenotify is adapted from https://github.com/moby/moby/tree/master/pkg/filenotify, Apache-2.0 License. +// Hopefully this can be replaced with an external package sometime in the future, see https://github.com/fsnotify/fsnotify/issues/9 +package filenotify + +import "github.com/fsnotify/fsnotify" + +// fsNotifyWatcher wraps the fsnotify package to satisfy the FileNotifier interface +type fsNotifyWatcher struct { + *fsnotify.Watcher +} + +// Events returns the fsnotify event channel receiver +func (w *fsNotifyWatcher) Events() <-chan fsnotify.Event { + return w.Watcher.Events +} + +// Errors returns the fsnotify error channel receiver +func (w *fsNotifyWatcher) Errors() <-chan error { + return w.Watcher.Errors +} diff --git a/cmd/localstack/filenotify/poller.go b/cmd/localstack/filenotify/poller.go new file mode 100644 index 0000000..c1a0f9c --- /dev/null +++ b/cmd/localstack/filenotify/poller.go @@ -0,0 +1,328 @@ +// This package is adapted from https://github.com/gohugoio/hugo/tree/master/watcher/filenotify, Apache-2.0 License. + +// Package filenotify is adapted from https://github.com/moby/moby/tree/master/pkg/filenotify, Apache-2.0 License. +// Hopefully this can be replaced with an external package sometime in the future, see https://github.com/fsnotify/fsnotify/issues/9 +package filenotify + +import ( + "errors" + "fmt" + "os" + "path/filepath" + "sync" + "time" + + "github.com/fsnotify/fsnotify" +) + +var ( + // errPollerClosed is returned when the poller is closed + errPollerClosed = errors.New("poller is closed") + // errNoSuchWatch is returned when trying to remove a watch that doesn't exist + errNoSuchWatch = errors.New("watch does not exist") +) + +// filePoller is used to poll files for changes, especially in cases where fsnotify +// can't be run (e.g. when inotify handles are exhausted) +// filePoller satisfies the FileWatcher interface +type filePoller struct { + // the duration between polls. + interval time.Duration + // watches is the list of files currently being polled, close the associated channel to stop the watch + watches map[string]struct{} + // Will be closed when done. + done chan struct{} + // events is the channel to listen to for watch events + events chan fsnotify.Event + // errors is the channel to listen to for watch errors + errors chan error + // mu locks the poller for modification + mu sync.Mutex + // closed is used to specify when the poller has already closed + closed bool +} + +// Add adds a filename to the list of watches +// once added the file is polled for changes in a separate goroutine +func (w *filePoller) Add(name string) error { + w.mu.Lock() + defer w.mu.Unlock() + + if w.closed { + return errPollerClosed + } + + item, err := newItemToWatch(name) + if err != nil { + return err + } + if item.left.FileInfo == nil { + return os.ErrNotExist + } + + if w.watches == nil { + w.watches = make(map[string]struct{}) + } + if _, exists := w.watches[name]; exists { + return fmt.Errorf("watch exists") + } + w.watches[name] = struct{}{} + + go w.watch(item) + return nil +} + +// Remove stops and removes watch with the specified name +func (w *filePoller) Remove(name string) error { + w.mu.Lock() + defer w.mu.Unlock() + return w.remove(name) +} + +func (w *filePoller) remove(name string) error { + if w.closed { + return errPollerClosed + } + + _, exists := w.watches[name] + if !exists { + return errNoSuchWatch + } + delete(w.watches, name) + return nil +} + +// Events returns the event channel +// This is used for notifications on events about watched files +func (w *filePoller) Events() <-chan fsnotify.Event { + return w.events +} + +// Errors returns the errors channel +// This is used for notifications about errors on watched files +func (w *filePoller) Errors() <-chan error { + return w.errors +} + +// Close closes the poller +// All watches are stopped, removed, and the poller cannot be added to +func (w *filePoller) Close() error { + w.mu.Lock() + defer w.mu.Unlock() + + if w.closed { + return nil + } + w.closed = true + close(w.done) + for name := range w.watches { + w.remove(name) + } + + return nil +} + +// sendEvent publishes the specified event to the events channel +func (w *filePoller) sendEvent(e fsnotify.Event) error { + select { + case w.events <- e: + case <-w.done: + return fmt.Errorf("closed") + } + return nil +} + +// sendErr publishes the specified error to the errors channel +func (w *filePoller) sendErr(e error) error { + select { + case w.errors <- e: + case <-w.done: + return fmt.Errorf("closed") + } + return nil +} + +// watch watches item for changes until done is closed. +func (w *filePoller) watch(item *itemToWatch) { + ticker := time.NewTicker(w.interval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + case <-w.done: + return + } + + evs, err := item.checkForChanges() + if err != nil { + if err := w.sendErr(err); err != nil { + return + } + } + + item.left, item.right = item.right, item.left + + for _, ev := range evs { + if err := w.sendEvent(ev); err != nil { + return + } + } + + } +} + +// recording records the state of a file or a dir. +type recording struct { + os.FileInfo + + // Set if FileInfo is a dir. + entries map[string]os.FileInfo +} + +func (r *recording) clear() { + r.FileInfo = nil + if r.entries != nil { + for k := range r.entries { + delete(r.entries, k) + } + } +} + +func (r *recording) record(filename string) error { + r.clear() + + fi, err := os.Stat(filename) + if err != nil && !os.IsNotExist(err) { + return err + } + + if fi == nil { + return nil + } + + r.FileInfo = fi + + // If fi is a dir, we watch the files inside that directory (not recursively). + // This matches the behaviour of fsnotity. + if fi.IsDir() { + f, err := os.Open(filename) + if err != nil { + if os.IsNotExist(err) { + return nil + } + return err + } + defer f.Close() + + fis, err := f.Readdir(-1) + if err != nil { + if os.IsNotExist(err) { + return nil + } + return err + } + + for _, fi := range fis { + r.entries[fi.Name()] = fi + } + } + + return nil +} + +// itemToWatch may be a file or a dir. +type itemToWatch struct { + // Full path to the filename. + filename string + + // Snapshots of the stat state of this file or dir. + left *recording + right *recording +} + +func newItemToWatch(filename string) (*itemToWatch, error) { + r := &recording{ + entries: make(map[string]os.FileInfo), + } + err := r.record(filename) + if err != nil { + return nil, err + } + + return &itemToWatch{filename: filename, left: r}, nil + +} + +func (item *itemToWatch) checkForChanges() ([]fsnotify.Event, error) { + if item.right == nil { + item.right = &recording{ + entries: make(map[string]os.FileInfo), + } + } + + err := item.right.record(item.filename) + if err != nil && !os.IsNotExist(err) { + return nil, err + } + + dirOp := checkChange(item.left.FileInfo, item.right.FileInfo) + + if dirOp != 0 { + evs := []fsnotify.Event{fsnotify.Event{Op: dirOp, Name: item.filename}} + return evs, nil + } + + if item.left.FileInfo == nil || !item.left.IsDir() { + // Done. + return nil, nil + } + + leftIsIn := false + left, right := item.left.entries, item.right.entries + if len(right) > len(left) { + left, right = right, left + leftIsIn = true + } + + var evs []fsnotify.Event + + for name, fi1 := range left { + fi2 := right[name] + fil, fir := fi1, fi2 + if leftIsIn { + fil, fir = fir, fil + } + op := checkChange(fil, fir) + if op != 0 { + evs = append(evs, fsnotify.Event{Op: op, Name: filepath.Join(item.filename, name)}) + } + + } + + return evs, nil + +} + +func checkChange(fi1, fi2 os.FileInfo) fsnotify.Op { + if fi1 == nil && fi2 != nil { + return fsnotify.Create + } + if fi1 != nil && fi2 == nil { + return fsnotify.Remove + } + if fi1 == nil && fi2 == nil { + return 0 + } + if fi1.IsDir() || fi2.IsDir() { + return 0 + } + if fi1.Mode() != fi2.Mode() { + return fsnotify.Chmod + } + if fi1.ModTime() != fi2.ModTime() || fi1.Size() != fi2.Size() { + return fsnotify.Write + } + + return 0 +} From b65a6da5a7d9d485c91adfa71af465e48cfa0e71 Mon Sep 17 00:00:00 2001 From: Daniel Fangl Date: Mon, 12 Dec 2022 20:17:52 +0100 Subject: [PATCH 4/6] changes after PR comments --- cmd/localstack/awsutil.go | 200 +++++++++++++++--------- cmd/localstack/filenotify/filenotify.go | 15 +- cmd/localstack/main.go | 7 +- go.mod | 2 +- 4 files changed, 137 insertions(+), 87 deletions(-) diff --git a/cmd/localstack/awsutil.go b/cmd/localstack/awsutil.go index a214066..51fe61d 100644 --- a/cmd/localstack/awsutil.go +++ b/cmd/localstack/awsutil.go @@ -7,6 +7,7 @@ package main import ( "archive/zip" + "context" "fmt" "github.com/jessevdk/go-flags" log "github.com/sirupsen/logrus" @@ -200,105 +201,148 @@ func DownloadCodeArchive(url string) { } -func RunFileWatcher(server *CustomInteropServer, targetPaths []string, opts *LsOpts, done <-chan bool) { - if !opts.HotReloading { - return - } - defaultDuration := 500 * time.Millisecond - log.Infoln("Hot reloading enabled, starting filewatcher.", targetPaths) +type ChangeListener struct { + watcher filenotify.FileWatcher + changeChannel chan string + watchedFolders []string +} + +func createChangeListener() (*ChangeListener, error) { watcher, err := filenotify.New(200 * time.Millisecond) if err != nil { - log.Errorln("Hot reloading disabled due to filewatcher error.") - log.Errorln(err) - return + log.Errorln("Cannot create change listener due to filewatcher error.", err) + return nil, err } - defer watcher.Close() + return &ChangeListener{ + changeChannel: make(chan string, 10), + watcher: watcher, + }, nil +} - changeChannel := make(chan string, 10) - defer close(changeChannel) - // Start listening for events. - go func(channel chan<- string) { - var watchedFolders []string - for { - select { - case event, ok := <-watcher.Events(): - if !ok { - return - } - log.Debugln("FileWatcher got event: ", event) - if event.Has(fsnotify.Create) { - stat, err := os.Stat(event.Name) - if err != nil { - log.Errorln("Error stating event file: ", event.Name, err) - } else if stat.IsDir() { - subfolders := getSubFolders(event.Name) - for _, folder := range subfolders { - err = watcher.Add(folder) - watchedFolders = append(watchedFolders, folder) - if err != nil { - log.Errorln("Error watching folder: ", folder, err) - } - } - } - // remove in case of remove / rename (rename within the folder will trigger a separate create event) - } else if event.Has(fsnotify.Remove) || event.Has(fsnotify.Rename) { - // remove all file watchers if it is in our folders list - toBeRemovedDirs, newWatchedFolders := getSubFoldersInList(event.Name, watchedFolders) - watchedFolders = newWatchedFolders - for _, dir := range toBeRemovedDirs { - err = watcher.Remove(dir) +func (c *ChangeListener) watch() { + for { + select { + case event, ok := <-c.watcher.Events(): + if !ok { + return + } + log.Debugln("FileWatcher got event: ", event) + if event.Has(fsnotify.Create) { + stat, err := os.Stat(event.Name) + if err != nil { + log.Errorln("Error stating event file: ", event.Name, err) + } else if stat.IsDir() { + subfolders := getSubFolders(event.Name) + for _, folder := range subfolders { + err = c.watcher.Add(folder) + c.watchedFolders = append(c.watchedFolders, folder) if err != nil { - log.Warnln("Error removing path: ", event.Name, err) + log.Errorln("Error watching folder: ", folder, err) } } } - channel <- event.Name - case err, ok := <-watcher.Errors(): - if !ok { - log.Println("error:", err) - return + // remove in case of remove / rename (rename within the folder will trigger a separate create event) + } else if event.Has(fsnotify.Remove) || event.Has(fsnotify.Rename) { + // remove all file watchers if it is in our folders list + toBeRemovedDirs, newWatchedFolders := getSubFoldersInList(event.Name, c.watchedFolders) + c.watchedFolders = newWatchedFolders + for _, dir := range toBeRemovedDirs { + err := c.watcher.Remove(dir) + if err != nil { + log.Warnln("Error removing path: ", event.Name, err) + } } + } + c.changeChannel <- event.Name + case err, ok := <-c.watcher.Errors(): + if !ok { log.Println("error:", err) + return } + log.Println("error:", err) } - }(changeChannel) + } +} - // debouncer to limit restarts - go func(channel <-chan string, duration time.Duration) { - timer := time.NewTimer(duration) - // immediately stop the timer, since we do not want to reload right at the startup - if !timer.Stop() { - // we have to drain the channel in case the timer already fired - <-timer.C +func (c *ChangeListener) addTargetPaths(targetPaths []string) { + // Add all target paths and subfolders + for _, targetPath := range targetPaths { + subfolders := getSubFolders(targetPath) + log.Infoln("Subfolders: ", subfolders) + for _, target := range subfolders { + err := c.watcher.Add(target) + if err != nil { + log.Fatal(err) + } } + } +} + +func (c *ChangeListener) close() error { + return c.watcher.Close() +} + +func resetListener(changeChannel <-chan bool, server *CustomInteropServer) { + for { + _, more := <-changeChannel + if !more { + return + } + log.Println("Resetting environment...") + _, err := server.Reset("HotReload", 2000) + if err != nil { + log.Warnln("Error resetting server: ", err) + } + } + +} + +func debounceChannel(changeChannel <-chan string, duration time.Duration) <-chan bool { + resultChannel := make(chan bool, 10) + // debouncer to limit restarts + timer := time.NewTimer(duration) + // immediately stop the timer, since we do not want to reload right at the startup + if !timer.Stop() { + // we have to drain the channel in case the timer already fired + <-timer.C + } + go func() { for { select { - case _, more := <-channel: + case _, more := <-changeChannel: if !more { timer.Stop() + close(resultChannel) return } timer.Reset(duration) case <-timer.C: - log.Println("Resetting environment...") - server.Reset("HotReload", 2000) + resultChannel <- true } } + }() + return resultChannel +} - }(changeChannel, defaultDuration) - - // Add all target paths and subfolders - for _, targetPath := range targetPaths { - subfolders := getSubFolders(targetPath) - log.Infoln("Subfolders: ", subfolders) - for _, target := range subfolders { - err = watcher.Add(target) - if err != nil { - log.Fatal(err) - } - } +func RunHotReloadingListener(server *CustomInteropServer, targetPaths []string, opts *LsOpts, ctx context.Context) { + if !opts.HotReloading { + log.Debugln("Hot reloading disabled.") + return + } + defaultDuration := 500 * time.Millisecond + log.Infoln("Hot reloading enabled, starting filewatcher.", targetPaths) + changeListener, err := createChangeListener() + if err != nil { + log.Errorln("Hot reloading disabled due to change listener error.", err) + return } - <-done + defer changeListener.close() + go changeListener.watch() + changeListener.addTargetPaths(targetPaths) + debouncedChannel := debounceChannel(changeListener.changeChannel, defaultDuration) + go resetListener(debouncedChannel, server) + + <-ctx.Done() log.Infoln("Closing down filewatcher.") } @@ -318,12 +362,12 @@ func getSubFolders(dirPath string) []string { return subfolders } -func getSubFoldersInList(prefix string, pathList []string) (old_folders []string, new_folders []string) { - for _, item := range pathList { - if strings.HasPrefix(item, prefix) { - old_folders = append(old_folders, item) +func getSubFoldersInList(prefix string, pathList []string) (oldFolders []string, newFolders []string) { + for _, pathItem := range pathList { + if strings.HasPrefix(pathItem, prefix) { + oldFolders = append(oldFolders, pathItem) } else { - new_folders = append(new_folders, item) + newFolders = append(newFolders, pathItem) } } return diff --git a/cmd/localstack/filenotify/filenotify.go b/cmd/localstack/filenotify/filenotify.go index 0628fce..8bd16cd 100644 --- a/cmd/localstack/filenotify/filenotify.go +++ b/cmd/localstack/filenotify/filenotify.go @@ -26,15 +26,20 @@ type FileWatcher interface { Close() error } -// New tries to use a fs-event watcher, and falls back to the poller if there is an error -func New(interval time.Duration) (FileWatcher, error) { - // cheap check if we are in Docker desktop or not. - // We could also inspect the mounts, but that would be more complicated and needs more parsing +func useEventWatcher() bool { + // Whether to use an event watcher or polling mechanism var utsname unix.Utsname err := unix.Uname(&utsname) release := strings.TrimRight(string(utsname.Release[:]), "\x00") log.Println("Release detected: ", release) - if err == nil && !(strings.Contains(release, "linuxkit") || strings.Contains(release, "WSL2")) { + return err == nil && !(strings.Contains(release, "linuxkit") || strings.Contains(release, "WSL2")) +} + +// New tries to use a fs-event watcher, and falls back to the poller if there is an error +func New(interval time.Duration) (FileWatcher, error) { + // cheap check if we are in Docker desktop or not. + // We could also inspect the mounts, but that would be more complicated and needs more parsing + if useEventWatcher() { if watcher, err := NewEventWatcher(); err == nil { log.Debugln("Using event based filewatcher") return watcher, nil diff --git a/cmd/localstack/main.go b/cmd/localstack/main.go index 11ee4fe..afed0d0 100644 --- a/cmd/localstack/main.go +++ b/cmd/localstack/main.go @@ -3,6 +3,7 @@ package main import ( + "context" log "github.com/sirupsen/logrus" "go.amzn.com/lambda/rapidcore" "net/http" @@ -59,12 +60,12 @@ func main() { opts, args := getCLIArgs() bootstrap, handler := getBootstrap(args, opts) logCollector := NewLogCollector() - closeFileWatcher := make(chan bool) + fileWatcherContext, cancelFileWatcher := context.WithCancel(context.Background()) sandbox := rapidcore. NewSandboxBuilder(bootstrap). AddShutdownFunc(func() { log.Debugln("Closing file watcher") - closeFileWatcher <- true + cancelFileWatcher() }). AddShutdownFunc(func() { os.Exit(0) }). SetExtensionsFlag(true). @@ -87,7 +88,7 @@ func main() { if err != nil { log.Fatalln(err) } - go RunFileWatcher(interopServer, []string{"/var/task"}, lsOpts, closeFileWatcher) + go RunHotReloadingListener(interopServer, []string{"/var/task"}, lsOpts, fileWatcherContext) // start runtime init go InitHandler(sandbox, GetEnvOrDie("AWS_LAMBDA_FUNCTION_VERSION"), int64(invokeTimeoutSeconds)) // TODO: replace this with a custom init diff --git a/go.mod b/go.mod index ed53279..7e17c86 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/sirupsen/logrus v1.6.0 github.com/stretchr/testify v1.6.1 golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 + golang.org/x/sys v0.1.0 ) require ( @@ -20,6 +21,5 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/stretchr/objx v0.1.0 // indirect golang.org/x/net v0.1.0 // indirect - golang.org/x/sys v0.1.0 // indirect gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 // indirect ) From e766f4201d9e45dca7ad1aa08557f39eae8227a1 Mon Sep 17 00:00:00 2001 From: Daniel Fangl Date: Tue, 13 Dec 2022 15:28:10 +0100 Subject: [PATCH 5/6] some more refactoring --- cmd/localstack/awsutil.go | 124 ++------------------------------ cmd/localstack/hotreloading.go | 125 +++++++++++++++++++++++++++++++++ 2 files changed, 131 insertions(+), 118 deletions(-) create mode 100644 cmd/localstack/hotreloading.go diff --git a/cmd/localstack/awsutil.go b/cmd/localstack/awsutil.go index 51fe61d..f9b58b1 100644 --- a/cmd/localstack/awsutil.go +++ b/cmd/localstack/awsutil.go @@ -11,7 +11,6 @@ import ( "fmt" "github.com/jessevdk/go-flags" log "github.com/sirupsen/logrus" - "go.amzn.com/cmd/localstack/filenotify" "go.amzn.com/lambda/interop" "go.amzn.com/lambda/rapidcore" "io" @@ -23,8 +22,6 @@ import ( "path/filepath" "strings" "time" - - "github.com/fsnotify/fsnotify" ) const ( @@ -201,87 +198,6 @@ func DownloadCodeArchive(url string) { } -type ChangeListener struct { - watcher filenotify.FileWatcher - changeChannel chan string - watchedFolders []string -} - -func createChangeListener() (*ChangeListener, error) { - watcher, err := filenotify.New(200 * time.Millisecond) - if err != nil { - log.Errorln("Cannot create change listener due to filewatcher error.", err) - return nil, err - } - return &ChangeListener{ - changeChannel: make(chan string, 10), - watcher: watcher, - }, nil -} - -func (c *ChangeListener) watch() { - for { - select { - case event, ok := <-c.watcher.Events(): - if !ok { - return - } - log.Debugln("FileWatcher got event: ", event) - if event.Has(fsnotify.Create) { - stat, err := os.Stat(event.Name) - if err != nil { - log.Errorln("Error stating event file: ", event.Name, err) - } else if stat.IsDir() { - subfolders := getSubFolders(event.Name) - for _, folder := range subfolders { - err = c.watcher.Add(folder) - c.watchedFolders = append(c.watchedFolders, folder) - if err != nil { - log.Errorln("Error watching folder: ", folder, err) - } - } - } - // remove in case of remove / rename (rename within the folder will trigger a separate create event) - } else if event.Has(fsnotify.Remove) || event.Has(fsnotify.Rename) { - // remove all file watchers if it is in our folders list - toBeRemovedDirs, newWatchedFolders := getSubFoldersInList(event.Name, c.watchedFolders) - c.watchedFolders = newWatchedFolders - for _, dir := range toBeRemovedDirs { - err := c.watcher.Remove(dir) - if err != nil { - log.Warnln("Error removing path: ", event.Name, err) - } - } - } - c.changeChannel <- event.Name - case err, ok := <-c.watcher.Errors(): - if !ok { - log.Println("error:", err) - return - } - log.Println("error:", err) - } - } -} - -func (c *ChangeListener) addTargetPaths(targetPaths []string) { - // Add all target paths and subfolders - for _, targetPath := range targetPaths { - subfolders := getSubFolders(targetPath) - log.Infoln("Subfolders: ", subfolders) - for _, target := range subfolders { - err := c.watcher.Add(target) - if err != nil { - log.Fatal(err) - } - } - } -} - -func (c *ChangeListener) close() error { - return c.watcher.Close() -} - func resetListener(changeChannel <-chan bool, server *CustomInteropServer) { for { _, more := <-changeChannel @@ -297,50 +213,22 @@ func resetListener(changeChannel <-chan bool, server *CustomInteropServer) { } -func debounceChannel(changeChannel <-chan string, duration time.Duration) <-chan bool { - resultChannel := make(chan bool, 10) - // debouncer to limit restarts - timer := time.NewTimer(duration) - // immediately stop the timer, since we do not want to reload right at the startup - if !timer.Stop() { - // we have to drain the channel in case the timer already fired - <-timer.C - } - go func() { - for { - select { - case _, more := <-changeChannel: - if !more { - timer.Stop() - close(resultChannel) - return - } - timer.Reset(duration) - case <-timer.C: - resultChannel <- true - } - } - }() - return resultChannel -} - func RunHotReloadingListener(server *CustomInteropServer, targetPaths []string, opts *LsOpts, ctx context.Context) { if !opts.HotReloading { log.Debugln("Hot reloading disabled.") return } - defaultDuration := 500 * time.Millisecond + defaultDebouncingDuration := 500 * time.Millisecond log.Infoln("Hot reloading enabled, starting filewatcher.", targetPaths) - changeListener, err := createChangeListener() + changeListener, err := NewChangeListener(defaultDebouncingDuration) if err != nil { log.Errorln("Hot reloading disabled due to change listener error.", err) return } - defer changeListener.close() - go changeListener.watch() - changeListener.addTargetPaths(targetPaths) - debouncedChannel := debounceChannel(changeListener.changeChannel, defaultDuration) - go resetListener(debouncedChannel, server) + defer changeListener.Close() + go changeListener.Start() + changeListener.AddTargetPaths(targetPaths) + go resetListener(changeListener.debouncedChannel, server) <-ctx.Done() log.Infoln("Closing down filewatcher.") diff --git a/cmd/localstack/hotreloading.go b/cmd/localstack/hotreloading.go new file mode 100644 index 0000000..26e979a --- /dev/null +++ b/cmd/localstack/hotreloading.go @@ -0,0 +1,125 @@ +package main + +import ( + "github.com/fsnotify/fsnotify" + log "github.com/sirupsen/logrus" + "go.amzn.com/cmd/localstack/filenotify" + "os" + "time" +) + +type ChangeListener struct { + watcher filenotify.FileWatcher + changeChannel chan string + debouncedChannel chan bool + debouncingInterval time.Duration + watchedFolders []string +} + +func NewChangeListener(debouncingInterval time.Duration) (*ChangeListener, error) { + watcher, err := filenotify.New(200 * time.Millisecond) + if err != nil { + log.Errorln("Cannot create change listener due to filewatcher error.", err) + return nil, err + } + changeListener := &ChangeListener{ + changeChannel: make(chan string, 10), + debouncedChannel: make(chan bool, 10), + debouncingInterval: debouncingInterval, + watcher: watcher, + } + return changeListener, nil +} + +func (c *ChangeListener) Start() { + c.debounceChannel() + c.Watch() +} + +func (c *ChangeListener) Watch() { + for { + select { + case event, ok := <-c.watcher.Events(): + if !ok { + return + } + log.Debugln("FileWatcher got event: ", event) + if event.Has(fsnotify.Create) { + stat, err := os.Stat(event.Name) + if err != nil { + log.Errorln("Error stating event file: ", event.Name, err) + } else if stat.IsDir() { + subfolders := getSubFolders(event.Name) + for _, folder := range subfolders { + err = c.watcher.Add(folder) + c.watchedFolders = append(c.watchedFolders, folder) + if err != nil { + log.Errorln("Error watching folder: ", folder, err) + } + } + } + // remove in case of remove / rename (rename within the folder will trigger a separate create event) + } else if event.Has(fsnotify.Remove) || event.Has(fsnotify.Rename) { + // remove all file watchers if it is in our folders list + toBeRemovedDirs, newWatchedFolders := getSubFoldersInList(event.Name, c.watchedFolders) + c.watchedFolders = newWatchedFolders + for _, dir := range toBeRemovedDirs { + err := c.watcher.Remove(dir) + if err != nil { + log.Warnln("Error removing path: ", event.Name, err) + } + } + } + c.changeChannel <- event.Name + case err, ok := <-c.watcher.Errors(): + if !ok { + log.Println("error:", err) + return + } + log.Println("error:", err) + } + } +} + +func (c *ChangeListener) AddTargetPaths(targetPaths []string) { + // Add all target paths and subfolders + for _, targetPath := range targetPaths { + subfolders := getSubFolders(targetPath) + log.Infoln("Subfolders: ", subfolders) + for _, target := range subfolders { + err := c.watcher.Add(target) + if err != nil { + log.Fatal(err) + } + } + } +} + +func (c *ChangeListener) debounceChannel() { + // debouncer to limit restarts + timer := time.NewTimer(c.debouncingInterval) + // immediately stop the timer, since we do not want to reload right at the startup + if !timer.Stop() { + // we have to drain the channel in case the timer already fired + <-timer.C + } + go func() { + for { + select { + case _, more := <-c.changeChannel: + if !more { + timer.Stop() + close(c.debouncedChannel) + return + } + timer.Reset(c.debouncingInterval) + case <-timer.C: + c.debouncedChannel <- true + } + } + }() +} + +func (c *ChangeListener) Close() error { + return c.watcher.Close() +} From 20ac329415083ed81d2101717dbf90abc643ce24 Mon Sep 17 00:00:00 2001 From: Daniel Fangl Date: Tue, 13 Dec 2022 17:15:13 +0100 Subject: [PATCH 6/6] add proper close, move comments --- cmd/localstack/filenotify/filenotify.go | 8 ++++---- cmd/localstack/hotreloading.go | 1 + 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/cmd/localstack/filenotify/filenotify.go b/cmd/localstack/filenotify/filenotify.go index 8bd16cd..1c1d708 100644 --- a/cmd/localstack/filenotify/filenotify.go +++ b/cmd/localstack/filenotify/filenotify.go @@ -26,20 +26,20 @@ type FileWatcher interface { Close() error } -func useEventWatcher() bool { +func shouldUseEventWatcher() bool { // Whether to use an event watcher or polling mechanism var utsname unix.Utsname err := unix.Uname(&utsname) release := strings.TrimRight(string(utsname.Release[:]), "\x00") log.Println("Release detected: ", release) + // cheap check if we are in Docker desktop or not. + // We could also inspect the mounts, but that would be more complicated and needs more parsing return err == nil && !(strings.Contains(release, "linuxkit") || strings.Contains(release, "WSL2")) } // New tries to use a fs-event watcher, and falls back to the poller if there is an error func New(interval time.Duration) (FileWatcher, error) { - // cheap check if we are in Docker desktop or not. - // We could also inspect the mounts, but that would be more complicated and needs more parsing - if useEventWatcher() { + if shouldUseEventWatcher() { if watcher, err := NewEventWatcher(); err == nil { log.Debugln("Using event based filewatcher") return watcher, nil diff --git a/cmd/localstack/hotreloading.go b/cmd/localstack/hotreloading.go index 26e979a..9218465 100644 --- a/cmd/localstack/hotreloading.go +++ b/cmd/localstack/hotreloading.go @@ -41,6 +41,7 @@ func (c *ChangeListener) Watch() { select { case event, ok := <-c.watcher.Events(): if !ok { + close(c.changeChannel) return } log.Debugln("FileWatcher got event: ", event)