From c82a928c7a1646fe8fbaee781ab88552ae0c643f Mon Sep 17 00:00:00 2001 From: Alessandro Lioi Date: Fri, 11 Oct 2024 11:36:35 +0200 Subject: [PATCH 01/29] feat(wasi): add `wasmtime-go` as a dependency CGO has to be enabled because `wasmtime-go` is a wrapper for the C library --- Makefile | 6 +++--- go.mod | 3 ++- go.sum | 8 ++++++-- 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/Makefile b/Makefile index 6aeb1209..8e480e31 100644 --- a/Makefile +++ b/Makefile @@ -3,13 +3,13 @@ BIN=bin all: serverledge executor serverledge-cli lb serverledge: - CGO_ENABLED=0 GOOS=linux go build -o $(BIN)/$@ cmd/$@/main.go + GOOS=linux go build -o $(BIN)/$@ cmd/$@/main.go lb: - CGO_ENABLED=0 GOOS=linux go build -o $(BIN)/$@ cmd/$@/main.go + GOOS=linux go build -o $(BIN)/$@ cmd/$@/main.go serverledge-cli: - CGO_ENABLED=0 GOOS=linux go build -o $(BIN)/$@ cmd/cli/main.go + GOOS=linux go build -o $(BIN)/$@ cmd/cli/main.go executor: CGO_ENABLED=0 GOOS=linux go build -o $(BIN)/$@ cmd/$@/executor.go diff --git a/go.mod b/go.mod index bdd71e28..cb85de6b 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ toolchain go1.22.5 require ( github.com/LK4D4/trylock v0.0.0-20191027065348-ff7e133a5c54 + github.com/bytecodealliance/wasmtime-go/v25 v25.0.0 github.com/docker/docker v20.10.12+incompatible github.com/hexablock/vivaldi v0.0.0-20180727225019-07adad3f2b5f github.com/labstack/echo/v4 v4.6.1 @@ -17,6 +18,7 @@ require ( go.opentelemetry.io/otel v1.28.0 go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.28.0 go.opentelemetry.io/otel/sdk v1.28.0 + go.opentelemetry.io/otel/trace v1.28.0 golang.org/x/net v0.0.0-20220225172249-27dd8689420f ) @@ -65,7 +67,6 @@ require ( go.etcd.io/etcd/api/v3 v3.5.1 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.1 // indirect go.opentelemetry.io/otel/metric v1.28.0 // indirect - go.opentelemetry.io/otel/trace v1.28.0 // indirect go.uber.org/atomic v1.7.0 // indirect go.uber.org/multierr v1.6.0 // indirect go.uber.org/zap v1.17.0 // indirect diff --git a/go.sum b/go.sum index a7be0bb6..0cd2f307 100644 --- a/go.sum +++ b/go.sum @@ -100,6 +100,8 @@ github.com/buger/jsonparser v0.0.0-20180808090653-f4dd9f5a6b44/go.mod h1:bbYlZJ7 github.com/bugsnag/bugsnag-go v0.0.0-20141110184014-b1d153021fcd/go.mod h1:2oa8nejYd4cQ/b0hMIopN0lCRxU0bueqREvZLWFrtK8= github.com/bugsnag/osext v0.0.0-20130617224835-0dd3f918b21b/go.mod h1:obH5gd0BsqsP2LwDJ9aOkm/6J86V6lyAXCoQWGw3K50= github.com/bugsnag/panicwrap v0.0.0-20151223152923-e2c28503fcd0/go.mod h1:D/8v3kj0zr8ZAKg1AQ6crr+5VwKN5eIywRkfhyM/+dE= +github.com/bytecodealliance/wasmtime-go/v25 v25.0.0 h1:ZTn4Ho+srrk0466ugqPfTDCITczsWdT48A0ZMA/TpRU= +github.com/bytecodealliance/wasmtime-go/v25 v25.0.0/go.mod h1:8mMIYQ92CpVDwXPIb6udnhtFGI3vDZ/937cGeQr5I68= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= @@ -446,7 +448,6 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxv github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= -github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA= @@ -506,6 +507,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= github.com/ncw/swift v1.0.47/go.mod h1:23YIA4yWVnGwv2dQlN4bB7egfYX6YLn0Yo/S6zZO/ZM= +github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= +github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo= @@ -1093,8 +1096,9 @@ gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLks gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20141024133853-64131543e789/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b h1:QRR6H1YWRnHb4Y/HeNFCTJLFVxaq6wH4YuVdsUOr75U= +gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/cheggaaa/pb.v1 v1.0.25/go.mod h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qStrOgw= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= From 62afbf3b8143e249d7bed721e3ec585c13e98169 Mon Sep 17 00:00:00 2001 From: Alessandro Lioi Date: Fri, 11 Oct 2024 11:37:35 +0200 Subject: [PATCH 02/29] feat(wasi): implement WasiFactory for WASI code and untar functionality --- internal/container/wasi.go | 213 +++++++++++++++++++++++++++++++++++++ utils/untar.go | 59 ++++++++++ 2 files changed, 272 insertions(+) create mode 100644 internal/container/wasi.go create mode 100644 utils/untar.go diff --git a/internal/container/wasi.go b/internal/container/wasi.go new file mode 100644 index 00000000..85aa5833 --- /dev/null +++ b/internal/container/wasi.go @@ -0,0 +1,213 @@ +package container + +import ( + "context" + "fmt" + "io" + "log" + "os" + "path/filepath" + "strings" + + "github.com/bytecodealliance/wasmtime-go/v25" + "github.com/grussorusso/serverledge/utils" +) + +type WasiType string + +const WasiModule WasiType = "module" +const WasiComponent WasiType = "component" + +type WasiFactory struct { + ctx context.Context + runners map[string]*wasiRunner +} + +type wasiRunner struct { + wasiType WasiType // WasiModule is executed using wasmtime-go; WasiComponent using Wasmtime CLI + env []string // List of KEY=VALUE + mount string // Directories are preloaded to this mount-point + tar io.Reader // Tar of .wasm and other required files + // WASI Module Specifics + store *wasmtime.Store // Group of WASM instances + linker *wasmtime.Linker // Used to instantiate module + module *wasmtime.Module // Compiled WASM + stdout *os.File // Temporary file for the stdout + stderr *os.File // Temporary file for the stderr + // WASI Component Specifics + cliArgs []string + + // TODO: check if it's possible to set RAM and CPU quota in CLI and wasmtime-go + // - config.SetMaxWasmStack can be used to set the max "RAM" + // - fuel or epoch interruption can probably be used to simulate the CPU quota +} + +func (wr *wasiRunner) Close() { + if wr.module != nil { + wr.module.Close() + } + if wr.linker != nil { + wr.linker.Close() + } + if wr.store != nil { + wr.store.Close() + } + if wr.stdout != nil { + wr.stdout.Close() + } + if wr.stderr != nil { + wr.stderr.Close() + } +} + +func InitWasiFactory() *WasiFactory { + ctx := context.Background() + wasiFactory := &WasiFactory{ctx, make(map[string]*wasiRunner)} + cf = wasiFactory + return wasiFactory +} + +// Image is the ID +// NOTE: this approach requires Runtime to be set to wasi and CustomImage to an identifier (e.g. function name) +func (wf *WasiFactory) Create(image string, opts *ContainerOptions) (ContainerID, error) { + wf.runners[image] = &wasiRunner{env: opts.Env} + return image, nil +} + +// Saves the decoded function code in the Wasi Runner +func (wf *WasiFactory) CopyToContainer(contID ContainerID, content io.Reader, destPath string) error { + wf.runners[contID].tar = content + wf.runners[contID].mount = destPath + return nil +} + +// WASI Module: compiles the module +// Component: creates the CLI command +// NOTE: using contID (set as custom_image from CLI as the wasm filename inside the tar) +func (wf *WasiFactory) Start(contID ContainerID) error { + wasiRunner, ok := wf.runners[contID] + if !ok { + return fmt.Errorf("[WasiFactory]: no runner with %s found", contID) + } + + // Create Store configuration + storeConfig := wasmtime.NewConfig() + storeConfig.SetWasmRelaxedSIMD(true) + storeConfig.SetWasmBulkMemory(true) + storeConfig.SetWasmMultiValue(true) + // NOTE: this can probably be the RAM limit + // storeConfig.SetMaxWasmStack() + + // Create wasmtime engine + engine := wasmtime.NewEngineWithConfig(storeConfig) + // Create new store + wasiRunner.store = wasmtime.NewStore(engine) + // Create WASI Configuration + wasiConfig := wasmtime.NewWasiConfig() + + stdout, err := os.CreateTemp("", fmt.Sprintf("%s-stdout", contID)) + if err != nil { + return fmt.Errorf("[WasiFactory]: failed to create temp stdout file for %s: %v", contID, err) + } + stderr, err := os.CreateTemp("", fmt.Sprintf("%s-stdout", contID)) + if err != nil { + return fmt.Errorf("[WasiFactory]: failed to create temp stderr file for %s: %v", contID, err) + } + wasiConfig.SetStdoutFile(stdout.Name()) + wasiConfig.SetStderrFile(stderr.Name()) + wasiRunner.stdout = stdout + wasiRunner.stderr = stderr + + untarDest, err := os.MkdirTemp("", contID) + if err != nil { + return fmt.Errorf("[WasiFactory] Failed to create temporary directory for %s: %v", contID, err) + } + + wasiConfig.PreopenDir(untarDest, wasiRunner.mount) + + // Splitting the env array to separate keys and values + // Assuming env is formatted correctly: KEY=VALUE + var envKeys, envVals []string + for _, v := range wasiRunner.env { + split := strings.Split(v, "=") + key := split[0] + value := split[1] + envKeys = append(envKeys, key) + envVals = append(envVals, value) + } + wasiConfig.SetEnv(envKeys, envVals) + + // Save the WASI Configuration to the store + wasiRunner.store.SetWasi(wasiConfig) + + // Create a linker + wasiRunner.linker = wasmtime.NewLinker(engine) + if err := wasiRunner.linker.DefineWasi(); err != nil { + wasiRunner.Close() + return fmt.Errorf("[WasiFactory] Failed to define WASI in the linker for %s: %v", contID, err) + } + + // Untar the code in a temporary folder + if err := utils.Untar(wasiRunner.tar, untarDest); err != nil { + return fmt.Errorf("[WasiFactory] Faield to untar code for %s: %v", contID, err) + } + + wasmFileName := filepath.Join(untarDest, contID+".wasm") + + // Read module code + moduleData, err := os.ReadFile(wasmFileName) + if err != nil { + wasiRunner.Close() + return fmt.Errorf("[WasiFactory] Failed to read the WASI code for %s: %v", contID, err) + } + // Compile the WASI Module + module, err := wasmtime.NewModule(engine, moduleData) + if err != nil { + if strings.HasPrefix(err.Error(), "failed to parse WebAssembly module") { + // File is a WASI Component + wasiRunner.cliArgs = append(wasiRunner.cliArgs, "--dir", untarDest+"::/app") + for _, v := range wasiRunner.env { + wasiRunner.cliArgs = append(wasiRunner.cliArgs, "--env") + wasiRunner.cliArgs = append(wasiRunner.cliArgs, v) + } + wasiRunner.cliArgs = append(wasiRunner.cliArgs, wasmFileName) + wasiRunner.wasiType = WasiComponent + return nil + } + wasiRunner.Close() + return fmt.Errorf("[WasiFactory] Failed to create WASI Module for %s: %v", contID, err) + } + wasiRunner.module = module + wasiRunner.wasiType = WasiModule + return nil +} + +func (wf *WasiFactory) Destroy(id ContainerID) error { + wasiRunner := wf.runners[id] + if wasiRunner.wasiType == WasiModule { + wasiRunner.Close() + } + delete(wf.runners, id) + return nil +} + +func (wf *WasiFactory) HasImage(string) bool { + log.Println("[WasiFactory] HasImage unimplemented") + return false +} + +func (wf *WasiFactory) PullImage(string) error { + log.Println("[WasiFactory] PullImage unimplemented") + return nil +} + +func (wf *WasiFactory) GetIPAddress(ContainerID) (string, error) { + log.Println("[WasiFactory] GetIPAddress unimplemented") + return "", nil +} + +func (wf *WasiFactory) GetMemoryMB(id ContainerID) (int64, error) { + // NOTE: this can probably be the WasmStackSize + log.Println("[WasiFactory] GetMemoryMB unimplemented") + return 0, nil +} diff --git a/utils/untar.go b/utils/untar.go new file mode 100644 index 00000000..4ed4d104 --- /dev/null +++ b/utils/untar.go @@ -0,0 +1,59 @@ +package utils + +import ( + "archive/tar" + "io" + "os" + "path/filepath" + "strings" +) + +// Modified from: https://github.com/golang/build/blob/master/internal/untar/untar.go + +func Untar(r io.Reader, dir string) (err error) { + // Create tar reader + tr := tar.NewReader(r) + + // Extract each file + for { + header, err := tr.Next() + if err == io.EOF { + break // end of tar archive + } + if err != nil { + return err + } + + // Strip the first component from the header name + components := strings.SplitN(header.Name, "/", 2) + var target string + + if len(components) > 1 { + target = filepath.Join(dir, components[1]) // Skip the first component + } else { + target = filepath.Join(dir, header.Name) // No components to strip + } + + // Check the file type + switch header.Typeflag { + case tar.TypeDir: + // Create directory if it doesn’t exist + if err := os.MkdirAll(target, os.FileMode(header.Mode)); err != nil { + return err + } + case tar.TypeReg: + // Create file + file, err := os.OpenFile(target, os.O_CREATE|os.O_RDWR, os.FileMode(header.Mode)) + if err != nil { + return err + } + defer file.Close() + + // Copy file contents + if _, err := io.Copy(file, tr); err != nil { + return err + } + } + } + return nil +} From e0fc53f03ac65db9055c6b012c7fd0978b1b49b5 Mon Sep 17 00:00:00 2001 From: Alessandro Lioi Date: Fri, 11 Oct 2024 11:38:57 +0200 Subject: [PATCH 03/29] feat(wasi): add new runtime `WASI_RUNTIME` --- internal/api/api.go | 2 +- internal/container/runtimes.go | 1 + internal/node/pool.go | 2 +- internal/scheduling/execution.go | 2 +- 4 files changed, 4 insertions(+), 3 deletions(-) diff --git a/internal/api/api.go b/internal/api/api.go index 8e67a221..34d676bb 100644 --- a/internal/api/api.go +++ b/internal/api/api.go @@ -143,7 +143,7 @@ func CreateFunction(c echo.Context) error { log.Printf("New request: creation of %s\n", f.Name) // Check that the selected runtime exists - if f.Runtime != container.CUSTOM_RUNTIME { + if f.Runtime != container.CUSTOM_RUNTIME && f.Runtime != container.WASI_RUNTIME { _, ok := container.RuntimeToInfo[f.Runtime] if !ok { return c.JSON(http.StatusNotFound, "Invalid runtime.") diff --git a/internal/container/runtimes.go b/internal/container/runtimes.go index f8e8d87d..3e88fee9 100644 --- a/internal/container/runtimes.go +++ b/internal/container/runtimes.go @@ -7,6 +7,7 @@ type RuntimeInfo struct { } const CUSTOM_RUNTIME = "custom" +const WASI_RUNTIME = "wasi" var refreshedImages = map[string]bool{} diff --git a/internal/node/pool.go b/internal/node/pool.go index b7cdcc1f..35ef1a04 100644 --- a/internal/node/pool.go +++ b/internal/node/pool.go @@ -181,7 +181,7 @@ func NewContainer(fun *function.Function) (container.ContainerID, error) { func getImageForFunction(fun *function.Function) (string, error) { var image string - if fun.Runtime == container.CUSTOM_RUNTIME { + if fun.Runtime == container.CUSTOM_RUNTIME || fun.Runtime == container.WASI_RUNTIME { image = fun.CustomImage } else { runtime, ok := container.RuntimeToInfo[fun.Runtime] diff --git a/internal/scheduling/execution.go b/internal/scheduling/execution.go index 1e1baa05..b8223856 100644 --- a/internal/scheduling/execution.go +++ b/internal/scheduling/execution.go @@ -16,7 +16,7 @@ func Execute(contID container.ContainerID, r *scheduledRequest, isWarm bool) (fu //log.Printf("[%s] Executing on container: %v", r.Fun, contID) var req executor.InvocationRequest - if r.Fun.Runtime == container.CUSTOM_RUNTIME { + if r.Fun.Runtime == container.CUSTOM_RUNTIME || r.Fun.Runtime == container.WASI_RUNTIME { req = executor.InvocationRequest{ Params: r.Params, ReturnOutput: r.ReturnOutput, From a6f98b5d791dfae6ce4dc3fa3f8b0c1e66e75ad6 Mon Sep 17 00:00:00 2001 From: Alessandro Lioi Date: Fri, 11 Oct 2024 11:39:50 +0200 Subject: [PATCH 04/29] feat(wasi): separate execution based on factory type NOTE: this will be changed when both runtimes will be supported by the same node --- internal/container/container.go | 92 +++++++++++++++++++++++++++++++- internal/scheduling/scheduler.go | 4 +- 2 files changed, 94 insertions(+), 2 deletions(-) diff --git a/internal/container/container.go b/internal/container/container.go index 3ccba2bf..97c43142 100644 --- a/internal/container/container.go +++ b/internal/container/container.go @@ -8,6 +8,8 @@ import ( "io" "log" "net/http" + "os/exec" + "reflect" "time" "github.com/grussorusso/serverledge/internal/executor" @@ -38,9 +40,97 @@ func NewContainer(image, codeTar string, opts *ContainerOptions) (ContainerID, e return contID, nil } +func Execute(contID ContainerID, req *executor.InvocationRequest) (*executor.InvocationResult, time.Duration, error) { + switch cf.(type) { + case *DockerFactory: + return dockerExecute(contID, req) + case *WasiFactory: + return wasiExecute(contID, req) + default: + return nil, 0, fmt.Errorf("Unrecognized Factory type: %s", reflect.TypeOf(cf).Name()) + } +} + +func wasiExecute(contID ContainerID, req *executor.InvocationRequest) (*executor.InvocationResult, time.Duration, error) { + wasiRunner := cf.(*WasiFactory).runners[contID] + t0 := time.Now() + + if wasiRunner.wasiType == WasiModule { + // Create an instance of the module + instance, err := wasiRunner.linker.Instantiate(wasiRunner.store, wasiRunner.module) + if err != nil { + return nil, time.Now().Sub(t0), fmt.Errorf("Failed to instantiate WASI module: %v", err) + } + + // Get the _start function (entrypoint of any wasm module) + start := instance.GetFunc(wasiRunner.store, "_start") + if start == nil { + return nil, time.Now().Sub(t0), fmt.Errorf("WASI Module does not have a _start function") + } + + // Call the _start function + if _, err := start.Call(wasiRunner.store); err != nil { + return nil, time.Now().Sub(t0), fmt.Errorf("Failed to run WASI module: %v", err) + } + + // Read stdout from the temp file + stdout, err := io.ReadAll(wasiRunner.stdout) + if err != nil { + return nil, time.Now().Sub(t0), fmt.Errorf("Failed to read stdout for WASI: %v", err) + } + + // Read stderr from the temp file + stderr, err := io.ReadAll(wasiRunner.stderr) + if err != nil { + return nil, time.Now().Sub(t0), fmt.Errorf("Failed to read stderr for WASI: %v", err) + } + + // Populate result + res := &executor.InvocationResult{Success: true, Result: string(stdout)} + if req.ReturnOutput { + res.Output = fmt.Sprintf("%s\n%s", string(stdout), string(stderr)) + } + return res, time.Now().Sub(t0), nil + } else if wasiRunner.wasiType == WasiComponent { + // Create wasmtime CLI command + execCmd := exec.Command("wasmtime", wasiRunner.cliArgs...) + + // Save stdout and stderr to another buffer + var stdoutBuffer, stderrBuffer bytes.Buffer + execCmd.Stdout = &stdoutBuffer + execCmd.Stderr = &stderrBuffer + // Execute wasmtime CLI + err := execCmd.Run() + if err != nil { + log.Printf("wasmtime failed with %v\n", err) + } + + // Read stdout from temporary buffer + stdout, err := io.ReadAll(&stdoutBuffer) + if err != nil { + log.Printf("Failed to read stdout: %v", err) + } + + // Read stderr from temporary buffer + stderr, err := io.ReadAll(&stderrBuffer) + if err != nil { + log.Printf("Failed to read stderr: %v", err) + } + + // Create response + resp := &executor.InvocationResult{Success: err == nil, Result: string(stdout)} + if req.ReturnOutput { + resp.Output = fmt.Sprintf("%s\n%s", string(stdout), string(stderr)) + } + return resp, time.Now().Sub(t0), nil + } else { + return nil, 0, fmt.Errorf("Unrecognized WASI Type") + } +} + // Execute interacts with the Executor running in the container to invoke the // function through a HTTP request. -func Execute(contID ContainerID, req *executor.InvocationRequest) (*executor.InvocationResult, time.Duration, error) { +func dockerExecute(contID ContainerID, req *executor.InvocationRequest) (*executor.InvocationResult, time.Duration, error) { ipAddr, err := cf.GetIPAddress(contID) if err != nil { return nil, 0, fmt.Errorf("Failed to retrieve IP address for container: %v", err) diff --git a/internal/scheduling/scheduler.go b/internal/scheduling/scheduler.go index f736c497..2519ab17 100644 --- a/internal/scheduling/scheduler.go +++ b/internal/scheduling/scheduler.go @@ -38,7 +38,9 @@ func Run(p Policy) { node.Resources.ContainerPools = make(map[string]*node.ContainerPool) log.Printf("Current resources: %v\n", &node.Resources) - container.InitDockerContainerFactory() + // TODO: handle both factories + // container.InitDockerContainerFactory() + container.InitWasiFactory() //janitor periodically remove expired warm container node.GetJanitorInstance() From 362e3935722f87ba183cf7a231631c477aa9a4c2 Mon Sep 17 00:00:00 2001 From: Alessandro Lioi Date: Fri, 11 Oct 2024 18:02:06 +0200 Subject: [PATCH 05/29] feat(wasi): support both docker and wasi factories This required the functions in `internal/container/container.go` to require a `function.Function` pointer in the function signature, used to determine what factory should be used The callers always has, or can retrieve, the appropriate `function.Function` pointer --- internal/container/container.go | 28 +++++++++++----------- internal/container/docker.go | 5 +++- internal/container/factory.go | 18 +++++++++++--- internal/container/wasi.go | 5 +++- internal/node/pool.go | 40 ++++++++++++++++++-------------- internal/scheduling/execution.go | 2 +- internal/scheduling/scheduler.go | 4 ++-- 7 files changed, 61 insertions(+), 41 deletions(-) diff --git a/internal/container/container.go b/internal/container/container.go index 97c43142..7b56fd07 100644 --- a/internal/container/container.go +++ b/internal/container/container.go @@ -9,14 +9,15 @@ import ( "log" "net/http" "os/exec" - "reflect" "time" "github.com/grussorusso/serverledge/internal/executor" + "github.com/grussorusso/serverledge/internal/function" ) // NewContainer creates and starts a new container. -func NewContainer(image, codeTar string, opts *ContainerOptions) (ContainerID, error) { +func NewContainer(image, codeTar string, opts *ContainerOptions, f *function.Function) (ContainerID, error) { + cf := GetFactoryFromRuntime(f.Runtime) contID, err := cf.Create(image, opts) if err != nil { log.Printf("Failed container creation\n") @@ -40,19 +41,16 @@ func NewContainer(image, codeTar string, opts *ContainerOptions) (ContainerID, e return contID, nil } -func Execute(contID ContainerID, req *executor.InvocationRequest) (*executor.InvocationResult, time.Duration, error) { - switch cf.(type) { - case *DockerFactory: - return dockerExecute(contID, req) - case *WasiFactory: +func Execute(contID ContainerID, req *executor.InvocationRequest, f *function.Function) (*executor.InvocationResult, time.Duration, error) { + if f.Runtime == WASI_RUNTIME { return wasiExecute(contID, req) - default: - return nil, 0, fmt.Errorf("Unrecognized Factory type: %s", reflect.TypeOf(cf).Name()) + } else { + return dockerExecute(contID, req) } } func wasiExecute(contID ContainerID, req *executor.InvocationRequest) (*executor.InvocationResult, time.Duration, error) { - wasiRunner := cf.(*WasiFactory).runners[contID] + wasiRunner := factories[WASI_FACTORY_KEY].(*WasiFactory).runners[contID] t0 := time.Now() if wasiRunner.wasiType == WasiModule { @@ -131,7 +129,7 @@ func wasiExecute(contID ContainerID, req *executor.InvocationRequest) (*executor // Execute interacts with the Executor running in the container to invoke the // function through a HTTP request. func dockerExecute(contID ContainerID, req *executor.InvocationRequest) (*executor.InvocationResult, time.Duration, error) { - ipAddr, err := cf.GetIPAddress(contID) + ipAddr, err := factories[DOCKER_FACTORY_KEY].GetIPAddress(contID) if err != nil { return nil, 0, fmt.Errorf("Failed to retrieve IP address for container: %v", err) } @@ -160,12 +158,12 @@ func dockerExecute(contID ContainerID, req *executor.InvocationRequest) (*execut return response, waitDuration, nil } -func GetMemoryMB(id ContainerID) (int64, error) { - return cf.GetMemoryMB(id) +func GetMemoryMB(id ContainerID, f *function.Function) (int64, error) { + return GetFactoryFromRuntime(f.Runtime).GetMemoryMB(id) } -func Destroy(id ContainerID) error { - return cf.Destroy(id) +func Destroy(id ContainerID, f *function.Function) error { + return GetFactoryFromRuntime(f.Runtime).Destroy(id) } func sendPostRequestWithRetries(url string, body *bytes.Buffer) (*http.Response, time.Duration, error) { diff --git a/internal/container/docker.go b/internal/container/docker.go index 43e05922..c4da7ac7 100644 --- a/internal/container/docker.go +++ b/internal/container/docker.go @@ -27,7 +27,10 @@ func InitDockerContainerFactory() *DockerFactory { } dockerFact := &DockerFactory{cli, ctx} - cf = dockerFact + if factories == nil { + factories = make(map[string]Factory) + } + factories[DOCKER_FACTORY_KEY] = dockerFact return dockerFact } diff --git a/internal/container/factory.go b/internal/container/factory.go index f5204019..68818eeb 100644 --- a/internal/container/factory.go +++ b/internal/container/factory.go @@ -4,6 +4,9 @@ import ( "io" ) +const WASI_FACTORY_KEY = "wasi" +const DOCKER_FACTORY_KEY = "docker" + // A Factory to create and manage container. type Factory interface { Create(string, *ContainerOptions) (ContainerID, error) @@ -26,10 +29,19 @@ type ContainerOptions struct { type ContainerID = string -// cf is the container factory for the node -var cf Factory +// Factories for this node; currently supporting only Docker and WASI +var factories map[string]Factory + +func GetFactoryFromRuntime(runtime string) Factory { + if runtime == WASI_RUNTIME { + return factories[WASI_FACTORY_KEY] + } else { + return factories[DOCKER_FACTORY_KEY] + } +} -func DownloadImage(image string, forceRefresh bool) error { +func DownloadImage(image string, forceRefresh bool, runtime string) error { + cf := GetFactoryFromRuntime(runtime) if forceRefresh || !cf.HasImage(image) { return cf.PullImage(image) } diff --git a/internal/container/wasi.go b/internal/container/wasi.go index 85aa5833..6300a6bd 100644 --- a/internal/container/wasi.go +++ b/internal/container/wasi.go @@ -63,7 +63,10 @@ func (wr *wasiRunner) Close() { func InitWasiFactory() *WasiFactory { ctx := context.Background() wasiFactory := &WasiFactory{ctx, make(map[string]*wasiRunner)} - cf = wasiFactory + if factories == nil { + factories = make(map[string]Factory) + } + factories[WASI_FACTORY_KEY] = wasiFactory return wasiFactory } diff --git a/internal/node/pool.go b/internal/node/pool.go index 35ef1a04..e456663b 100644 --- a/internal/node/pool.go +++ b/internal/node/pool.go @@ -206,7 +206,7 @@ func NewContainerWithAcquiredResources(fun *function.Function) (container.Contai contID, err := container.NewContainer(image, fun.TarFunctionCode, &container.ContainerOptions{ MemoryMB: fun.MemoryMB, CPUQuota: fun.CPUDemand, - }) + }, fun) if err != nil { log.Printf("Failed container creation: %v\n", err) @@ -230,6 +230,7 @@ type itemToDismiss struct { pool *ContainerPool elem *list.Element memory int64 + fun *function.Function } // dismissContainer ... this function is used to get free memory used for a new container @@ -241,17 +242,18 @@ func dismissContainer(requiredMemoryMB int64) (bool, error) { res := false //first phase, research - for _, funPool := range Resources.ContainerPools { + for fun, funPool := range Resources.ContainerPools { + functionDescriptor, _ := function.GetFunction(fun) if funPool.ready.Len() > 0 { // every container into the funPool has the same memory (same function) //so it is not important which one you destroy elem := funPool.ready.Front() contID := elem.Value.(warmContainer).contID // container in the same pool need same memory - memory, _ := container.GetMemoryMB(contID) + memory, _ := container.GetMemoryMB(contID, functionDescriptor) for ok := true; ok; ok = elem != nil { containerToDismiss = append(containerToDismiss, - itemToDismiss{contID: contID, pool: funPool, elem: elem, memory: memory}) + itemToDismiss{contID: contID, pool: funPool, elem: elem, memory: memory, fun: functionDescriptor}) cleanedMB += memory if cleanedMB >= requiredMemoryMB { goto cleanup @@ -266,8 +268,8 @@ cleanup: // second phase, cleanup // memory check if cleanedMB >= requiredMemoryMB { for _, item := range containerToDismiss { - item.pool.ready.Remove(item.elem) // remove the container from the funPool - err := container.Destroy(item.contID) // destroy the container + item.pool.ready.Remove(item.elem) // remove the container from the funPool + err := container.Destroy(item.contID, item.fun) // destroy the container if err != nil { res = false return res, nil @@ -288,8 +290,10 @@ func DeleteExpiredContainer() { Resources.Lock() defer Resources.Unlock() - for _, pool := range Resources.ContainerPools { + for fun, pool := range Resources.ContainerPools { elem := pool.ready.Front() + functionDescriptor, _ := function.GetFunction(fun) + for ok := elem != nil; ok; ok = elem != nil { warmed := elem.Value.(warmContainer) if now > warmed.Expiration { @@ -298,9 +302,9 @@ func DeleteExpiredContainer() { log.Printf("cleaner: Removing container %s\n", warmed.contID) pool.ready.Remove(temp) // remove the expired element - memory, _ := container.GetMemoryMB(warmed.contID) + memory, _ := container.GetMemoryMB(warmed.contID, functionDescriptor) releaseResources(0, memory) - err := container.Destroy(warmed.contID) + err := container.Destroy(warmed.contID, functionDescriptor) if err != nil { log.Printf("Error while destroying container %s: %s\n", warmed.contID, err) } @@ -334,7 +338,7 @@ func ShutdownWarmContainersFor(f *function.Function) { log.Printf("Removing container with ID %s\n", warmed.contID) fp.ready.Remove(temp) - memory, _ := container.GetMemoryMB(warmed.contID) + memory, _ := container.GetMemoryMB(warmed.contID, f) Resources.AvailableMemMB += memory containersToDelete = append(containersToDelete, warmed.contID) } @@ -342,7 +346,7 @@ func ShutdownWarmContainersFor(f *function.Function) { go func(contIDs []container.ContainerID) { for _, contID := range contIDs { // No need to update available resources here - if err := container.Destroy(contID); err != nil { + if err := container.Destroy(contID, f); err != nil { log.Printf("An error occurred while deleting %s: %v\n", contID, err) } else { log.Printf("Deleted %s\n", contID) @@ -358,6 +362,8 @@ func ShutdownAllContainers() { for fun, pool := range Resources.ContainerPools { elem := pool.ready.Front() + functionDescriptor, _ := function.GetFunction(fun) + for ok := elem != nil; ok; ok = elem != nil { warmed := elem.Value.(warmContainer) temp := elem @@ -365,16 +371,14 @@ func ShutdownAllContainers() { log.Printf("Removing container with ID %s\n", warmed.contID) pool.ready.Remove(temp) - memory, _ := container.GetMemoryMB(warmed.contID) - err := container.Destroy(warmed.contID) + memory, _ := container.GetMemoryMB(warmed.contID, functionDescriptor) + err := container.Destroy(warmed.contID, functionDescriptor) if err != nil { log.Printf("Error while destroying container %s: %s", warmed.contID, err) } Resources.AvailableMemMB += memory } - functionDescriptor, _ := function.GetFunction(fun) - elem = pool.busy.Front() for ok := elem != nil; ok; ok = elem != nil { contID := elem.Value.(container.ContainerID) @@ -383,8 +387,8 @@ func ShutdownAllContainers() { log.Printf("Removing container with ID %s\n", contID) pool.ready.Remove(temp) - memory, _ := container.GetMemoryMB(contID) - err := container.Destroy(contID) + memory, _ := container.GetMemoryMB(contID, functionDescriptor) + err := container.Destroy(contID, functionDescriptor) if err != nil { log.Printf("Error while destroying container %s: %s", contID, err) } @@ -411,7 +415,7 @@ func PrewarmInstances(f *function.Function, count int64, forcePull bool) (int64, if err != nil { return 0, err } - err = container.DownloadImage(image, forcePull) + err = container.DownloadImage(image, forcePull, f.Runtime) if err != nil { return 0, err } diff --git a/internal/scheduling/execution.go b/internal/scheduling/execution.go index b8223856..26f444e1 100644 --- a/internal/scheduling/execution.go +++ b/internal/scheduling/execution.go @@ -35,7 +35,7 @@ func Execute(contID container.ContainerID, r *scheduledRequest, isWarm bool) (fu t0 := time.Now() initTime := t0.Sub(r.Arrival).Seconds() - response, invocationWait, err := container.Execute(contID, &req) + response, invocationWait, err := container.Execute(contID, &req, r.Fun) if err != nil { // notify scheduler completions <- &completionNotification{fun: r.Fun, contID: contID, executionReport: nil} diff --git a/internal/scheduling/scheduler.go b/internal/scheduling/scheduler.go index 2519ab17..3afd2857 100644 --- a/internal/scheduling/scheduler.go +++ b/internal/scheduling/scheduler.go @@ -38,8 +38,8 @@ func Run(p Policy) { node.Resources.ContainerPools = make(map[string]*node.ContainerPool) log.Printf("Current resources: %v\n", &node.Resources) - // TODO: handle both factories - // container.InitDockerContainerFactory() + // Create factories for Docker and Wasi + container.InitDockerContainerFactory() container.InitWasiFactory() //janitor periodically remove expired warm container From 7e22e5d503d9da33ff52a9d6dd26578474bfa7df Mon Sep 17 00:00:00 2001 From: Alessandro Lioi Date: Fri, 11 Oct 2024 18:09:07 +0200 Subject: [PATCH 06/29] fix(wasi): rename WasiType const to upper case --- internal/container/container.go | 4 ++-- internal/container/wasi.go | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/internal/container/container.go b/internal/container/container.go index 7b56fd07..df44b5d2 100644 --- a/internal/container/container.go +++ b/internal/container/container.go @@ -53,7 +53,7 @@ func wasiExecute(contID ContainerID, req *executor.InvocationRequest) (*executor wasiRunner := factories[WASI_FACTORY_KEY].(*WasiFactory).runners[contID] t0 := time.Now() - if wasiRunner.wasiType == WasiModule { + if wasiRunner.wasiType == WASI_TYPE_MODULE { // Create an instance of the module instance, err := wasiRunner.linker.Instantiate(wasiRunner.store, wasiRunner.module) if err != nil { @@ -89,7 +89,7 @@ func wasiExecute(contID ContainerID, req *executor.InvocationRequest) (*executor res.Output = fmt.Sprintf("%s\n%s", string(stdout), string(stderr)) } return res, time.Now().Sub(t0), nil - } else if wasiRunner.wasiType == WasiComponent { + } else if wasiRunner.wasiType == WASI_TYPE_COMPONENT { // Create wasmtime CLI command execCmd := exec.Command("wasmtime", wasiRunner.cliArgs...) diff --git a/internal/container/wasi.go b/internal/container/wasi.go index 6300a6bd..def4ea12 100644 --- a/internal/container/wasi.go +++ b/internal/container/wasi.go @@ -15,8 +15,8 @@ import ( type WasiType string -const WasiModule WasiType = "module" -const WasiComponent WasiType = "component" +const WASI_TYPE_MODULE WasiType = "module" +const WASI_TYPE_COMPONENT WasiType = "component" type WasiFactory struct { ctx context.Context @@ -174,20 +174,20 @@ func (wf *WasiFactory) Start(contID ContainerID) error { wasiRunner.cliArgs = append(wasiRunner.cliArgs, v) } wasiRunner.cliArgs = append(wasiRunner.cliArgs, wasmFileName) - wasiRunner.wasiType = WasiComponent + wasiRunner.wasiType = WASI_TYPE_COMPONENT return nil } wasiRunner.Close() return fmt.Errorf("[WasiFactory] Failed to create WASI Module for %s: %v", contID, err) } wasiRunner.module = module - wasiRunner.wasiType = WasiModule + wasiRunner.wasiType = WASI_TYPE_MODULE return nil } func (wf *WasiFactory) Destroy(id ContainerID) error { wasiRunner := wf.runners[id] - if wasiRunner.wasiType == WasiModule { + if wasiRunner.wasiType == WASI_TYPE_MODULE { wasiRunner.Close() } delete(wf.runners, id) From d521a1a671bdedbf3e3d74ae73c2190ac7e165e4 Mon Sep 17 00:00:00 2001 From: Alessandro Lioi Date: Sat, 12 Oct 2024 14:53:35 +0200 Subject: [PATCH 07/29] feat(wasi): handle URL in `src` field in etcd --- internal/cli/cli.go | 17 +++++++++++++---- internal/container/container.go | 25 +++++++++++++++++++++---- 2 files changed, 34 insertions(+), 8 deletions(-) diff --git a/internal/cli/cli.go b/internal/cli/cli.go index 85667c19..e79c30cf 100644 --- a/internal/cli/cli.go +++ b/internal/cli/cli.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "net/http" + "net/url" "os" "strings" @@ -195,10 +196,18 @@ func create(cmd *cobra.Command, args []string) { var encoded string if runtime != "custom" { - srcContent, err := readSourcesAsTar(src) - if err != nil { - fmt.Printf("%v\n", err) - os.Exit(3) + var srcContent []byte + u, err := url.ParseRequestURI(src) + if err == nil && u.Scheme != "" && u.Host != "" { + // src is a URL + srcContent = []byte(src) + } else { + // src is a folder; a tar has to be created to be uploaded to etcd + srcContent, err = readSourcesAsTar(src) + if err != nil { + fmt.Printf("%v\n", err) + os.Exit(3) + } } encoded = base64.StdEncoding.EncodeToString(srcContent) } else { diff --git a/internal/container/container.go b/internal/container/container.go index df44b5d2..7dc55946 100644 --- a/internal/container/container.go +++ b/internal/container/container.go @@ -8,6 +8,7 @@ import ( "io" "log" "net/http" + "net/url" "os/exec" "time" @@ -16,7 +17,7 @@ import ( ) // NewContainer creates and starts a new container. -func NewContainer(image, codeTar string, opts *ContainerOptions, f *function.Function) (ContainerID, error) { +func NewContainer(image, base64Src string, opts *ContainerOptions, f *function.Function) (ContainerID, error) { cf := GetFactoryFromRuntime(f.Runtime) contID, err := cf.Create(image, opts) if err != nil { @@ -24,9 +25,25 @@ func NewContainer(image, codeTar string, opts *ContainerOptions, f *function.Fun return "", err } - if len(codeTar) > 0 { - decodedCode, _ := base64.StdEncoding.DecodeString(codeTar) - err = cf.CopyToContainer(contID, bytes.NewReader(decodedCode), "/app/") + if len(base64Src) > 0 { + var r io.Reader + // Decoding src + decodedSrc, _ := base64.StdEncoding.DecodeString(base64Src) + // Check if decoded src is a url + u, err := url.ParseRequestURI(string(decodedSrc)) + if err == nil && u.Scheme != "" && u.Host != "" { + // src is url; it has to be downloaded + resp, err := http.Get(string(decodedSrc)) + if err != nil { + log.Printf("Failed to download code %s", decodedSrc) + return "", err + } + r = resp.Body + } else { + // assuming decodedSrc is Base64 encoded tar + r = bytes.NewReader(decodedSrc) + } + err = cf.CopyToContainer(contID, r, "/app/") if err != nil { log.Printf("Failed code copy\n") return "", err From 8f68087a3a68be81db2146f512f574970fdb39d3 Mon Sep 17 00:00:00 2001 From: Alessandro Lioi Date: Tue, 15 Oct 2024 11:50:41 +0200 Subject: [PATCH 08/29] fix(wasi): `GetFactoryFromFunction` is using `function.Function` Note: it internally still uses the `Runtime` field --- internal/container/container.go | 6 +++--- internal/container/factory.go | 10 ++++++---- internal/node/pool.go | 2 +- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/internal/container/container.go b/internal/container/container.go index 7dc55946..6105eff8 100644 --- a/internal/container/container.go +++ b/internal/container/container.go @@ -18,7 +18,7 @@ import ( // NewContainer creates and starts a new container. func NewContainer(image, base64Src string, opts *ContainerOptions, f *function.Function) (ContainerID, error) { - cf := GetFactoryFromRuntime(f.Runtime) + cf := GetFactoryFromFunction(f) contID, err := cf.Create(image, opts) if err != nil { log.Printf("Failed container creation\n") @@ -176,11 +176,11 @@ func dockerExecute(contID ContainerID, req *executor.InvocationRequest) (*execut } func GetMemoryMB(id ContainerID, f *function.Function) (int64, error) { - return GetFactoryFromRuntime(f.Runtime).GetMemoryMB(id) + return GetFactoryFromFunction(f).GetMemoryMB(id) } func Destroy(id ContainerID, f *function.Function) error { - return GetFactoryFromRuntime(f.Runtime).Destroy(id) + return GetFactoryFromFunction(f).Destroy(id) } func sendPostRequestWithRetries(url string, body *bytes.Buffer) (*http.Response, time.Duration, error) { diff --git a/internal/container/factory.go b/internal/container/factory.go index 68818eeb..5cbb6cc8 100644 --- a/internal/container/factory.go +++ b/internal/container/factory.go @@ -2,6 +2,8 @@ package container import ( "io" + + "github.com/grussorusso/serverledge/internal/function" ) const WASI_FACTORY_KEY = "wasi" @@ -32,16 +34,16 @@ type ContainerID = string // Factories for this node; currently supporting only Docker and WASI var factories map[string]Factory -func GetFactoryFromRuntime(runtime string) Factory { - if runtime == WASI_RUNTIME { +func GetFactoryFromFunction(f *function.Function) Factory { + if f.Runtime == WASI_RUNTIME { return factories[WASI_FACTORY_KEY] } else { return factories[DOCKER_FACTORY_KEY] } } -func DownloadImage(image string, forceRefresh bool, runtime string) error { - cf := GetFactoryFromRuntime(runtime) +func DownloadImage(image string, forceRefresh bool, f *function.Function) error { + cf := GetFactoryFromFunction(f) if forceRefresh || !cf.HasImage(image) { return cf.PullImage(image) } diff --git a/internal/node/pool.go b/internal/node/pool.go index e456663b..9e803667 100644 --- a/internal/node/pool.go +++ b/internal/node/pool.go @@ -415,7 +415,7 @@ func PrewarmInstances(f *function.Function, count int64, forcePull bool) (int64, if err != nil { return 0, err } - err = container.DownloadImage(image, forcePull, f.Runtime) + err = container.DownloadImage(image, forcePull, f) if err != nil { return 0, err } From cebb965b219af4123fe93a58182f34f510c50ef3 Mon Sep 17 00:00:00 2001 From: Alessandro Lioi Date: Tue, 15 Oct 2024 15:34:12 +0200 Subject: [PATCH 09/29] fix(wasi): create wasi engine once --- internal/container/wasi.go | 34 ++++++++++++++++------------------ 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/internal/container/wasi.go b/internal/container/wasi.go index def4ea12..5aab395c 100644 --- a/internal/container/wasi.go +++ b/internal/container/wasi.go @@ -21,6 +21,7 @@ const WASI_TYPE_COMPONENT WasiType = "component" type WasiFactory struct { ctx context.Context runners map[string]*wasiRunner + engine *wasmtime.Engine } type wasiRunner struct { @@ -36,10 +37,6 @@ type wasiRunner struct { stderr *os.File // Temporary file for the stderr // WASI Component Specifics cliArgs []string - - // TODO: check if it's possible to set RAM and CPU quota in CLI and wasmtime-go - // - config.SetMaxWasmStack can be used to set the max "RAM" - // - fuel or epoch interruption can probably be used to simulate the CPU quota } func (wr *wasiRunner) Close() { @@ -62,7 +59,18 @@ func (wr *wasiRunner) Close() { func InitWasiFactory() *WasiFactory { ctx := context.Background() - wasiFactory := &WasiFactory{ctx, make(map[string]*wasiRunner)} + // Create Engine configuration + engineConfig := wasmtime.NewConfig() + engineConfig.SetWasmRelaxedSIMD(true) + engineConfig.SetWasmBulkMemory(true) + engineConfig.SetWasmMultiValue(true) + engineConfig.SetCraneliftOptLevel(wasmtime.OptLevelSpeed) + engineConfig.SetStrategy(wasmtime.StrategyCranelift) + + // Create wasmtime engine, shared for all modules + engine := wasmtime.NewEngineWithConfig(engineConfig) + + wasiFactory := &WasiFactory{ctx, make(map[string]*wasiRunner), engine} if factories == nil { factories = make(map[string]Factory) } @@ -93,18 +101,8 @@ func (wf *WasiFactory) Start(contID ContainerID) error { return fmt.Errorf("[WasiFactory]: no runner with %s found", contID) } - // Create Store configuration - storeConfig := wasmtime.NewConfig() - storeConfig.SetWasmRelaxedSIMD(true) - storeConfig.SetWasmBulkMemory(true) - storeConfig.SetWasmMultiValue(true) - // NOTE: this can probably be the RAM limit - // storeConfig.SetMaxWasmStack() - - // Create wasmtime engine - engine := wasmtime.NewEngineWithConfig(storeConfig) // Create new store - wasiRunner.store = wasmtime.NewStore(engine) + wasiRunner.store = wasmtime.NewStore(wf.engine) // Create WASI Configuration wasiConfig := wasmtime.NewWasiConfig() @@ -144,7 +142,7 @@ func (wf *WasiFactory) Start(contID ContainerID) error { wasiRunner.store.SetWasi(wasiConfig) // Create a linker - wasiRunner.linker = wasmtime.NewLinker(engine) + wasiRunner.linker = wasmtime.NewLinker(wf.engine) if err := wasiRunner.linker.DefineWasi(); err != nil { wasiRunner.Close() return fmt.Errorf("[WasiFactory] Failed to define WASI in the linker for %s: %v", contID, err) @@ -164,7 +162,7 @@ func (wf *WasiFactory) Start(contID ContainerID) error { return fmt.Errorf("[WasiFactory] Failed to read the WASI code for %s: %v", contID, err) } // Compile the WASI Module - module, err := wasmtime.NewModule(engine, moduleData) + module, err := wasmtime.NewModule(wf.engine, moduleData) if err != nil { if strings.HasPrefix(err.Error(), "failed to parse WebAssembly module") { // File is a WASI Component From e36a4c30221b65b2eae182e6b85d2997f53dfc30 Mon Sep 17 00:00:00 2001 From: Alessandro Lioi Date: Sat, 26 Oct 2024 17:31:36 +0200 Subject: [PATCH 10/29] feat(wasi): untar in `CopyToContainer`; add comments --- internal/container/wasi.go | 61 +++++++++++++++++++++++--------------- 1 file changed, 37 insertions(+), 24 deletions(-) diff --git a/internal/container/wasi.go b/internal/container/wasi.go index 5aab395c..5f5c70cd 100644 --- a/internal/container/wasi.go +++ b/internal/container/wasi.go @@ -25,10 +25,10 @@ type WasiFactory struct { } type wasiRunner struct { - wasiType WasiType // WasiModule is executed using wasmtime-go; WasiComponent using Wasmtime CLI - env []string // List of KEY=VALUE - mount string // Directories are preloaded to this mount-point - tar io.Reader // Tar of .wasm and other required files + wasiType WasiType // WasiModule is executed using wasmtime-go; WasiComponent using Wasmtime CLI + env []string // List of KEY=VALUE pairs + dir string // Wasm Directory + mount string // Wasm Directory is preloaded to this mount-point // WASI Module Specifics store *wasmtime.Store // Group of WASM instances linker *wasmtime.Linker // Used to instantiate module @@ -64,12 +64,13 @@ func InitWasiFactory() *WasiFactory { engineConfig.SetWasmRelaxedSIMD(true) engineConfig.SetWasmBulkMemory(true) engineConfig.SetWasmMultiValue(true) - engineConfig.SetCraneliftOptLevel(wasmtime.OptLevelSpeed) engineConfig.SetStrategy(wasmtime.StrategyCranelift) + engineConfig.SetCraneliftOptLevel(wasmtime.OptLevelSpeed) // Create wasmtime engine, shared for all modules engine := wasmtime.NewEngineWithConfig(engineConfig) + // Create the factory wasiFactory := &WasiFactory{ctx, make(map[string]*wasiRunner), engine} if factories == nil { factories = make(map[string]Factory) @@ -81,14 +82,27 @@ func InitWasiFactory() *WasiFactory { // Image is the ID // NOTE: this approach requires Runtime to be set to wasi and CustomImage to an identifier (e.g. function name) func (wf *WasiFactory) Create(image string, opts *ContainerOptions) (ContainerID, error) { + // Create the new runner wf.runners[image] = &wasiRunner{env: opts.Env} return image, nil } -// Saves the decoded function code in the Wasi Runner +// Untar the decoded function code into a temporary directory func (wf *WasiFactory) CopyToContainer(contID ContainerID, content io.Reader, destPath string) error { - wf.runners[contID].tar = content - wf.runners[contID].mount = destPath + // Create temporary directory to store untar-ed wasm file + dir, err := os.MkdirTemp("", contID) + if err != nil { + return fmt.Errorf("[WasiFactory] Failed to create temporary directory for %s: %v", contID, err) + } + // Save directory name + wf.runners[contID].dir = dir + // Untar code + if err := utils.Untar(content, dir); err != nil { + return fmt.Errorf("[WasiFactory] Failed to untar code for %s: %v", contID, err) + } + // NOTE: hard-coding `destPath` as `/` + // this is required to correctly use the official Python interpreter + wf.runners[contID].mount = "/" return nil } @@ -96,6 +110,7 @@ func (wf *WasiFactory) CopyToContainer(contID ContainerID, content io.Reader, de // Component: creates the CLI command // NOTE: using contID (set as custom_image from CLI as the wasm filename inside the tar) func (wf *WasiFactory) Start(contID ContainerID) error { + // Get the wasi runner wasiRunner, ok := wf.runners[contID] if !ok { return fmt.Errorf("[WasiFactory]: no runner with %s found", contID) @@ -106,6 +121,7 @@ func (wf *WasiFactory) Start(contID ContainerID) error { // Create WASI Configuration wasiConfig := wasmtime.NewWasiConfig() + // Create temporary files for stdout and stderr for this function stdout, err := os.CreateTemp("", fmt.Sprintf("%s-stdout", contID)) if err != nil { return fmt.Errorf("[WasiFactory]: failed to create temp stdout file for %s: %v", contID, err) @@ -114,18 +130,18 @@ func (wf *WasiFactory) Start(contID ContainerID) error { if err != nil { return fmt.Errorf("[WasiFactory]: failed to create temp stderr file for %s: %v", contID, err) } + // Set wasmtime to use the temporary files for stdout and stderr wasiConfig.SetStdoutFile(stdout.Name()) wasiConfig.SetStderrFile(stderr.Name()) + // Save the references to the temporary files wasiRunner.stdout = stdout wasiRunner.stderr = stderr - untarDest, err := os.MkdirTemp("", contID) - if err != nil { - return fmt.Errorf("[WasiFactory] Failed to create temporary directory for %s: %v", contID, err) + // Mount the temporary directory to the specified mount point + if err := wasiConfig.PreopenDir(wasiRunner.dir, wasiRunner.mount); err != nil { + return fmt.Errorf("[WasiFactory] Failed to preopen %s: %v", wasiRunner.mount, err) } - wasiConfig.PreopenDir(untarDest, wasiRunner.mount) - // Splitting the env array to separate keys and values // Assuming env is formatted correctly: KEY=VALUE var envKeys, envVals []string @@ -136,6 +152,7 @@ func (wf *WasiFactory) Start(contID ContainerID) error { envKeys = append(envKeys, key) envVals = append(envVals, value) } + // Set environment variables in WASI wasiConfig.SetEnv(envKeys, envVals) // Save the WASI Configuration to the store @@ -148,12 +165,8 @@ func (wf *WasiFactory) Start(contID ContainerID) error { return fmt.Errorf("[WasiFactory] Failed to define WASI in the linker for %s: %v", contID, err) } - // Untar the code in a temporary folder - if err := utils.Untar(wasiRunner.tar, untarDest); err != nil { - return fmt.Errorf("[WasiFactory] Faield to untar code for %s: %v", contID, err) - } - - wasmFileName := filepath.Join(untarDest, contID+".wasm") + // Determine wasm file name + wasmFileName := filepath.Join(wasiRunner.dir, contID+".wasm") // Read module code moduleData, err := os.ReadFile(wasmFileName) @@ -161,12 +174,12 @@ func (wf *WasiFactory) Start(contID ContainerID) error { wasiRunner.Close() return fmt.Errorf("[WasiFactory] Failed to read the WASI code for %s: %v", contID, err) } - // Compile the WASI Module + // Try to compile the WASI Module module, err := wasmtime.NewModule(wf.engine, moduleData) if err != nil { if strings.HasPrefix(err.Error(), "failed to parse WebAssembly module") { // File is a WASI Component - wasiRunner.cliArgs = append(wasiRunner.cliArgs, "--dir", untarDest+"::/app") + wasiRunner.cliArgs = append(wasiRunner.cliArgs, "--dir", wasiRunner.dir+"::"+wasiRunner.mount) for _, v := range wasiRunner.env { wasiRunner.cliArgs = append(wasiRunner.cliArgs, "--env") wasiRunner.cliArgs = append(wasiRunner.cliArgs, v) @@ -175,17 +188,18 @@ func (wf *WasiFactory) Start(contID ContainerID) error { wasiRunner.wasiType = WASI_TYPE_COMPONENT return nil } + // There was another error; wasm file is incorrect wasiRunner.Close() return fmt.Errorf("[WasiFactory] Failed to create WASI Module for %s: %v", contID, err) } + // File was compiled successfully wasiRunner.module = module wasiRunner.wasiType = WASI_TYPE_MODULE return nil } func (wf *WasiFactory) Destroy(id ContainerID) error { - wasiRunner := wf.runners[id] - if wasiRunner.wasiType == WASI_TYPE_MODULE { + if wasiRunner, ok := wf.runners[id]; ok { wasiRunner.Close() } delete(wf.runners, id) @@ -208,7 +222,6 @@ func (wf *WasiFactory) GetIPAddress(ContainerID) (string, error) { } func (wf *WasiFactory) GetMemoryMB(id ContainerID) (int64, error) { - // NOTE: this can probably be the WasmStackSize log.Println("[WasiFactory] GetMemoryMB unimplemented") return 0, nil } From 33c61f20e77ebed3a2174e1843eed3048f543fdb Mon Sep 17 00:00:00 2001 From: Alessandro Lioi Date: Sat, 26 Oct 2024 17:31:51 +0200 Subject: [PATCH 11/29] feat(wasi): remove files and folders when deleting runner --- internal/container/wasi.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/internal/container/wasi.go b/internal/container/wasi.go index 5f5c70cd..12ddaf4a 100644 --- a/internal/container/wasi.go +++ b/internal/container/wasi.go @@ -51,9 +51,20 @@ func (wr *wasiRunner) Close() { } if wr.stdout != nil { wr.stdout.Close() + if err := os.Remove(wr.stdout.Name()); err != nil { + log.Printf("[WasiFactory] Failed to delete temporary stdout file: %v", err) + } } if wr.stderr != nil { wr.stderr.Close() + if err := os.Remove(wr.stderr.Name()); err != nil { + log.Printf("[WasiFactory] Failed to delete temporary stderr file: %v", err) + } + } + if wr.dir != "" { + if err := os.RemoveAll(wr.dir); err != nil { + log.Printf("[WasiFactory] Failed to delete temporary directory: %v", err) + } } } From de1ca1f431c92e1622988ae15f2a112aa80de138 Mon Sep 17 00:00:00 2001 From: Alessandro Lioi Date: Mon, 28 Oct 2024 10:34:39 +0100 Subject: [PATCH 12/29] feat(wasi): lock on runner creation to avoid duplicates fix multi-thread use: Store and WasiConfig can be used only once --- internal/container/container.go | 32 ++++-- internal/container/wasi.go | 170 +++++++++++++++++--------------- 2 files changed, 112 insertions(+), 90 deletions(-) diff --git a/internal/container/container.go b/internal/container/container.go index 6105eff8..ce39c51e 100644 --- a/internal/container/container.go +++ b/internal/container/container.go @@ -12,6 +12,7 @@ import ( "os/exec" "time" + "github.com/bytecodealliance/wasmtime-go/v25" "github.com/grussorusso/serverledge/internal/executor" "github.com/grussorusso/serverledge/internal/function" ) @@ -67,35 +68,48 @@ func Execute(contID ContainerID, req *executor.InvocationRequest, f *function.Fu } func wasiExecute(contID ContainerID, req *executor.InvocationRequest) (*executor.InvocationResult, time.Duration, error) { - wasiRunner := factories[WASI_FACTORY_KEY].(*WasiFactory).runners[contID] + wf := factories[WASI_FACTORY_KEY].(*WasiFactory) + wr := wf.runners[contID] t0 := time.Now() - if wasiRunner.wasiType == WASI_TYPE_MODULE { + if wr.wasiType == WASI_TYPE_MODULE { + // Create a new Wasi Configuration + wasiConfig, err := wr.BuildWasiConfiguration(contID) + if err != nil { + return nil, time.Now().Sub(t0), err + } + defer wasiConfig.Close() + + // Create new store for this module + store := wasmtime.NewStore(wf.engine) + store.SetWasi(wasiConfig) + defer store.Close() + // Create an instance of the module - instance, err := wasiRunner.linker.Instantiate(wasiRunner.store, wasiRunner.module) + instance, err := wr.linker.Instantiate(store, wr.module) if err != nil { return nil, time.Now().Sub(t0), fmt.Errorf("Failed to instantiate WASI module: %v", err) } // Get the _start function (entrypoint of any wasm module) - start := instance.GetFunc(wasiRunner.store, "_start") + start := instance.GetFunc(store, "_start") if start == nil { return nil, time.Now().Sub(t0), fmt.Errorf("WASI Module does not have a _start function") } // Call the _start function - if _, err := start.Call(wasiRunner.store); err != nil { + if _, err := start.Call(store); err != nil { return nil, time.Now().Sub(t0), fmt.Errorf("Failed to run WASI module: %v", err) } // Read stdout from the temp file - stdout, err := io.ReadAll(wasiRunner.stdout) + stdout, err := io.ReadAll(wr.stdout) if err != nil { return nil, time.Now().Sub(t0), fmt.Errorf("Failed to read stdout for WASI: %v", err) } // Read stderr from the temp file - stderr, err := io.ReadAll(wasiRunner.stderr) + stderr, err := io.ReadAll(wr.stderr) if err != nil { return nil, time.Now().Sub(t0), fmt.Errorf("Failed to read stderr for WASI: %v", err) } @@ -106,9 +120,9 @@ func wasiExecute(contID ContainerID, req *executor.InvocationRequest) (*executor res.Output = fmt.Sprintf("%s\n%s", string(stdout), string(stderr)) } return res, time.Now().Sub(t0), nil - } else if wasiRunner.wasiType == WASI_TYPE_COMPONENT { + } else if wr.wasiType == WASI_TYPE_COMPONENT { // Create wasmtime CLI command - execCmd := exec.Command("wasmtime", wasiRunner.cliArgs...) + execCmd := exec.Command("wasmtime", wr.cliArgs...) // Save stdout and stderr to another buffer var stdoutBuffer, stderrBuffer bytes.Buffer diff --git a/internal/container/wasi.go b/internal/container/wasi.go index 12ddaf4a..a8cc927a 100644 --- a/internal/container/wasi.go +++ b/internal/container/wasi.go @@ -8,6 +8,7 @@ import ( "os" "path/filepath" "strings" + "sync" "github.com/bytecodealliance/wasmtime-go/v25" "github.com/grussorusso/serverledge/utils" @@ -17,6 +18,7 @@ type WasiType string const WASI_TYPE_MODULE WasiType = "module" const WASI_TYPE_COMPONENT WasiType = "component" +const WASI_TYPE_UNDEFINED WasiType = "undefined" type WasiFactory struct { ctx context.Context @@ -25,16 +27,15 @@ type WasiFactory struct { } type wasiRunner struct { - wasiType WasiType // WasiModule is executed using wasmtime-go; WasiComponent using Wasmtime CLI - env []string // List of KEY=VALUE pairs - dir string // Wasm Directory - mount string // Wasm Directory is preloaded to this mount-point + lock sync.Mutex // Mutex used to have a single untar-ed copy of the runner + wasiType WasiType // WasiModule is executed using wasmtime-go; WasiComponent using Wasmtime CLI // WASI Module Specifics - store *wasmtime.Store // Group of WASM instances - linker *wasmtime.Linker // Used to instantiate module - module *wasmtime.Module // Compiled WASM - stdout *os.File // Temporary file for the stdout - stderr *os.File // Temporary file for the stderr + envKeys, envValues []string // List of environment variables keys and values + dir, mount string // Wasm Directory and its mount point + linker *wasmtime.Linker // Used to instantiate module + module *wasmtime.Module // Compiled WASM + stdout *os.File // Temporary file for the stdout + stderr *os.File // Temporary file for the stderr // WASI Component Specifics cliArgs []string } @@ -46,9 +47,6 @@ func (wr *wasiRunner) Close() { if wr.linker != nil { wr.linker.Close() } - if wr.store != nil { - wr.store.Close() - } if wr.stdout != nil { wr.stdout.Close() if err := os.Remove(wr.stdout.Name()); err != nil { @@ -93,27 +91,53 @@ func InitWasiFactory() *WasiFactory { // Image is the ID // NOTE: this approach requires Runtime to be set to wasi and CustomImage to an identifier (e.g. function name) func (wf *WasiFactory) Create(image string, opts *ContainerOptions) (ContainerID, error) { - // Create the new runner - wf.runners[image] = &wasiRunner{env: opts.Env} + // Create new runner if it does not exists + if _, ok := wf.runners[image]; !ok { + var envKeys, envVals, cliArgs []string + for _, v := range opts.Env { + cliArgs = append(cliArgs, "--env", v) + // Splitting the env array to separate keys and values + // Assuming env is formatted correctly: KEY=VALUE + split := strings.Split(v, "=") + key := split[0] + value := split[1] + envKeys = append(envKeys, key) + envVals = append(envVals, value) + } + + wf.runners[image] = &wasiRunner{ + envKeys: envKeys, + envValues: envVals, + cliArgs: cliArgs, + wasiType: WASI_TYPE_UNDEFINED, + } + } return image, nil } // Untar the decoded function code into a temporary directory func (wf *WasiFactory) CopyToContainer(contID ContainerID, content io.Reader, destPath string) error { + wr := wf.runners[contID] + wr.lock.Lock() + defer wr.lock.Unlock() + // Code was already copied by another thread + if wr.dir != "" { + return nil + } // Create temporary directory to store untar-ed wasm file dir, err := os.MkdirTemp("", contID) if err != nil { return fmt.Errorf("[WasiFactory] Failed to create temporary directory for %s: %v", contID, err) } - // Save directory name - wf.runners[contID].dir = dir // Untar code if err := utils.Untar(content, dir); err != nil { return fmt.Errorf("[WasiFactory] Failed to untar code for %s: %v", contID, err) } // NOTE: hard-coding `destPath` as `/` // this is required to correctly use the official Python interpreter - wf.runners[contID].mount = "/" + wr.mount = "/" + wr.dir = dir + wr.cliArgs = append(wr.cliArgs, "--dir", wr.dir+"::"+wr.mount) return nil } @@ -122,90 +146,43 @@ func (wf *WasiFactory) CopyToContainer(contID ContainerID, content io.Reader, de // NOTE: using contID (set as custom_image from CLI as the wasm filename inside the tar) func (wf *WasiFactory) Start(contID ContainerID) error { // Get the wasi runner - wasiRunner, ok := wf.runners[contID] + wr, ok := wf.runners[contID] if !ok { return fmt.Errorf("[WasiFactory]: no runner with %s found", contID) } - - // Create new store - wasiRunner.store = wasmtime.NewStore(wf.engine) - // Create WASI Configuration - wasiConfig := wasmtime.NewWasiConfig() - - // Create temporary files for stdout and stderr for this function - stdout, err := os.CreateTemp("", fmt.Sprintf("%s-stdout", contID)) - if err != nil { - return fmt.Errorf("[WasiFactory]: failed to create temp stdout file for %s: %v", contID, err) - } - stderr, err := os.CreateTemp("", fmt.Sprintf("%s-stdout", contID)) - if err != nil { - return fmt.Errorf("[WasiFactory]: failed to create temp stderr file for %s: %v", contID, err) - } - // Set wasmtime to use the temporary files for stdout and stderr - wasiConfig.SetStdoutFile(stdout.Name()) - wasiConfig.SetStderrFile(stderr.Name()) - // Save the references to the temporary files - wasiRunner.stdout = stdout - wasiRunner.stderr = stderr - - // Mount the temporary directory to the specified mount point - if err := wasiConfig.PreopenDir(wasiRunner.dir, wasiRunner.mount); err != nil { - return fmt.Errorf("[WasiFactory] Failed to preopen %s: %v", wasiRunner.mount, err) - } - - // Splitting the env array to separate keys and values - // Assuming env is formatted correctly: KEY=VALUE - var envKeys, envVals []string - for _, v := range wasiRunner.env { - split := strings.Split(v, "=") - key := split[0] - value := split[1] - envKeys = append(envKeys, key) - envVals = append(envVals, value) + wr.lock.Lock() + defer wr.lock.Unlock() + // File was already compiled by another thread + if wr.wasiType != WASI_TYPE_UNDEFINED { + return nil } - // Set environment variables in WASI - wasiConfig.SetEnv(envKeys, envVals) - - // Save the WASI Configuration to the store - wasiRunner.store.SetWasi(wasiConfig) // Create a linker - wasiRunner.linker = wasmtime.NewLinker(wf.engine) - if err := wasiRunner.linker.DefineWasi(); err != nil { - wasiRunner.Close() + wr.linker = wasmtime.NewLinker(wf.engine) + if err := wr.linker.DefineWasi(); err != nil { + wr.Close() return fmt.Errorf("[WasiFactory] Failed to define WASI in the linker for %s: %v", contID, err) } // Determine wasm file name - wasmFileName := filepath.Join(wasiRunner.dir, contID+".wasm") + wasmFileName := filepath.Join(wr.dir, contID+".wasm") - // Read module code - moduleData, err := os.ReadFile(wasmFileName) - if err != nil { - wasiRunner.Close() - return fmt.Errorf("[WasiFactory] Failed to read the WASI code for %s: %v", contID, err) - } // Try to compile the WASI Module - module, err := wasmtime.NewModule(wf.engine, moduleData) + module, err := wasmtime.NewModuleFromFile(wf.engine, wasmFileName) if err != nil { - if strings.HasPrefix(err.Error(), "failed to parse WebAssembly module") { + if strings.HasPrefix(err.Error(), "expected a WebAssembly module but was given a WebAssembly component") { // File is a WASI Component - wasiRunner.cliArgs = append(wasiRunner.cliArgs, "--dir", wasiRunner.dir+"::"+wasiRunner.mount) - for _, v := range wasiRunner.env { - wasiRunner.cliArgs = append(wasiRunner.cliArgs, "--env") - wasiRunner.cliArgs = append(wasiRunner.cliArgs, v) - } - wasiRunner.cliArgs = append(wasiRunner.cliArgs, wasmFileName) - wasiRunner.wasiType = WASI_TYPE_COMPONENT + wr.cliArgs = append(wr.cliArgs, wasmFileName) + wr.wasiType = WASI_TYPE_COMPONENT return nil } // There was another error; wasm file is incorrect - wasiRunner.Close() + wr.Close() return fmt.Errorf("[WasiFactory] Failed to create WASI Module for %s: %v", contID, err) } // File was compiled successfully - wasiRunner.module = module - wasiRunner.wasiType = WASI_TYPE_MODULE + wr.module = module + wr.wasiType = WASI_TYPE_MODULE return nil } @@ -236,3 +213,34 @@ func (wf *WasiFactory) GetMemoryMB(id ContainerID) (int64, error) { log.Println("[WasiFactory] GetMemoryMB unimplemented") return 0, nil } + +// Utility function to create a Wasi Configuration for this runner +// The WasiConfiguration cannot be shared among threads because it's not thread-safe +func (wr *wasiRunner) BuildWasiConfiguration(contID ContainerID) (*wasmtime.WasiConfig, error) { + // Create new Wasi Configuration + wasiConfig := wasmtime.NewWasiConfig() + // Set environment variables + wasiConfig.SetEnv(wr.envKeys, wr.envValues) + + // Create temporary files for stdout and stderr for this function + stdout, err := os.CreateTemp("", fmt.Sprintf("%s-stdout", contID)) + if err != nil { + return nil, fmt.Errorf("[WasiRunner]: failed to create temp stdout file for %s: %v", contID, err) + } + stderr, err := os.CreateTemp("", fmt.Sprintf("%s-stdout", contID)) + if err != nil { + return nil, fmt.Errorf("[WasiRunner]: failed to create temp stderr file for %s: %v", contID, err) + } + // Set wasmtime to use the temporary files for stdout and stderr + wasiConfig.SetStdoutFile(stdout.Name()) + wasiConfig.SetStderrFile(stderr.Name()) + // Save the references to the temporary files + wr.stdout = stdout + wr.stderr = stderr + // Mount the temporary directory to the specified mount point + if err := wasiConfig.PreopenDir(wr.dir, wr.mount); err != nil { + return nil, fmt.Errorf("[WasiRunner] Failed to preopen %s: %v", wr.mount, err) + } + + return wasiConfig, nil +} From 4b12e6bc8cc4b93833f1fa51650002f45dce785f Mon Sep 17 00:00:00 2001 From: Alessandro Lioi Date: Mon, 28 Oct 2024 18:12:31 +0100 Subject: [PATCH 13/29] feat(wasi): use function Handler as argv (used for Python execution) --- internal/container/container.go | 5 +++-- internal/container/wasi.go | 14 +++++++++++--- internal/scheduling/execution.go | 1 + 3 files changed, 15 insertions(+), 5 deletions(-) diff --git a/internal/container/container.go b/internal/container/container.go index ce39c51e..513312e2 100644 --- a/internal/container/container.go +++ b/internal/container/container.go @@ -74,7 +74,7 @@ func wasiExecute(contID ContainerID, req *executor.InvocationRequest) (*executor if wr.wasiType == WASI_TYPE_MODULE { // Create a new Wasi Configuration - wasiConfig, err := wr.BuildWasiConfiguration(contID) + wasiConfig, err := wr.BuildWasiConfiguration(contID, req.Handler) if err != nil { return nil, time.Now().Sub(t0), err } @@ -122,7 +122,8 @@ func wasiExecute(contID ContainerID, req *executor.InvocationRequest) (*executor return res, time.Now().Sub(t0), nil } else if wr.wasiType == WASI_TYPE_COMPONENT { // Create wasmtime CLI command - execCmd := exec.Command("wasmtime", wr.cliArgs...) + args := append(wr.cliArgs, req.Handler) + execCmd := exec.Command("wasmtime", args...) // Save stdout and stderr to another buffer var stdoutBuffer, stderrBuffer bytes.Buffer diff --git a/internal/container/wasi.go b/internal/container/wasi.go index a8cc927a..42398f22 100644 --- a/internal/container/wasi.go +++ b/internal/container/wasi.go @@ -216,7 +216,7 @@ func (wf *WasiFactory) GetMemoryMB(id ContainerID) (int64, error) { // Utility function to create a Wasi Configuration for this runner // The WasiConfiguration cannot be shared among threads because it's not thread-safe -func (wr *wasiRunner) BuildWasiConfiguration(contID ContainerID) (*wasmtime.WasiConfig, error) { +func (wr *wasiRunner) BuildWasiConfiguration(contID ContainerID, handler string) (*wasmtime.WasiConfig, error) { // Create new Wasi Configuration wasiConfig := wasmtime.NewWasiConfig() // Set environment variables @@ -232,8 +232,12 @@ func (wr *wasiRunner) BuildWasiConfiguration(contID ContainerID) (*wasmtime.Wasi return nil, fmt.Errorf("[WasiRunner]: failed to create temp stderr file for %s: %v", contID, err) } // Set wasmtime to use the temporary files for stdout and stderr - wasiConfig.SetStdoutFile(stdout.Name()) - wasiConfig.SetStderrFile(stderr.Name()) + if err := wasiConfig.SetStdoutFile(stdout.Name()); err != nil { + return nil, fmt.Errorf("[WasiRunner] Failed to set stdout file: %v", err) + } + if err := wasiConfig.SetStderrFile(stderr.Name()); err != nil { + return nil, fmt.Errorf("[WasiRunner] Failed to set stderr file: %v", err) + } // Save the references to the temporary files wr.stdout = stdout wr.stderr = stderr @@ -242,5 +246,9 @@ func (wr *wasiRunner) BuildWasiConfiguration(contID ContainerID) (*wasmtime.Wasi return nil, fmt.Errorf("[WasiRunner] Failed to preopen %s: %v", wr.mount, err) } + if handler != "" { + wasiConfig.SetArgv([]string{"", handler}) + } + return wasiConfig, nil } diff --git a/internal/scheduling/execution.go b/internal/scheduling/execution.go index 26f444e1..5ab7e8d6 100644 --- a/internal/scheduling/execution.go +++ b/internal/scheduling/execution.go @@ -20,6 +20,7 @@ func Execute(contID container.ContainerID, r *scheduledRequest, isWarm bool) (fu req = executor.InvocationRequest{ Params: r.Params, ReturnOutput: r.ReturnOutput, + Handler: r.Fun.Handler, // NOTE: this is required by Wasi for Python } } else { cmd := container.RuntimeToInfo[r.Fun.Runtime].InvocationCmd From d90667f3996c82379e7d34d55dc8ee2bb539a5e9 Mon Sep 17 00:00:00 2001 From: Alessandro Lioi Date: Mon, 28 Oct 2024 18:38:57 +0100 Subject: [PATCH 14/29] fix(wasi): stdout and stderr temp file removal after execution --- internal/container/container.go | 10 +++--- internal/container/wasi.go | 60 ++++++++++++++++++++------------- 2 files changed, 42 insertions(+), 28 deletions(-) diff --git a/internal/container/container.go b/internal/container/container.go index 513312e2..46a16de0 100644 --- a/internal/container/container.go +++ b/internal/container/container.go @@ -74,15 +74,15 @@ func wasiExecute(contID ContainerID, req *executor.InvocationRequest) (*executor if wr.wasiType == WASI_TYPE_MODULE { // Create a new Wasi Configuration - wasiConfig, err := wr.BuildWasiConfiguration(contID, req.Handler) + wcc, err := wr.BuildWasiConfiguration(contID, req.Handler) if err != nil { return nil, time.Now().Sub(t0), err } - defer wasiConfig.Close() + defer wcc.Close() // Create new store for this module store := wasmtime.NewStore(wf.engine) - store.SetWasi(wasiConfig) + store.SetWasi(wcc.wasiConfig) defer store.Close() // Create an instance of the module @@ -103,13 +103,13 @@ func wasiExecute(contID ContainerID, req *executor.InvocationRequest) (*executor } // Read stdout from the temp file - stdout, err := io.ReadAll(wr.stdout) + stdout, err := io.ReadAll(wcc.stdout) if err != nil { return nil, time.Now().Sub(t0), fmt.Errorf("Failed to read stdout for WASI: %v", err) } // Read stderr from the temp file - stderr, err := io.ReadAll(wr.stderr) + stderr, err := io.ReadAll(wcc.stderr) if err != nil { return nil, time.Now().Sub(t0), fmt.Errorf("Failed to read stderr for WASI: %v", err) } diff --git a/internal/container/wasi.go b/internal/container/wasi.go index 42398f22..89a3348a 100644 --- a/internal/container/wasi.go +++ b/internal/container/wasi.go @@ -34,12 +34,16 @@ type wasiRunner struct { dir, mount string // Wasm Directory and its mount point linker *wasmtime.Linker // Used to instantiate module module *wasmtime.Module // Compiled WASM - stdout *os.File // Temporary file for the stdout - stderr *os.File // Temporary file for the stderr // WASI Component Specifics cliArgs []string } +// Utility struct to keep configuration and temporary files +type wasiCustomConfig struct { + wasiConfig *wasmtime.WasiConfig // Actual Configuration + stdout, stderr *os.File // Temporary files for stdout and stderr +} + func (wr *wasiRunner) Close() { if wr.module != nil { wr.module.Close() @@ -47,21 +51,27 @@ func (wr *wasiRunner) Close() { if wr.linker != nil { wr.linker.Close() } - if wr.stdout != nil { - wr.stdout.Close() - if err := os.Remove(wr.stdout.Name()); err != nil { - log.Printf("[WasiFactory] Failed to delete temporary stdout file: %v", err) + if wr.dir != "" { + if err := os.RemoveAll(wr.dir); err != nil { + log.Printf("[WasiFactory] Failed to delete temporary directory: %v", err) } } - if wr.stderr != nil { - wr.stderr.Close() - if err := os.Remove(wr.stderr.Name()); err != nil { - log.Printf("[WasiFactory] Failed to delete temporary stderr file: %v", err) +} + +func (wcc *wasiCustomConfig) Close() { + if wcc.wasiConfig != nil { + wcc.wasiConfig.Close() + } + if wcc.stdout != nil { + wcc.stdout.Close() + if err := os.Remove(wcc.stdout.Name()); err != nil { + log.Printf("[WasiCustomConfig] Failed to remove stdout %s: %v", wcc.stdout.Name(), err) } } - if wr.dir != "" { - if err := os.RemoveAll(wr.dir); err != nil { - log.Printf("[WasiFactory] Failed to delete temporary directory: %v", err) + if wcc.stderr != nil { + wcc.stderr.Close() + if err := os.Remove(wcc.stderr.Name()); err != nil { + log.Printf("[WasiCustomConfig] Failed to remove stderr %s: %v", wcc.stderr.Name(), err) } } } @@ -216,7 +226,8 @@ func (wf *WasiFactory) GetMemoryMB(id ContainerID) (int64, error) { // Utility function to create a Wasi Configuration for this runner // The WasiConfiguration cannot be shared among threads because it's not thread-safe -func (wr *wasiRunner) BuildWasiConfiguration(contID ContainerID, handler string) (*wasmtime.WasiConfig, error) { +func (wr *wasiRunner) BuildWasiConfiguration(contID ContainerID, handler string) (wasiCustomConfig, error) { + var wcc wasiCustomConfig // Create new Wasi Configuration wasiConfig := wasmtime.NewWasiConfig() // Set environment variables @@ -225,30 +236,33 @@ func (wr *wasiRunner) BuildWasiConfiguration(contID ContainerID, handler string) // Create temporary files for stdout and stderr for this function stdout, err := os.CreateTemp("", fmt.Sprintf("%s-stdout", contID)) if err != nil { - return nil, fmt.Errorf("[WasiRunner]: failed to create temp stdout file for %s: %v", contID, err) + return wcc, fmt.Errorf("[WasiRunner]: failed to create temp stdout file for %s: %v", contID, err) } stderr, err := os.CreateTemp("", fmt.Sprintf("%s-stdout", contID)) if err != nil { - return nil, fmt.Errorf("[WasiRunner]: failed to create temp stderr file for %s: %v", contID, err) + return wcc, fmt.Errorf("[WasiRunner]: failed to create temp stderr file for %s: %v", contID, err) } + // Set wasmtime to use the temporary files for stdout and stderr if err := wasiConfig.SetStdoutFile(stdout.Name()); err != nil { - return nil, fmt.Errorf("[WasiRunner] Failed to set stdout file: %v", err) + return wcc, fmt.Errorf("[WasiRunner] Failed to set stdout file: %v", err) } if err := wasiConfig.SetStderrFile(stderr.Name()); err != nil { - return nil, fmt.Errorf("[WasiRunner] Failed to set stderr file: %v", err) + return wcc, fmt.Errorf("[WasiRunner] Failed to set stderr file: %v", err) } - // Save the references to the temporary files - wr.stdout = stdout - wr.stderr = stderr + // Mount the temporary directory to the specified mount point if err := wasiConfig.PreopenDir(wr.dir, wr.mount); err != nil { - return nil, fmt.Errorf("[WasiRunner] Failed to preopen %s: %v", wr.mount, err) + return wcc, fmt.Errorf("[WasiRunner] Failed to preopen %s: %v", wr.mount, err) } if handler != "" { wasiConfig.SetArgv([]string{"", handler}) } - return wasiConfig, nil + // Save references into custom configuration + wcc.wasiConfig = wasiConfig + wcc.stdout = stdout + wcc.stderr = stderr + return wcc, nil } From 8551335f7e3c561d897509fd30d0a3ccc91ce3f8 Mon Sep 17 00:00:00 2001 From: Alessandro Lioi Date: Wed, 30 Oct 2024 09:49:58 +0100 Subject: [PATCH 15/29] feat(wasi): pass params as json string (not tested) --- internal/container/container.go | 14 +++++++++++++- internal/container/wasi.go | 12 ++++++++++-- 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/internal/container/container.go b/internal/container/container.go index 46a16de0..9306310b 100644 --- a/internal/container/container.go +++ b/internal/container/container.go @@ -72,9 +72,18 @@ func wasiExecute(contID ContainerID, req *executor.InvocationRequest) (*executor wr := wf.runners[contID] t0 := time.Now() + var paramsBytes []byte + if req.Params != nil { + var err error + paramsBytes, err = json.Marshal(req.Params) + if err != nil { + return nil, time.Now().Sub(t0), fmt.Errorf("Failed to convert params to JSON: %v", err) + } + } + if wr.wasiType == WASI_TYPE_MODULE { // Create a new Wasi Configuration - wcc, err := wr.BuildWasiConfiguration(contID, req.Handler) + wcc, err := wr.BuildWasiConfig(contID, req.Handler, string(paramsBytes)) if err != nil { return nil, time.Now().Sub(t0), err } @@ -123,6 +132,9 @@ func wasiExecute(contID ContainerID, req *executor.InvocationRequest) (*executor } else if wr.wasiType == WASI_TYPE_COMPONENT { // Create wasmtime CLI command args := append(wr.cliArgs, req.Handler) + if len(paramsBytes) > 0 { + args = append(args, string(paramsBytes)) + } execCmd := exec.Command("wasmtime", args...) // Save stdout and stderr to another buffer diff --git a/internal/container/wasi.go b/internal/container/wasi.go index 89a3348a..692fb252 100644 --- a/internal/container/wasi.go +++ b/internal/container/wasi.go @@ -226,7 +226,7 @@ func (wf *WasiFactory) GetMemoryMB(id ContainerID) (int64, error) { // Utility function to create a Wasi Configuration for this runner // The WasiConfiguration cannot be shared among threads because it's not thread-safe -func (wr *wasiRunner) BuildWasiConfiguration(contID ContainerID, handler string) (wasiCustomConfig, error) { +func (wr *wasiRunner) BuildWasiConfig(contID ContainerID, handler string, params string) (wasiCustomConfig, error) { var wcc wasiCustomConfig // Create new Wasi Configuration wasiConfig := wasmtime.NewWasiConfig() @@ -256,9 +256,17 @@ func (wr *wasiRunner) BuildWasiConfiguration(contID ContainerID, handler string) return wcc, fmt.Errorf("[WasiRunner] Failed to preopen %s: %v", wr.mount, err) } + // Create argv (first element is usually the program name, leaving empty) + argv := []string{""} if handler != "" { - wasiConfig.SetArgv([]string{"", handler}) + // Add handler if available (used in Python for the source file) + argv = append(argv, handler) } + // Add additional params as a JSON string + argv = append(argv, params) + + // Set argv in Wasi + wasiConfig.SetArgv(argv) // Save references into custom configuration wcc.wasiConfig = wasiConfig From 5961632704e4543cbf9928909a495a92632c1e68 Mon Sep 17 00:00:00 2001 From: Alessandro Lioi Date: Wed, 30 Oct 2024 09:50:53 +0100 Subject: [PATCH 16/29] feat(wasi): remove MemoryMB parameter in a Wasi function --- internal/api/api.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/internal/api/api.go b/internal/api/api.go index 34d676bb..f8c2e78e 100644 --- a/internal/api/api.go +++ b/internal/api/api.go @@ -142,6 +142,11 @@ func CreateFunction(c echo.Context) error { log.Printf("New request: creation of %s\n", f.Name) + if f.Runtime == container.WASI_RUNTIME { + // Dropping memory requirements because it cannot be enforced in Wasi + f.MemoryMB = 0 + } + // Check that the selected runtime exists if f.Runtime != container.CUSTOM_RUNTIME && f.Runtime != container.WASI_RUNTIME { _, ok := container.RuntimeToInfo[f.Runtime] From 61b051a0d352d8a43704dfd03afdc943ea80467f Mon Sep 17 00:00:00 2001 From: Alessandro Lioi Date: Thu, 31 Oct 2024 10:51:05 +0100 Subject: [PATCH 17/29] fix(wasi): correctly pass handler to argv --- internal/container/container.go | 2 +- internal/container/wasi.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/container/container.go b/internal/container/container.go index 9306310b..6d94f0c6 100644 --- a/internal/container/container.go +++ b/internal/container/container.go @@ -131,7 +131,7 @@ func wasiExecute(contID ContainerID, req *executor.InvocationRequest) (*executor return res, time.Now().Sub(t0), nil } else if wr.wasiType == WASI_TYPE_COMPONENT { // Create wasmtime CLI command - args := append(wr.cliArgs, req.Handler) + args := append(wr.cliArgs, wr.mount+req.Handler) if len(paramsBytes) > 0 { args = append(args, string(paramsBytes)) } diff --git a/internal/container/wasi.go b/internal/container/wasi.go index 692fb252..4ee46fab 100644 --- a/internal/container/wasi.go +++ b/internal/container/wasi.go @@ -260,7 +260,7 @@ func (wr *wasiRunner) BuildWasiConfig(contID ContainerID, handler string, params argv := []string{""} if handler != "" { // Add handler if available (used in Python for the source file) - argv = append(argv, handler) + argv = append(argv, wr.mount+handler) } // Add additional params as a JSON string argv = append(argv, params) From 47f391c6e73cca964e32cd88ef4b884947067eee Mon Sep 17 00:00:00 2001 From: Alessandro Lioi Date: Thu, 31 Oct 2024 14:25:01 +0100 Subject: [PATCH 18/29] feat(wasi): use RWMutex instead of Mutex; download in `CopyToContainer` --- internal/container/container.go | 25 ++----- internal/container/wasi.go | 112 +++++++++++++++++++++++--------- 2 files changed, 87 insertions(+), 50 deletions(-) diff --git a/internal/container/container.go b/internal/container/container.go index 6d94f0c6..29792566 100644 --- a/internal/container/container.go +++ b/internal/container/container.go @@ -8,7 +8,6 @@ import ( "io" "log" "net/http" - "net/url" "os/exec" "time" @@ -18,7 +17,7 @@ import ( ) // NewContainer creates and starts a new container. -func NewContainer(image, base64Src string, opts *ContainerOptions, f *function.Function) (ContainerID, error) { +func NewContainer(image, codeTar string, opts *ContainerOptions, f *function.Function) (ContainerID, error) { cf := GetFactoryFromFunction(f) contID, err := cf.Create(image, opts) if err != nil { @@ -26,25 +25,9 @@ func NewContainer(image, base64Src string, opts *ContainerOptions, f *function.F return "", err } - if len(base64Src) > 0 { - var r io.Reader - // Decoding src - decodedSrc, _ := base64.StdEncoding.DecodeString(base64Src) - // Check if decoded src is a url - u, err := url.ParseRequestURI(string(decodedSrc)) - if err == nil && u.Scheme != "" && u.Host != "" { - // src is url; it has to be downloaded - resp, err := http.Get(string(decodedSrc)) - if err != nil { - log.Printf("Failed to download code %s", decodedSrc) - return "", err - } - r = resp.Body - } else { - // assuming decodedSrc is Base64 encoded tar - r = bytes.NewReader(decodedSrc) - } - err = cf.CopyToContainer(contID, r, "/app/") + if len(codeTar) > 0 { + decodedCode, _ := base64.StdEncoding.DecodeString(codeTar) + err = cf.CopyToContainer(contID, bytes.NewReader(decodedCode), "/app/") if err != nil { log.Printf("Failed code copy\n") return "", err diff --git a/internal/container/wasi.go b/internal/container/wasi.go index 4ee46fab..fe5cbc68 100644 --- a/internal/container/wasi.go +++ b/internal/container/wasi.go @@ -1,10 +1,13 @@ package container import ( + "bytes" "context" "fmt" "io" "log" + "net/http" + "net/url" "os" "path/filepath" "strings" @@ -24,11 +27,12 @@ type WasiFactory struct { ctx context.Context runners map[string]*wasiRunner engine *wasmtime.Engine + lock sync.RWMutex } type wasiRunner struct { - lock sync.Mutex // Mutex used to have a single untar-ed copy of the runner - wasiType WasiType // WasiModule is executed using wasmtime-go; WasiComponent using Wasmtime CLI + lock sync.RWMutex // Mutex used to have a single untar-ed copy of the runner + wasiType WasiType // WasiModule is executed using wasmtime-go; WasiComponent using Wasmtime CLI // WASI Module Specifics envKeys, envValues []string // List of environment variables keys and values dir, mount string // Wasm Directory and its mount point @@ -90,7 +94,11 @@ func InitWasiFactory() *WasiFactory { engine := wasmtime.NewEngineWithConfig(engineConfig) // Create the factory - wasiFactory := &WasiFactory{ctx, make(map[string]*wasiRunner), engine} + wasiFactory := &WasiFactory{ + ctx: ctx, + runners: make(map[string]*wasiRunner), + engine: engine, + } if factories == nil { factories = make(map[string]Factory) } @@ -101,26 +109,36 @@ func InitWasiFactory() *WasiFactory { // Image is the ID // NOTE: this approach requires Runtime to be set to wasi and CustomImage to an identifier (e.g. function name) func (wf *WasiFactory) Create(image string, opts *ContainerOptions) (ContainerID, error) { + // Check if runner already exists + wf.lock.RLock() + _, ok := wf.runners[image] + if ok { + wf.lock.RUnlock() + return image, nil + } + wf.lock.RUnlock() + + // Runner does not exists, creating new one + wf.lock.Lock() + defer wf.lock.Unlock() // Create new runner if it does not exists - if _, ok := wf.runners[image]; !ok { - var envKeys, envVals, cliArgs []string - for _, v := range opts.Env { - cliArgs = append(cliArgs, "--env", v) - // Splitting the env array to separate keys and values - // Assuming env is formatted correctly: KEY=VALUE - split := strings.Split(v, "=") - key := split[0] - value := split[1] - envKeys = append(envKeys, key) - envVals = append(envVals, value) - } + var envKeys, envVals, cliArgs []string + for _, v := range opts.Env { + cliArgs = append(cliArgs, "--env", v) + // Splitting the env array to separate keys and values + // Assuming env is formatted correctly: KEY=VALUE + split := strings.Split(v, "=") + key := split[0] + value := split[1] + envKeys = append(envKeys, key) + envVals = append(envVals, value) + } - wf.runners[image] = &wasiRunner{ - envKeys: envKeys, - envValues: envVals, - cliArgs: cliArgs, - wasiType: WASI_TYPE_UNDEFINED, - } + wf.runners[image] = &wasiRunner{ + envKeys: envKeys, + envValues: envVals, + cliArgs: cliArgs, + wasiType: WASI_TYPE_UNDEFINED, } return image, nil } @@ -128,12 +146,46 @@ func (wf *WasiFactory) Create(image string, opts *ContainerOptions) (ContainerID // Untar the decoded function code into a temporary directory func (wf *WasiFactory) CopyToContainer(contID ContainerID, content io.Reader, destPath string) error { wr := wf.runners[contID] - wr.lock.Lock() - defer wr.lock.Unlock() + wr.lock.RLock() // Code was already copied by another thread if wr.dir != "" { + wr.lock.RUnlock() return nil } + wr.lock.RUnlock() + + // Code has to be downloaded, getting a write lock + wr.lock.Lock() + defer wr.lock.Unlock() + + // Additional buffer, used to determine if it's a URL or not + var buffer bytes.Buffer + + // Create new reader that reads from content and writes to buffer + teeReader := io.TeeReader(content, &buffer) + + // Read from the newly created reader + data, err := io.ReadAll(teeReader) + if err != nil { + return fmt.Errorf("[WasiFactory] Failed to read content: %v", err) + } + // Restore content value (assuming code is a tar) + content = &buffer + + // Check if data is a url + u, err := url.ParseRequestURI(string(data)) + if err == nil && u.Scheme != "" && u.Host != "" { + // data is url; it has to be downloaded + resp, err := http.Get(string(data)) + if err != nil { + return fmt.Errorf("[WasiFactory] Failed to download code for %s: %v", contID, err) + } + defer resp.Body.Close() + + // Content is now the downloaded tar file + content = resp.Body + } + // Create temporary directory to store untar-ed wasm file dir, err := os.MkdirTemp("", contID) if err != nil { @@ -156,16 +208,18 @@ func (wf *WasiFactory) CopyToContainer(contID ContainerID, content io.Reader, de // NOTE: using contID (set as custom_image from CLI as the wasm filename inside the tar) func (wf *WasiFactory) Start(contID ContainerID) error { // Get the wasi runner - wr, ok := wf.runners[contID] - if !ok { - return fmt.Errorf("[WasiFactory]: no runner with %s found", contID) - } - wr.lock.Lock() - defer wr.lock.Unlock() + wr := wf.runners[contID] + wr.lock.RLock() + // File was already compiled by another thread if wr.wasiType != WASI_TYPE_UNDEFINED { + wr.lock.RUnlock() return nil } + wr.lock.RUnlock() + + wr.lock.Lock() + defer wr.lock.Unlock() // Create a linker wr.linker = wasmtime.NewLinker(wf.engine) From 2e17748a1550ce6232101ec2a3f57b20c1ab3758 Mon Sep 17 00:00:00 2001 From: Alessandro Lioi Date: Sat, 2 Nov 2024 14:37:12 +0100 Subject: [PATCH 19/29] feat(wasi): replace locks with sync.Map and sync.Once for initialization --- internal/container/container.go | 27 ++-- internal/container/wasi.go | 211 +++++++++++++++----------------- 2 files changed, 114 insertions(+), 124 deletions(-) diff --git a/internal/container/container.go b/internal/container/container.go index 29792566..7928ad25 100644 --- a/internal/container/container.go +++ b/internal/container/container.go @@ -9,6 +9,7 @@ import ( "log" "net/http" "os/exec" + "runtime" "time" "github.com/bytecodealliance/wasmtime-go/v25" @@ -52,18 +53,24 @@ func Execute(contID ContainerID, req *executor.InvocationRequest, f *function.Fu func wasiExecute(contID ContainerID, req *executor.InvocationRequest) (*executor.InvocationResult, time.Duration, error) { wf := factories[WASI_FACTORY_KEY].(*WasiFactory) - wr := wf.runners[contID] - t0 := time.Now() + wrValue, _ := wf.runners.Load(contID) + wr := wrValue.(*wasiRunner) + + if wr.wasiType == WASI_TYPE_UNDEFINED { + return nil, 0, fmt.Errorf("Unrecognized WASI Type") + } var paramsBytes []byte if req.Params != nil { var err error paramsBytes, err = json.Marshal(req.Params) if err != nil { - return nil, time.Now().Sub(t0), fmt.Errorf("Failed to convert params to JSON: %v", err) + return nil, 0, fmt.Errorf("Failed to convert params to JSON: %v", err) } } + res := &executor.InvocationResult{Success: false} + t0 := time.Now() if wr.wasiType == WASI_TYPE_MODULE { // Create a new Wasi Configuration wcc, err := wr.BuildWasiConfig(contID, req.Handler, string(paramsBytes)) @@ -107,11 +114,11 @@ func wasiExecute(contID ContainerID, req *executor.InvocationRequest) (*executor } // Populate result - res := &executor.InvocationResult{Success: true, Result: string(stdout)} + res.Success = true + res.Result = string(stdout) if req.ReturnOutput { res.Output = fmt.Sprintf("%s\n%s", string(stdout), string(stderr)) } - return res, time.Now().Sub(t0), nil } else if wr.wasiType == WASI_TYPE_COMPONENT { // Create wasmtime CLI command args := append(wr.cliArgs, wr.mount+req.Handler) @@ -143,14 +150,14 @@ func wasiExecute(contID ContainerID, req *executor.InvocationRequest) (*executor } // Create response - resp := &executor.InvocationResult{Success: err == nil, Result: string(stdout)} + res.Success = err == nil + res.Result = string(stdout) if req.ReturnOutput { - resp.Output = fmt.Sprintf("%s\n%s", string(stdout), string(stderr)) + res.Output = fmt.Sprintf("%s\n%s", string(stdout), string(stderr)) } - return resp, time.Now().Sub(t0), nil - } else { - return nil, 0, fmt.Errorf("Unrecognized WASI Type") } + + return res, time.Now().Sub(t0), nil } // Execute interacts with the Executor running in the container to invoke the diff --git a/internal/container/wasi.go b/internal/container/wasi.go index fe5cbc68..d91536d3 100644 --- a/internal/container/wasi.go +++ b/internal/container/wasi.go @@ -25,14 +25,14 @@ const WASI_TYPE_UNDEFINED WasiType = "undefined" type WasiFactory struct { ctx context.Context - runners map[string]*wasiRunner + runners sync.Map // ContainerID -> *wasiRunner engine *wasmtime.Engine - lock sync.RWMutex } type wasiRunner struct { - lock sync.RWMutex // Mutex used to have a single untar-ed copy of the runner - wasiType WasiType // WasiModule is executed using wasmtime-go; WasiComponent using Wasmtime CLI + copyInit, startInit sync.Once // Single initialization + + wasiType WasiType // WasiModule is executed using wasmtime-go; WasiComponent using Wasmtime CLI // WASI Module Specifics envKeys, envValues []string // List of environment variables keys and values dir, mount string // Wasm Directory and its mount point @@ -94,11 +94,7 @@ func InitWasiFactory() *WasiFactory { engine := wasmtime.NewEngineWithConfig(engineConfig) // Create the factory - wasiFactory := &WasiFactory{ - ctx: ctx, - runners: make(map[string]*wasiRunner), - engine: engine, - } + wasiFactory := &WasiFactory{ctx: ctx, engine: engine} if factories == nil { factories = make(map[string]Factory) } @@ -109,19 +105,10 @@ func InitWasiFactory() *WasiFactory { // Image is the ID // NOTE: this approach requires Runtime to be set to wasi and CustomImage to an identifier (e.g. function name) func (wf *WasiFactory) Create(image string, opts *ContainerOptions) (ContainerID, error) { - // Check if runner already exists - wf.lock.RLock() - _, ok := wf.runners[image] + _, ok := wf.runners.Load(image) if ok { - wf.lock.RUnlock() return image, nil } - wf.lock.RUnlock() - - // Runner does not exists, creating new one - wf.lock.Lock() - defer wf.lock.Unlock() - // Create new runner if it does not exists var envKeys, envVals, cliArgs []string for _, v := range opts.Env { cliArgs = append(cliArgs, "--env", v) @@ -134,73 +121,73 @@ func (wf *WasiFactory) Create(image string, opts *ContainerOptions) (ContainerID envVals = append(envVals, value) } - wf.runners[image] = &wasiRunner{ + wasiConfig := wasmtime.NewWasiConfig() + wasiConfig.SetEnv(envKeys, envVals) + + wf.runners.Store(image, &wasiRunner{ envKeys: envKeys, envValues: envVals, cliArgs: cliArgs, wasiType: WASI_TYPE_UNDEFINED, - } + }) return image, nil } // Untar the decoded function code into a temporary directory func (wf *WasiFactory) CopyToContainer(contID ContainerID, content io.Reader, destPath string) error { - wr := wf.runners[contID] - wr.lock.RLock() - // Code was already copied by another thread - if wr.dir != "" { - wr.lock.RUnlock() - return nil - } - wr.lock.RUnlock() - - // Code has to be downloaded, getting a write lock - wr.lock.Lock() - defer wr.lock.Unlock() - - // Additional buffer, used to determine if it's a URL or not - var buffer bytes.Buffer - - // Create new reader that reads from content and writes to buffer - teeReader := io.TeeReader(content, &buffer) - - // Read from the newly created reader - data, err := io.ReadAll(teeReader) - if err != nil { - return fmt.Errorf("[WasiFactory] Failed to read content: %v", err) - } - // Restore content value (assuming code is a tar) - content = &buffer - - // Check if data is a url - u, err := url.ParseRequestURI(string(data)) - if err == nil && u.Scheme != "" && u.Host != "" { - // data is url; it has to be downloaded - resp, err := http.Get(string(data)) + wrValue, _ := wf.runners.Load(contID) // assuming runners already exists + wr := wrValue.(*wasiRunner) + externalError := *new(error) + wr.copyInit.Do(func() { + // Additional buffer, used to determine if it's a URL or not + var buffer bytes.Buffer + + // Create new reader that reads from content and writes to buffer + teeReader := io.TeeReader(content, &buffer) + + // Read from the newly created reader + data, err := io.ReadAll(teeReader) if err != nil { - return fmt.Errorf("[WasiFactory] Failed to download code for %s: %v", contID, err) + externalError = fmt.Errorf("[WasiFactory] Failed to read content: %v", err) + return + } + // Restore content value (assuming code is a tar) + content = &buffer + + // Check if data is a url + u, err := url.ParseRequestURI(string(data)) + if err == nil && u.Scheme != "" && u.Host != "" { + // data is url; it has to be downloaded + resp, err := http.Get(string(data)) + if err != nil { + externalError = fmt.Errorf("[WasiFactory] Failed to download code for %s: %v", contID, err) + return + } + defer resp.Body.Close() + + // Content is now the downloaded tar file + content = resp.Body } - defer resp.Body.Close() - - // Content is now the downloaded tar file - content = resp.Body - } - // Create temporary directory to store untar-ed wasm file - dir, err := os.MkdirTemp("", contID) - if err != nil { - return fmt.Errorf("[WasiFactory] Failed to create temporary directory for %s: %v", contID, err) - } - // Untar code - if err := utils.Untar(content, dir); err != nil { - return fmt.Errorf("[WasiFactory] Failed to untar code for %s: %v", contID, err) - } - // NOTE: hard-coding `destPath` as `/` - // this is required to correctly use the official Python interpreter - wr.mount = "/" - wr.dir = dir - wr.cliArgs = append(wr.cliArgs, "--dir", wr.dir+"::"+wr.mount) - return nil + // Create temporary directory to store untar-ed wasm file + dir, err := os.MkdirTemp("", contID) + if err != nil { + externalError = fmt.Errorf("[WasiFactory] Failed to create temporary directory for %s: %v", contID, err) + return + } + // Untar code + if err := utils.Untar(content, dir); err != nil { + externalError = fmt.Errorf("[WasiFactory] Failed to untar code for %s: %v", contID, err) + return + } + // NOTE: hard-coding `destPath` as `/` + // this is required to correctly use the official Python interpreter + wr.mount = "/" + wr.dir = dir + wr.cliArgs = append(wr.cliArgs, "--dir", wr.dir+"::"+wr.mount) + }) + + return externalError } // WASI Module: compiles the module @@ -208,53 +195,49 @@ func (wf *WasiFactory) CopyToContainer(contID ContainerID, content io.Reader, de // NOTE: using contID (set as custom_image from CLI as the wasm filename inside the tar) func (wf *WasiFactory) Start(contID ContainerID) error { // Get the wasi runner - wr := wf.runners[contID] - wr.lock.RLock() - - // File was already compiled by another thread - if wr.wasiType != WASI_TYPE_UNDEFINED { - wr.lock.RUnlock() - return nil - } - wr.lock.RUnlock() - - wr.lock.Lock() - defer wr.lock.Unlock() - - // Create a linker - wr.linker = wasmtime.NewLinker(wf.engine) - if err := wr.linker.DefineWasi(); err != nil { - wr.Close() - return fmt.Errorf("[WasiFactory] Failed to define WASI in the linker for %s: %v", contID, err) - } + wrValue, _ := wf.runners.Load(contID) + wr := wrValue.(*wasiRunner) + + externalError := *new(error) + wr.startInit.Do(func() { + // Create a linker + wr.linker = wasmtime.NewLinker(wf.engine) + if err := wr.linker.DefineWasi(); err != nil { + wr.Close() + externalError = fmt.Errorf("[WasiFactory] Failed to define WASI in the linker for %s: %v", contID, err) + return + } - // Determine wasm file name - wasmFileName := filepath.Join(wr.dir, contID+".wasm") + // Determine wasm file name + wasmFileName := filepath.Join(wr.dir, contID+".wasm") - // Try to compile the WASI Module - module, err := wasmtime.NewModuleFromFile(wf.engine, wasmFileName) - if err != nil { - if strings.HasPrefix(err.Error(), "expected a WebAssembly module but was given a WebAssembly component") { - // File is a WASI Component - wr.cliArgs = append(wr.cliArgs, wasmFileName) - wr.wasiType = WASI_TYPE_COMPONENT - return nil + // Try to compile the WASI Module + module, err := wasmtime.NewModuleFromFile(wf.engine, wasmFileName) + if err != nil { + if strings.HasPrefix(err.Error(), "expected a WebAssembly module but was given a WebAssembly component") { + // File is a WASI Component + wr.cliArgs = append(wr.cliArgs, wasmFileName) + wr.wasiType = WASI_TYPE_COMPONENT + return + } + // There was another error; wasm file is incorrect + wr.Close() + externalError = fmt.Errorf("[WasiFactory] Failed to create WASI Module for %s: %v", contID, err) + return } - // There was another error; wasm file is incorrect - wr.Close() - return fmt.Errorf("[WasiFactory] Failed to create WASI Module for %s: %v", contID, err) - } - // File was compiled successfully - wr.module = module - wr.wasiType = WASI_TYPE_MODULE - return nil + // File was compiled successfully + wr.module = module + wr.wasiType = WASI_TYPE_MODULE + }) + return externalError } func (wf *WasiFactory) Destroy(id ContainerID) error { - if wasiRunner, ok := wf.runners[id]; ok { - wasiRunner.Close() + wrValue, ok := wf.runners.Load(id) + if ok { + wrValue.(*wasiRunner).Close() + wf.runners.Delete(id) } - delete(wf.runners, id) return nil } From 79ae99394f6979cccb9cd2aed0aaa9ddc55a00f1 Mon Sep 17 00:00:00 2001 From: Alessandro Lioi Date: Sat, 2 Nov 2024 15:26:30 +0100 Subject: [PATCH 20/29] fix(wasi): improve creations and closing --- internal/container/container.go | 15 +++------- internal/container/wasi.go | 53 ++++++++++++++++----------------- 2 files changed, 30 insertions(+), 38 deletions(-) diff --git a/internal/container/container.go b/internal/container/container.go index 7928ad25..f824c779 100644 --- a/internal/container/container.go +++ b/internal/container/container.go @@ -9,10 +9,8 @@ import ( "log" "net/http" "os/exec" - "runtime" "time" - "github.com/bytecodealliance/wasmtime-go/v25" "github.com/grussorusso/serverledge/internal/executor" "github.com/grussorusso/serverledge/internal/function" ) @@ -73,31 +71,26 @@ func wasiExecute(contID ContainerID, req *executor.InvocationRequest) (*executor t0 := time.Now() if wr.wasiType == WASI_TYPE_MODULE { // Create a new Wasi Configuration - wcc, err := wr.BuildWasiConfig(contID, req.Handler, string(paramsBytes)) + wcc, err := wr.BuildStore(contID, wf.engine, req.Handler, string(paramsBytes)) if err != nil { return nil, time.Now().Sub(t0), err } defer wcc.Close() - // Create new store for this module - store := wasmtime.NewStore(wf.engine) - store.SetWasi(wcc.wasiConfig) - defer store.Close() - // Create an instance of the module - instance, err := wr.linker.Instantiate(store, wr.module) + instance, err := wr.linker.Instantiate(wcc.store, wr.module) if err != nil { return nil, time.Now().Sub(t0), fmt.Errorf("Failed to instantiate WASI module: %v", err) } // Get the _start function (entrypoint of any wasm module) - start := instance.GetFunc(store, "_start") + start := instance.GetFunc(wcc.store, "_start") if start == nil { return nil, time.Now().Sub(t0), fmt.Errorf("WASI Module does not have a _start function") } // Call the _start function - if _, err := start.Call(store); err != nil { + if _, err := start.Call(wcc.store); err != nil { return nil, time.Now().Sub(t0), fmt.Errorf("Failed to run WASI module: %v", err) } diff --git a/internal/container/wasi.go b/internal/container/wasi.go index d91536d3..45052838 100644 --- a/internal/container/wasi.go +++ b/internal/container/wasi.go @@ -43,8 +43,9 @@ type wasiRunner struct { } // Utility struct to keep configuration and temporary files -type wasiCustomConfig struct { - wasiConfig *wasmtime.WasiConfig // Actual Configuration +type wasiInternalStore struct { + store *wasmtime.Store // Actual Store + config *wasmtime.WasiConfig // Wasi Config stdout, stderr *os.File // Temporary files for stdout and stderr } @@ -62,21 +63,17 @@ func (wr *wasiRunner) Close() { } } -func (wcc *wasiCustomConfig) Close() { - if wcc.wasiConfig != nil { - wcc.wasiConfig.Close() - } - if wcc.stdout != nil { - wcc.stdout.Close() - if err := os.Remove(wcc.stdout.Name()); err != nil { - log.Printf("[WasiCustomConfig] Failed to remove stdout %s: %v", wcc.stdout.Name(), err) - } +func (wcc *wasiInternalStore) Close() { + wcc.store.Close() + wcc.config.Close() + wcc.stdout.Close() + wcc.stderr.Close() + + if err := os.Remove(wcc.stdout.Name()); err != nil { + log.Printf("[WasiCustomConfig] Failed to remove stdout %s: %v", wcc.stdout.Name(), err) } - if wcc.stderr != nil { - wcc.stderr.Close() - if err := os.Remove(wcc.stderr.Name()); err != nil { - log.Printf("[WasiCustomConfig] Failed to remove stderr %s: %v", wcc.stderr.Name(), err) - } + if err := os.Remove(wcc.stderr.Name()); err != nil { + log.Printf("[WasiCustomConfig] Failed to remove stderr %s: %v", wcc.stderr.Name(), err) } } @@ -263,33 +260,33 @@ func (wf *WasiFactory) GetMemoryMB(id ContainerID) (int64, error) { // Utility function to create a Wasi Configuration for this runner // The WasiConfiguration cannot be shared among threads because it's not thread-safe -func (wr *wasiRunner) BuildWasiConfig(contID ContainerID, handler string, params string) (wasiCustomConfig, error) { - var wcc wasiCustomConfig +func (wr *wasiRunner) BuildStore(contID ContainerID, engine *wasmtime.Engine, handler, params string) (wasiInternalStore, error) { + var wcc wasiInternalStore // Create new Wasi Configuration - wasiConfig := wasmtime.NewWasiConfig() + wcc.config = wasmtime.NewWasiConfig() // Set environment variables - wasiConfig.SetEnv(wr.envKeys, wr.envValues) + wcc.config.SetEnv(wr.envKeys, wr.envValues) // Create temporary files for stdout and stderr for this function - stdout, err := os.CreateTemp("", fmt.Sprintf("%s-stdout", contID)) + stdout, err := os.CreateTemp("", fmt.Sprintf("%s-stdout-*", contID)) if err != nil { return wcc, fmt.Errorf("[WasiRunner]: failed to create temp stdout file for %s: %v", contID, err) } - stderr, err := os.CreateTemp("", fmt.Sprintf("%s-stdout", contID)) + stderr, err := os.CreateTemp("", fmt.Sprintf("%s-stderr-*", contID)) if err != nil { return wcc, fmt.Errorf("[WasiRunner]: failed to create temp stderr file for %s: %v", contID, err) } // Set wasmtime to use the temporary files for stdout and stderr - if err := wasiConfig.SetStdoutFile(stdout.Name()); err != nil { + if err := wcc.config.SetStdoutFile(stdout.Name()); err != nil { return wcc, fmt.Errorf("[WasiRunner] Failed to set stdout file: %v", err) } - if err := wasiConfig.SetStderrFile(stderr.Name()); err != nil { + if err := wcc.config.SetStderrFile(stderr.Name()); err != nil { return wcc, fmt.Errorf("[WasiRunner] Failed to set stderr file: %v", err) } // Mount the temporary directory to the specified mount point - if err := wasiConfig.PreopenDir(wr.dir, wr.mount); err != nil { + if err := wcc.config.PreopenDir(wr.dir, wr.mount); err != nil { return wcc, fmt.Errorf("[WasiRunner] Failed to preopen %s: %v", wr.mount, err) } @@ -303,11 +300,13 @@ func (wr *wasiRunner) BuildWasiConfig(contID ContainerID, handler string, params argv = append(argv, params) // Set argv in Wasi - wasiConfig.SetArgv(argv) + wcc.config.SetArgv(argv) // Save references into custom configuration - wcc.wasiConfig = wasiConfig wcc.stdout = stdout wcc.stderr = stderr + + wcc.store = wasmtime.NewStore(engine) + wcc.store.SetWasi(wcc.config) return wcc, nil } From 2ad59f357f02f25d48700fff5b2325516744ef93 Mon Sep 17 00:00:00 2001 From: Alessandro Lioi Date: Mon, 4 Nov 2024 14:36:21 +0100 Subject: [PATCH 21/29] fix(wasi): better detect execution error --- internal/container/container.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/internal/container/container.go b/internal/container/container.go index f824c779..e002ec86 100644 --- a/internal/container/container.go +++ b/internal/container/container.go @@ -9,6 +9,7 @@ import ( "log" "net/http" "os/exec" + "strings" "time" "github.com/grussorusso/serverledge/internal/executor" @@ -90,7 +91,8 @@ func wasiExecute(contID ContainerID, req *executor.InvocationRequest) (*executor } // Call the _start function - if _, err := start.Call(wcc.store); err != nil { + if _, err := start.Call(wcc.store); err != nil && + !strings.Contains(err.Error(), "exit status 0") { return nil, time.Now().Sub(t0), fmt.Errorf("Failed to run WASI module: %v", err) } From f788cbbb11a076247b38e24518befc2f5748ea6e Mon Sep 17 00:00:00 2001 From: Alessandro Lioi Date: Tue, 5 Nov 2024 13:02:14 +0100 Subject: [PATCH 22/29] fix(wasi): moved URL download back in NewContainer --- internal/container/container.go | 20 ++++++++++++++++++- internal/container/wasi.go | 34 --------------------------------- 2 files changed, 19 insertions(+), 35 deletions(-) diff --git a/internal/container/container.go b/internal/container/container.go index e002ec86..05b8ba75 100644 --- a/internal/container/container.go +++ b/internal/container/container.go @@ -8,6 +8,7 @@ import ( "io" "log" "net/http" + "net/url" "os/exec" "strings" "time" @@ -26,8 +27,25 @@ func NewContainer(image, codeTar string, opts *ContainerOptions, f *function.Fun } if len(codeTar) > 0 { + var r io.Reader + // Decoding codeTar decodedCode, _ := base64.StdEncoding.DecodeString(codeTar) - err = cf.CopyToContainer(contID, bytes.NewReader(decodedCode), "/app/") + // Check if decoded src is a url + u, err := url.ParseRequestURI(string(decodedCode)) + if err == nil && u.Scheme != "" && u.Host != "" { + // codeTar is an URL; it has to be downloaded + resp, err := http.Get(string(decodedCode)) + if err != nil { + log.Printf("Failed to download code %s", decodedCode) + return "", err + } + defer resp.Body.Close() + r = resp.Body + } else { + // assuming decodedCode is base64 encoded tar + r = bytes.NewReader(decodedCode) + } + err = cf.CopyToContainer(contID, r, "/app/") if err != nil { log.Printf("Failed code copy\n") return "", err diff --git a/internal/container/wasi.go b/internal/container/wasi.go index 45052838..7033d1e0 100644 --- a/internal/container/wasi.go +++ b/internal/container/wasi.go @@ -1,13 +1,10 @@ package container import ( - "bytes" "context" "fmt" "io" "log" - "net/http" - "net/url" "os" "path/filepath" "strings" @@ -136,36 +133,6 @@ func (wf *WasiFactory) CopyToContainer(contID ContainerID, content io.Reader, de wr := wrValue.(*wasiRunner) externalError := *new(error) wr.copyInit.Do(func() { - // Additional buffer, used to determine if it's a URL or not - var buffer bytes.Buffer - - // Create new reader that reads from content and writes to buffer - teeReader := io.TeeReader(content, &buffer) - - // Read from the newly created reader - data, err := io.ReadAll(teeReader) - if err != nil { - externalError = fmt.Errorf("[WasiFactory] Failed to read content: %v", err) - return - } - // Restore content value (assuming code is a tar) - content = &buffer - - // Check if data is a url - u, err := url.ParseRequestURI(string(data)) - if err == nil && u.Scheme != "" && u.Host != "" { - // data is url; it has to be downloaded - resp, err := http.Get(string(data)) - if err != nil { - externalError = fmt.Errorf("[WasiFactory] Failed to download code for %s: %v", contID, err) - return - } - defer resp.Body.Close() - - // Content is now the downloaded tar file - content = resp.Body - } - // Create temporary directory to store untar-ed wasm file dir, err := os.MkdirTemp("", contID) if err != nil { @@ -254,7 +221,6 @@ func (wf *WasiFactory) GetIPAddress(ContainerID) (string, error) { } func (wf *WasiFactory) GetMemoryMB(id ContainerID) (int64, error) { - log.Println("[WasiFactory] GetMemoryMB unimplemented") return 0, nil } From e0cda669aa9d7a81f6017510b8b3dd7f57de9d23 Mon Sep 17 00:00:00 2001 From: Alessandro Lioi Date: Tue, 5 Nov 2024 14:46:42 +0100 Subject: [PATCH 23/29] fix(wasi): remove unused WasiConfig declaration --- internal/container/wasi.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/internal/container/wasi.go b/internal/container/wasi.go index 7033d1e0..b13550f7 100644 --- a/internal/container/wasi.go +++ b/internal/container/wasi.go @@ -115,9 +115,6 @@ func (wf *WasiFactory) Create(image string, opts *ContainerOptions) (ContainerID envVals = append(envVals, value) } - wasiConfig := wasmtime.NewWasiConfig() - wasiConfig.SetEnv(envKeys, envVals) - wf.runners.Store(image, &wasiRunner{ envKeys: envKeys, envValues: envVals, From 2b26845e85ed8dd40d302f31a0ad4c3bd50ace27 Mon Sep 17 00:00:00 2001 From: Alessandro Lioi Date: Wed, 6 Nov 2024 11:50:10 +0100 Subject: [PATCH 24/29] feat(wasi): enable modules cache and threads support --- go.mod | 2 +- internal/container/wasi.go | 62 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 63 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index cb85de6b..0df92b5b 100644 --- a/go.mod +++ b/go.mod @@ -20,6 +20,7 @@ require ( go.opentelemetry.io/otel/sdk v1.28.0 go.opentelemetry.io/otel/trace v1.28.0 golang.org/x/net v0.0.0-20220225172249-27dd8689420f + golang.org/x/sys v0.21.0 ) require ( @@ -71,7 +72,6 @@ require ( go.uber.org/multierr v1.6.0 // indirect go.uber.org/zap v1.17.0 // indirect golang.org/x/crypto v0.0.0-20210817164053-32db794688a5 // indirect - golang.org/x/sys v0.21.0 // indirect golang.org/x/text v0.3.7 // indirect golang.org/x/time v0.0.0-20201208040808-7e3f01d25324 // indirect google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c // indirect diff --git a/internal/container/wasi.go b/internal/container/wasi.go index b13550f7..6659e7bc 100644 --- a/internal/container/wasi.go +++ b/internal/container/wasi.go @@ -7,11 +7,13 @@ import ( "log" "os" "path/filepath" + "runtime" "strings" "sync" "github.com/bytecodealliance/wasmtime-go/v25" "github.com/grussorusso/serverledge/utils" + "golang.org/x/sys/cpu" ) type WasiType string @@ -76,13 +78,20 @@ func (wcc *wasiInternalStore) Close() { func InitWasiFactory() *WasiFactory { ctx := context.Background() + // Create Engine configuration engineConfig := wasmtime.NewConfig() engineConfig.SetWasmRelaxedSIMD(true) engineConfig.SetWasmBulkMemory(true) engineConfig.SetWasmMultiValue(true) + engineConfig.SetWasmThreads(true) engineConfig.SetStrategy(wasmtime.StrategyCranelift) engineConfig.SetCraneliftOptLevel(wasmtime.OptLevelSpeed) + engineConfig.SetCraneliftDebugVerifier(false) + enableCraneliftFlags(engineConfig) + if err := engineConfig.CacheConfigLoadDefault(); err != nil { + log.Printf("Failed to setup cache: %v", err) + } // Create wasmtime engine, shared for all modules engine := wasmtime.NewEngineWithConfig(engineConfig) @@ -273,3 +282,56 @@ func (wr *wasiRunner) BuildStore(contID ContainerID, engine *wasmtime.Engine, ha wcc.store.SetWasi(wcc.config) return wcc, nil } + +func enableCraneliftFlags(config *wasmtime.Config) { + // Cranelift only supports x86 and x86-64 compilation flags + if runtime.GOARCH == "386" || runtime.GOARCH == "amd64" { + if cpu.X86.HasSSE3 { + config.EnableCraneliftFlag("has_sse3") + } + if cpu.X86.HasSSSE3 { + config.EnableCraneliftFlag("has_ssse3") + } + if cpu.X86.HasSSE41 { + config.EnableCraneliftFlag("has_sse41") + } + if cpu.X86.HasSSE42 { + config.EnableCraneliftFlag("has_sse42") + } + if cpu.X86.HasAVX { + config.EnableCraneliftFlag("has_avx") + } + if cpu.X86.HasAVX2 { + config.EnableCraneliftFlag("has_avx2") + } + if cpu.X86.HasFMA { + config.EnableCraneliftFlag("has_fma") + } + if cpu.X86.HasAVX512BITALG { + config.EnableCraneliftFlag("has_avx512bitalg") + } + if cpu.X86.HasAVX512DQ { + config.EnableCraneliftFlag("has_avx512dq") + } + if cpu.X86.HasAVX512VL { + config.EnableCraneliftFlag("has_avx512vl") + } + if cpu.X86.HasAVX512VBMI { + config.EnableCraneliftFlag("has_avx512vbmi") + } + if cpu.X86.HasAVX512F { + config.EnableCraneliftFlag("has_avx512f") + } + if cpu.X86.HasPOPCNT { + config.EnableCraneliftFlag("has_popcnt") + } + if cpu.X86.HasBMI1 { + config.EnableCraneliftFlag("has_bmi1") + config.EnableCraneliftFlag("has_lzcnt") + } + if cpu.X86.HasBMI2 { + config.EnableCraneliftFlag("has_bmi2") + config.EnableCraneliftFlag("has_lzcnt") + } + } +} From 23c68a9646d4497afb2548ca5ff855ba6426643e Mon Sep 17 00:00:00 2001 From: Alessandro Lioi Date: Thu, 7 Nov 2024 15:05:21 +0100 Subject: [PATCH 25/29] fix(wasi): switch to component --- internal/container/wasi.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/container/wasi.go b/internal/container/wasi.go index 6659e7bc..d1761c0c 100644 --- a/internal/container/wasi.go +++ b/internal/container/wasi.go @@ -184,7 +184,7 @@ func (wf *WasiFactory) Start(contID ContainerID) error { // Try to compile the WASI Module module, err := wasmtime.NewModuleFromFile(wf.engine, wasmFileName) if err != nil { - if strings.HasPrefix(err.Error(), "expected a WebAssembly module but was given a WebAssembly component") { + if strings.Contains(err.Error(), "expected a WebAssembly module but was given a WebAssembly component") { // File is a WASI Component wr.cliArgs = append(wr.cliArgs, wasmFileName) wr.wasiType = WASI_TYPE_COMPONENT From e8f96d89aee30e6d48656557be175e56842d1307 Mon Sep 17 00:00:00 2001 From: Alessandro Lioi Date: Fri, 8 Nov 2024 16:47:57 +0100 Subject: [PATCH 26/29] feat(wasi): add network support for wasmtime CLI --- internal/container/wasi.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/internal/container/wasi.go b/internal/container/wasi.go index d1761c0c..f81dde83 100644 --- a/internal/container/wasi.go +++ b/internal/container/wasi.go @@ -154,7 +154,10 @@ func (wf *WasiFactory) CopyToContainer(contID ContainerID, content io.Reader, de // this is required to correctly use the official Python interpreter wr.mount = "/" wr.dir = dir - wr.cliArgs = append(wr.cliArgs, "--dir", wr.dir+"::"+wr.mount) + wr.cliArgs = append(wr.cliArgs, + "--wasi", "preview2", + "--wasi", "inherit-network", + "--dir", wr.dir+"::"+wr.mount) }) return externalError From 08461d889eb99d2c7e1802fd6766e817b94eb193 Mon Sep 17 00:00:00 2001 From: Alessandro Lioi Date: Sat, 23 Nov 2024 17:10:49 +0100 Subject: [PATCH 27/29] docs(wasi): WASI function creation; internal docs for future references --- docs/writing-functions.md | 54 ++++++++++++++++++++++++++++++++++++++ internal/container/wasi.go | 4 ++- 2 files changed, 57 insertions(+), 1 deletion(-) diff --git a/docs/writing-functions.md b/docs/writing-functions.md index 6bba4f95..3e79fa05 100644 --- a/docs/writing-functions.md +++ b/docs/writing-functions.md @@ -28,6 +28,60 @@ Available runtime: `nodejs17` (NodeJS 17) Specify the handler as `.js` (e.g., `myfile.js`). An example is given in `examples/sieve.js`. +## WebAssembly/WASI + +Serverledge supports the execution of WebAssembly modules through WASI using +[wasmtime-go](https://github.com/bytecodealliance/wasmtime-go). + +A WebAssembly function can be defined in Serverledge by specifying the following +arguments to `serverledge-cli`: + +- `runtime`: `wasi` +- `src`: the path of the `.wasm` file. If the file is larger than 2MB (`etcd` + message size limit), it has to point to a URL of a `.tar` containing the + `.wasm` file +- `custom_image`: the name of the `.wasm` file inside the `.tar` file + +When running a function written in Python, the `handler` argument is **required** +and it has to point to the `.py` file inside the `.tar` passed in `src`. + +NOTE: if the WebAssembly module is a component, the execution of the function +will be handled by the `wasmtime` CLI which has to be installed and its +installation path added to the `PATH` environment variable. + +### Examples + +**Wasm file smaller than 2MB**: assuming we have a `hello.wasm` file inside the +`/home/user/code/` directory + +A function can be created using this command: + + serverledge-cli create -f func-name \ + --runtime wasi \ + --src /home/user/code/hello.wasm \ + --custom_image hello + +**Python** (using the official build): assuming the `.tar` is hosted on +`localhost:8000/python.tar` and inside the `.tar` file there is a `func.py` + +At the time of writing, the official build is provided by the maintainer of the +WASI platform in Python at the following [link](https://github.com/brettcannon/cpython-wasi-build/releases/tag/v3.13.0) + +Serverledge assumes the structure of the `python.tar` file to be the following: + +- `func.py`: this can also be in a sub-directory and is specified in the + `handler` argument of the function creation +- `python.wasm` +- `lib/` + +A function can be created using this command: + + serverledge-cli create -f func-name \ + --runtime wasi \ + --src http://localhost:8000/python.tar \ + --custom_image python \ + --handler func.py + ## Custom function runtimes Follow [these instructions](./custom_runtime.md). diff --git a/internal/container/wasi.go b/internal/container/wasi.go index f81dde83..2956f00b 100644 --- a/internal/container/wasi.go +++ b/internal/container/wasi.go @@ -151,7 +151,9 @@ func (wf *WasiFactory) CopyToContainer(contID ContainerID, content io.Reader, de return } // NOTE: hard-coding `destPath` as `/` - // this is required to correctly use the official Python interpreter + // This is required to correctly use the official Python interpreter + // In the future this won't be necessary as python.wasm will be distribuited + // as a self-contained WebAssembly file wr.mount = "/" wr.dir = dir wr.cliArgs = append(wr.cliArgs, From 9e4fa4660856a6fb4516885e2809bf48501b3282 Mon Sep 17 00:00:00 2001 From: Alessandro Lioi Date: Tue, 17 Dec 2024 12:47:04 +0100 Subject: [PATCH 28/29] fix(wasi): correctly compute InitTime --- internal/container/container.go | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/internal/container/container.go b/internal/container/container.go index 05b8ba75..3fc5b913 100644 --- a/internal/container/container.go +++ b/internal/container/container.go @@ -87,43 +87,42 @@ func wasiExecute(contID ContainerID, req *executor.InvocationRequest) (*executor } res := &executor.InvocationResult{Success: false} - t0 := time.Now() if wr.wasiType == WASI_TYPE_MODULE { // Create a new Wasi Configuration wcc, err := wr.BuildStore(contID, wf.engine, req.Handler, string(paramsBytes)) if err != nil { - return nil, time.Now().Sub(t0), err + return nil, 0, err } defer wcc.Close() // Create an instance of the module instance, err := wr.linker.Instantiate(wcc.store, wr.module) if err != nil { - return nil, time.Now().Sub(t0), fmt.Errorf("Failed to instantiate WASI module: %v", err) + return nil, 0, fmt.Errorf("Failed to instantiate WASI module: %v", err) } // Get the _start function (entrypoint of any wasm module) start := instance.GetFunc(wcc.store, "_start") if start == nil { - return nil, time.Now().Sub(t0), fmt.Errorf("WASI Module does not have a _start function") + return nil, 0, fmt.Errorf("WASI Module does not have a _start function") } // Call the _start function if _, err := start.Call(wcc.store); err != nil && !strings.Contains(err.Error(), "exit status 0") { - return nil, time.Now().Sub(t0), fmt.Errorf("Failed to run WASI module: %v", err) + return nil, 0, fmt.Errorf("Failed to run WASI module: %v", err) } // Read stdout from the temp file stdout, err := io.ReadAll(wcc.stdout) if err != nil { - return nil, time.Now().Sub(t0), fmt.Errorf("Failed to read stdout for WASI: %v", err) + return nil, 0, fmt.Errorf("Failed to read stdout for WASI: %v", err) } // Read stderr from the temp file stderr, err := io.ReadAll(wcc.stderr) if err != nil { - return nil, time.Now().Sub(t0), fmt.Errorf("Failed to read stderr for WASI: %v", err) + return nil, 0, fmt.Errorf("Failed to read stderr for WASI: %v", err) } // Populate result @@ -170,7 +169,7 @@ func wasiExecute(contID ContainerID, req *executor.InvocationRequest) (*executor } } - return res, time.Now().Sub(t0), nil + return res, 0, nil } // Execute interacts with the Executor running in the container to invoke the From 5f598a965735b80ad3b83444a3015ae77aa67591 Mon Sep 17 00:00:00 2001 From: Alessandro Lioi Date: Tue, 17 Dec 2024 14:14:58 +0100 Subject: [PATCH 29/29] fix(wasi): correctly compute init time (with module instantiation) --- internal/container/container.go | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/internal/container/container.go b/internal/container/container.go index 3fc5b913..1949dc25 100644 --- a/internal/container/container.go +++ b/internal/container/container.go @@ -87,42 +87,45 @@ func wasiExecute(contID ContainerID, req *executor.InvocationRequest) (*executor } res := &executor.InvocationResult{Success: false} + t0 := time.Now() + var invocationWait time.Duration if wr.wasiType == WASI_TYPE_MODULE { // Create a new Wasi Configuration wcc, err := wr.BuildStore(contID, wf.engine, req.Handler, string(paramsBytes)) if err != nil { - return nil, 0, err + return nil, time.Now().Sub(t0), err } defer wcc.Close() // Create an instance of the module instance, err := wr.linker.Instantiate(wcc.store, wr.module) if err != nil { - return nil, 0, fmt.Errorf("Failed to instantiate WASI module: %v", err) + return nil, time.Now().Sub(t0), fmt.Errorf("Failed to instantiate WASI module: %v", err) } // Get the _start function (entrypoint of any wasm module) start := instance.GetFunc(wcc.store, "_start") if start == nil { - return nil, 0, fmt.Errorf("WASI Module does not have a _start function") + return nil, time.Now().Sub(t0), fmt.Errorf("WASI Module does not have a _start function") } + invocationWait = time.Now().Sub(t0) // Call the _start function if _, err := start.Call(wcc.store); err != nil && !strings.Contains(err.Error(), "exit status 0") { - return nil, 0, fmt.Errorf("Failed to run WASI module: %v", err) + return nil, invocationWait, fmt.Errorf("Failed to run WASI module: %v", err) } // Read stdout from the temp file stdout, err := io.ReadAll(wcc.stdout) if err != nil { - return nil, 0, fmt.Errorf("Failed to read stdout for WASI: %v", err) + return nil, invocationWait, fmt.Errorf("Failed to read stdout for WASI: %v", err) } // Read stderr from the temp file stderr, err := io.ReadAll(wcc.stderr) if err != nil { - return nil, 0, fmt.Errorf("Failed to read stderr for WASI: %v", err) + return nil, invocationWait, fmt.Errorf("Failed to read stderr for WASI: %v", err) } // Populate result @@ -143,6 +146,9 @@ func wasiExecute(contID ContainerID, req *executor.InvocationRequest) (*executor var stdoutBuffer, stderrBuffer bytes.Buffer execCmd.Stdout = &stdoutBuffer execCmd.Stderr = &stderrBuffer + + invocationWait = time.Now().Sub(t0) + // Execute wasmtime CLI err := execCmd.Run() if err != nil { @@ -169,7 +175,7 @@ func wasiExecute(contID ContainerID, req *executor.InvocationRequest) (*executor } } - return res, 0, nil + return res, invocationWait, nil } // Execute interacts with the Executor running in the container to invoke the