diff --git a/cmd/localstack/awsutil.go b/cmd/localstack/awsutil.go index 3fd0c54..f9b58b1 100644 --- a/cmd/localstack/awsutil.go +++ b/cmd/localstack/awsutil.go @@ -7,12 +7,14 @@ package main import ( "archive/zip" + "context" "fmt" "github.com/jessevdk/go-flags" log "github.com/sirupsen/logrus" "go.amzn.com/lambda/interop" "go.amzn.com/lambda/rapidcore" "io" + "io/fs" "math" "net/http" "os" @@ -196,6 +198,69 @@ func DownloadCodeArchive(url string) { } +func resetListener(changeChannel <-chan bool, server *CustomInteropServer) { + for { + _, more := <-changeChannel + if !more { + return + } + log.Println("Resetting environment...") + _, err := server.Reset("HotReload", 2000) + if err != nil { + log.Warnln("Error resetting server: ", err) + } + } + +} + +func RunHotReloadingListener(server *CustomInteropServer, targetPaths []string, opts *LsOpts, ctx context.Context) { + if !opts.HotReloading { + log.Debugln("Hot reloading disabled.") + return + } + defaultDebouncingDuration := 500 * time.Millisecond + log.Infoln("Hot reloading enabled, starting filewatcher.", targetPaths) + changeListener, err := NewChangeListener(defaultDebouncingDuration) + if err != nil { + log.Errorln("Hot reloading disabled due to change listener error.", err) + return + } + defer changeListener.Close() + go changeListener.Start() + changeListener.AddTargetPaths(targetPaths) + go resetListener(changeListener.debouncedChannel, server) + + <-ctx.Done() + log.Infoln("Closing down filewatcher.") + +} + +func getSubFolders(dirPath string) []string { + var subfolders []string + err := filepath.WalkDir(dirPath, func(path string, d fs.DirEntry, err error) error { + if err == nil && d.IsDir() { + subfolders = append(subfolders, path) + } + return err + }) + if err != nil { + log.Errorln("Error listing directory contents: ", err) + return subfolders + } + return subfolders +} + +func getSubFoldersInList(prefix string, pathList []string) (oldFolders []string, newFolders []string) { + for _, pathItem := range pathList { + if strings.HasPrefix(pathItem, prefix) { + oldFolders = append(oldFolders, pathItem) + } else { + newFolders = append(newFolders, pathItem) + } + } + return +} + func InitHandler(sandbox Sandbox, functionVersion string, timeout int64) (time.Time, time.Time) { additionalFunctionEnvironmentVariables := map[string]string{} diff --git a/cmd/localstack/filenotify/filenotify.go b/cmd/localstack/filenotify/filenotify.go new file mode 100644 index 0000000..1c1d708 --- /dev/null +++ b/cmd/localstack/filenotify/filenotify.go @@ -0,0 +1,69 @@ +// This package is adapted from https://github.com/gohugoio/hugo/tree/master/watcher/filenotify, Apache-2.0 License. + +// Package filenotify provides a mechanism for watching file(s) for changes. +// Generally leans on fsnotify, but provides a poll-based notifier which fsnotify does not support. +// These are wrapped up in a common interface so that either can be used interchangeably in your code. +// +// This package is adapted from https://github.com/moby/moby/tree/master/pkg/filenotify, Apache-2.0 License. +// Hopefully this can be replaced with an external package sometime in the future, see https://github.com/fsnotify/fsnotify/issues/9 +package filenotify + +import ( + log "github.com/sirupsen/logrus" + "golang.org/x/sys/unix" + "strings" + "time" + + "github.com/fsnotify/fsnotify" +) + +// FileWatcher is an interface for implementing file notification watchers +type FileWatcher interface { + Events() <-chan fsnotify.Event + Errors() <-chan error + Add(name string) error + Remove(name string) error + Close() error +} + +func shouldUseEventWatcher() bool { + // Whether to use an event watcher or polling mechanism + var utsname unix.Utsname + err := unix.Uname(&utsname) + release := strings.TrimRight(string(utsname.Release[:]), "\x00") + log.Println("Release detected: ", release) + // cheap check if we are in Docker desktop or not. + // We could also inspect the mounts, but that would be more complicated and needs more parsing + return err == nil && !(strings.Contains(release, "linuxkit") || strings.Contains(release, "WSL2")) +} + +// New tries to use a fs-event watcher, and falls back to the poller if there is an error +func New(interval time.Duration) (FileWatcher, error) { + if shouldUseEventWatcher() { + if watcher, err := NewEventWatcher(); err == nil { + log.Debugln("Using event based filewatcher") + return watcher, nil + } + } + log.Debugln("Using polling based filewatcher") + return NewPollingWatcher(interval), nil +} + +// NewPollingWatcher returns a poll-based file watcher +func NewPollingWatcher(interval time.Duration) FileWatcher { + return &filePoller{ + interval: interval, + done: make(chan struct{}), + events: make(chan fsnotify.Event), + errors: make(chan error), + } +} + +// NewEventWatcher returns a fs-event based file watcher +func NewEventWatcher() (FileWatcher, error) { + watcher, err := fsnotify.NewWatcher() + if err != nil { + return nil, err + } + return &fsNotifyWatcher{watcher}, nil +} diff --git a/cmd/localstack/filenotify/fsnotify.go b/cmd/localstack/filenotify/fsnotify.go new file mode 100644 index 0000000..4b0502e --- /dev/null +++ b/cmd/localstack/filenotify/fsnotify.go @@ -0,0 +1,22 @@ +// This package is adapted from https://github.com/gohugoio/hugo/tree/master/watcher/filenotify, Apache-2.0 License. + +// Package filenotify is adapted from https://github.com/moby/moby/tree/master/pkg/filenotify, Apache-2.0 License. +// Hopefully this can be replaced with an external package sometime in the future, see https://github.com/fsnotify/fsnotify/issues/9 +package filenotify + +import "github.com/fsnotify/fsnotify" + +// fsNotifyWatcher wraps the fsnotify package to satisfy the FileNotifier interface +type fsNotifyWatcher struct { + *fsnotify.Watcher +} + +// Events returns the fsnotify event channel receiver +func (w *fsNotifyWatcher) Events() <-chan fsnotify.Event { + return w.Watcher.Events +} + +// Errors returns the fsnotify error channel receiver +func (w *fsNotifyWatcher) Errors() <-chan error { + return w.Watcher.Errors +} diff --git a/cmd/localstack/filenotify/poller.go b/cmd/localstack/filenotify/poller.go new file mode 100644 index 0000000..c1a0f9c --- /dev/null +++ b/cmd/localstack/filenotify/poller.go @@ -0,0 +1,328 @@ +// This package is adapted from https://github.com/gohugoio/hugo/tree/master/watcher/filenotify, Apache-2.0 License. + +// Package filenotify is adapted from https://github.com/moby/moby/tree/master/pkg/filenotify, Apache-2.0 License. +// Hopefully this can be replaced with an external package sometime in the future, see https://github.com/fsnotify/fsnotify/issues/9 +package filenotify + +import ( + "errors" + "fmt" + "os" + "path/filepath" + "sync" + "time" + + "github.com/fsnotify/fsnotify" +) + +var ( + // errPollerClosed is returned when the poller is closed + errPollerClosed = errors.New("poller is closed") + // errNoSuchWatch is returned when trying to remove a watch that doesn't exist + errNoSuchWatch = errors.New("watch does not exist") +) + +// filePoller is used to poll files for changes, especially in cases where fsnotify +// can't be run (e.g. when inotify handles are exhausted) +// filePoller satisfies the FileWatcher interface +type filePoller struct { + // the duration between polls. + interval time.Duration + // watches is the list of files currently being polled, close the associated channel to stop the watch + watches map[string]struct{} + // Will be closed when done. + done chan struct{} + // events is the channel to listen to for watch events + events chan fsnotify.Event + // errors is the channel to listen to for watch errors + errors chan error + // mu locks the poller for modification + mu sync.Mutex + // closed is used to specify when the poller has already closed + closed bool +} + +// Add adds a filename to the list of watches +// once added the file is polled for changes in a separate goroutine +func (w *filePoller) Add(name string) error { + w.mu.Lock() + defer w.mu.Unlock() + + if w.closed { + return errPollerClosed + } + + item, err := newItemToWatch(name) + if err != nil { + return err + } + if item.left.FileInfo == nil { + return os.ErrNotExist + } + + if w.watches == nil { + w.watches = make(map[string]struct{}) + } + if _, exists := w.watches[name]; exists { + return fmt.Errorf("watch exists") + } + w.watches[name] = struct{}{} + + go w.watch(item) + return nil +} + +// Remove stops and removes watch with the specified name +func (w *filePoller) Remove(name string) error { + w.mu.Lock() + defer w.mu.Unlock() + return w.remove(name) +} + +func (w *filePoller) remove(name string) error { + if w.closed { + return errPollerClosed + } + + _, exists := w.watches[name] + if !exists { + return errNoSuchWatch + } + delete(w.watches, name) + return nil +} + +// Events returns the event channel +// This is used for notifications on events about watched files +func (w *filePoller) Events() <-chan fsnotify.Event { + return w.events +} + +// Errors returns the errors channel +// This is used for notifications about errors on watched files +func (w *filePoller) Errors() <-chan error { + return w.errors +} + +// Close closes the poller +// All watches are stopped, removed, and the poller cannot be added to +func (w *filePoller) Close() error { + w.mu.Lock() + defer w.mu.Unlock() + + if w.closed { + return nil + } + w.closed = true + close(w.done) + for name := range w.watches { + w.remove(name) + } + + return nil +} + +// sendEvent publishes the specified event to the events channel +func (w *filePoller) sendEvent(e fsnotify.Event) error { + select { + case w.events <- e: + case <-w.done: + return fmt.Errorf("closed") + } + return nil +} + +// sendErr publishes the specified error to the errors channel +func (w *filePoller) sendErr(e error) error { + select { + case w.errors <- e: + case <-w.done: + return fmt.Errorf("closed") + } + return nil +} + +// watch watches item for changes until done is closed. +func (w *filePoller) watch(item *itemToWatch) { + ticker := time.NewTicker(w.interval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + case <-w.done: + return + } + + evs, err := item.checkForChanges() + if err != nil { + if err := w.sendErr(err); err != nil { + return + } + } + + item.left, item.right = item.right, item.left + + for _, ev := range evs { + if err := w.sendEvent(ev); err != nil { + return + } + } + + } +} + +// recording records the state of a file or a dir. +type recording struct { + os.FileInfo + + // Set if FileInfo is a dir. + entries map[string]os.FileInfo +} + +func (r *recording) clear() { + r.FileInfo = nil + if r.entries != nil { + for k := range r.entries { + delete(r.entries, k) + } + } +} + +func (r *recording) record(filename string) error { + r.clear() + + fi, err := os.Stat(filename) + if err != nil && !os.IsNotExist(err) { + return err + } + + if fi == nil { + return nil + } + + r.FileInfo = fi + + // If fi is a dir, we watch the files inside that directory (not recursively). + // This matches the behaviour of fsnotity. + if fi.IsDir() { + f, err := os.Open(filename) + if err != nil { + if os.IsNotExist(err) { + return nil + } + return err + } + defer f.Close() + + fis, err := f.Readdir(-1) + if err != nil { + if os.IsNotExist(err) { + return nil + } + return err + } + + for _, fi := range fis { + r.entries[fi.Name()] = fi + } + } + + return nil +} + +// itemToWatch may be a file or a dir. +type itemToWatch struct { + // Full path to the filename. + filename string + + // Snapshots of the stat state of this file or dir. + left *recording + right *recording +} + +func newItemToWatch(filename string) (*itemToWatch, error) { + r := &recording{ + entries: make(map[string]os.FileInfo), + } + err := r.record(filename) + if err != nil { + return nil, err + } + + return &itemToWatch{filename: filename, left: r}, nil + +} + +func (item *itemToWatch) checkForChanges() ([]fsnotify.Event, error) { + if item.right == nil { + item.right = &recording{ + entries: make(map[string]os.FileInfo), + } + } + + err := item.right.record(item.filename) + if err != nil && !os.IsNotExist(err) { + return nil, err + } + + dirOp := checkChange(item.left.FileInfo, item.right.FileInfo) + + if dirOp != 0 { + evs := []fsnotify.Event{fsnotify.Event{Op: dirOp, Name: item.filename}} + return evs, nil + } + + if item.left.FileInfo == nil || !item.left.IsDir() { + // Done. + return nil, nil + } + + leftIsIn := false + left, right := item.left.entries, item.right.entries + if len(right) > len(left) { + left, right = right, left + leftIsIn = true + } + + var evs []fsnotify.Event + + for name, fi1 := range left { + fi2 := right[name] + fil, fir := fi1, fi2 + if leftIsIn { + fil, fir = fir, fil + } + op := checkChange(fil, fir) + if op != 0 { + evs = append(evs, fsnotify.Event{Op: op, Name: filepath.Join(item.filename, name)}) + } + + } + + return evs, nil + +} + +func checkChange(fi1, fi2 os.FileInfo) fsnotify.Op { + if fi1 == nil && fi2 != nil { + return fsnotify.Create + } + if fi1 != nil && fi2 == nil { + return fsnotify.Remove + } + if fi1 == nil && fi2 == nil { + return 0 + } + if fi1.IsDir() || fi2.IsDir() { + return 0 + } + if fi1.Mode() != fi2.Mode() { + return fsnotify.Chmod + } + if fi1.ModTime() != fi2.ModTime() || fi1.Size() != fi2.Size() { + return fsnotify.Write + } + + return 0 +} diff --git a/cmd/localstack/hotreloading.go b/cmd/localstack/hotreloading.go new file mode 100644 index 0000000..9218465 --- /dev/null +++ b/cmd/localstack/hotreloading.go @@ -0,0 +1,126 @@ +package main + +import ( + "github.com/fsnotify/fsnotify" + log "github.com/sirupsen/logrus" + "go.amzn.com/cmd/localstack/filenotify" + "os" + "time" +) + +type ChangeListener struct { + watcher filenotify.FileWatcher + changeChannel chan string + debouncedChannel chan bool + debouncingInterval time.Duration + watchedFolders []string +} + +func NewChangeListener(debouncingInterval time.Duration) (*ChangeListener, error) { + watcher, err := filenotify.New(200 * time.Millisecond) + if err != nil { + log.Errorln("Cannot create change listener due to filewatcher error.", err) + return nil, err + } + changeListener := &ChangeListener{ + changeChannel: make(chan string, 10), + debouncedChannel: make(chan bool, 10), + debouncingInterval: debouncingInterval, + watcher: watcher, + } + return changeListener, nil +} + +func (c *ChangeListener) Start() { + c.debounceChannel() + c.Watch() +} + +func (c *ChangeListener) Watch() { + for { + select { + case event, ok := <-c.watcher.Events(): + if !ok { + close(c.changeChannel) + return + } + log.Debugln("FileWatcher got event: ", event) + if event.Has(fsnotify.Create) { + stat, err := os.Stat(event.Name) + if err != nil { + log.Errorln("Error stating event file: ", event.Name, err) + } else if stat.IsDir() { + subfolders := getSubFolders(event.Name) + for _, folder := range subfolders { + err = c.watcher.Add(folder) + c.watchedFolders = append(c.watchedFolders, folder) + if err != nil { + log.Errorln("Error watching folder: ", folder, err) + } + } + } + // remove in case of remove / rename (rename within the folder will trigger a separate create event) + } else if event.Has(fsnotify.Remove) || event.Has(fsnotify.Rename) { + // remove all file watchers if it is in our folders list + toBeRemovedDirs, newWatchedFolders := getSubFoldersInList(event.Name, c.watchedFolders) + c.watchedFolders = newWatchedFolders + for _, dir := range toBeRemovedDirs { + err := c.watcher.Remove(dir) + if err != nil { + log.Warnln("Error removing path: ", event.Name, err) + } + } + } + c.changeChannel <- event.Name + case err, ok := <-c.watcher.Errors(): + if !ok { + log.Println("error:", err) + return + } + log.Println("error:", err) + } + } +} + +func (c *ChangeListener) AddTargetPaths(targetPaths []string) { + // Add all target paths and subfolders + for _, targetPath := range targetPaths { + subfolders := getSubFolders(targetPath) + log.Infoln("Subfolders: ", subfolders) + for _, target := range subfolders { + err := c.watcher.Add(target) + if err != nil { + log.Fatal(err) + } + } + } +} + +func (c *ChangeListener) debounceChannel() { + // debouncer to limit restarts + timer := time.NewTimer(c.debouncingInterval) + // immediately stop the timer, since we do not want to reload right at the startup + if !timer.Stop() { + // we have to drain the channel in case the timer already fired + <-timer.C + } + go func() { + for { + select { + case _, more := <-c.changeChannel: + if !more { + timer.Stop() + close(c.debouncedChannel) + return + } + timer.Reset(c.debouncingInterval) + case <-timer.C: + c.debouncedChannel <- true + } + } + }() +} + +func (c *ChangeListener) Close() error { + return c.watcher.Close() +} diff --git a/cmd/localstack/main.go b/cmd/localstack/main.go index 9da81ad..afed0d0 100644 --- a/cmd/localstack/main.go +++ b/cmd/localstack/main.go @@ -3,6 +3,7 @@ package main import ( + "context" log "github.com/sirupsen/logrus" "go.amzn.com/lambda/rapidcore" "net/http" @@ -18,6 +19,7 @@ type LsOpts struct { RuntimeId string InitTracingPort string CodeDownloadUrl string + HotReloading bool } func GetEnvOrDie(env string) string { @@ -37,6 +39,7 @@ func InitLsOpts() *LsOpts { InitTracingPort: GetenvWithDefault("LOCALSTACK_RUNTIME_TRACING_PORT", "9564"), // optional or empty CodeDownloadUrl: os.Getenv("LOCALSTACK_CODE_ARCHIVE_DOWNLOAD_URL"), + HotReloading: os.Getenv("LOCALSTACK_HOT_RELOADING_ENABLED") != "", } } @@ -57,15 +60,21 @@ func main() { opts, args := getCLIArgs() bootstrap, handler := getBootstrap(args, opts) logCollector := NewLogCollector() + fileWatcherContext, cancelFileWatcher := context.WithCancel(context.Background()) sandbox := rapidcore. NewSandboxBuilder(bootstrap). + AddShutdownFunc(func() { + log.Debugln("Closing file watcher") + cancelFileWatcher() + }). AddShutdownFunc(func() { os.Exit(0) }). SetExtensionsFlag(true). SetInitCachingFlag(true). SetTailLogOutput(logCollector) defaultInterop := sandbox.InteropServer() - sandbox.SetInteropServer(NewCustomInteropServer(lsOpts, defaultInterop, logCollector)) + interopServer := NewCustomInteropServer(lsOpts, defaultInterop, logCollector) + sandbox.SetInteropServer(interopServer) if len(handler) > 0 { sandbox.SetHandler(handler) } @@ -79,6 +88,8 @@ func main() { if err != nil { log.Fatalln(err) } + go RunHotReloadingListener(interopServer, []string{"/var/task"}, lsOpts, fileWatcherContext) + // start runtime init go InitHandler(sandbox, GetEnvOrDie("AWS_LAMBDA_FUNCTION_VERSION"), int64(invokeTimeoutSeconds)) // TODO: replace this with a custom init @@ -88,5 +99,4 @@ func main() { if err != nil { log.Fatal("Failed to start debug server") } - } diff --git a/go.mod b/go.mod index 871b812..7e17c86 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.18 require ( github.com/aws/aws-lambda-go v1.20.0 + github.com/fsnotify/fsnotify v1.6.0 github.com/go-chi/chi v4.1.2+incompatible github.com/go-chi/render v1.0.1 github.com/google/uuid v1.1.2 @@ -11,6 +12,7 @@ require ( github.com/sirupsen/logrus v1.6.0 github.com/stretchr/testify v1.6.1 golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 + golang.org/x/sys v0.1.0 ) require ( @@ -19,6 +21,5 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/stretchr/objx v0.1.0 // indirect golang.org/x/net v0.1.0 // indirect - golang.org/x/sys v0.1.0 // indirect gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 // indirect ) diff --git a/go.sum b/go.sum index daa8fe3..576d027 100644 --- a/go.sum +++ b/go.sum @@ -6,6 +6,8 @@ github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsr github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= +github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= github.com/go-chi/chi v4.1.2+incompatible h1:fGFk2Gmi/YKXk0OmGfBh0WgmN3XB8lVnEyNz34tQRec= github.com/go-chi/chi v4.1.2+incompatible/go.mod h1:eB3wogJHnLi3x/kFX2A+IbTBlXxmMeXJVKy9tTv1XzQ= github.com/go-chi/render v1.0.1 h1:4/5tis2cKaNdnv9zFLfXzcquC9HbeZgCnxGnKrltBS8= @@ -33,6 +35,7 @@ golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 h1:SQFwaSi55rU7vdNs9Yr0Z324VNlrF+0wMqRXT4St8ck= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.1.0 h1:kunALQeHf1/185U1i0GOB/fy1IPRDDpuoOOqRReG57U= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/text v0.4.0 h1:BrVqGRd7+k1DiOgtnFvAkoQEWQvBc25ouMJM6429SFg=