diff --git a/.gitignore b/.gitignore index e374580..b1455a1 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,4 @@ pelacli_data/ test_consensus/ test_consensus_app/ multichain/ +as/node_modules/ diff --git a/Makefile b/Makefile index 45fb0aa..62b3820 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,11 @@ VERSION=v2.4.0 +ASC=npx --prefix as asc +WASM_TESTDATA_DIR=wasmstrategy/testdata +WASM_TESTDATA=$(WASM_TESTDATA_DIR)/uniswap_strategy.wasm \ + $(WASM_TESTDATA_DIR)/forbidden_import.wasm \ + $(WASM_TESTDATA_DIR)/memory_import.wasm \ + $(WASM_TESTDATA_DIR)/instruction_gas.wasm \ + $(WASM_TESTDATA_DIR)/memory_grow.wasm run: go run cmd/main.go \ @@ -32,6 +39,29 @@ clean: tidy: go mod tidy +wasm-deps: + npm --prefix as ci + +wasm-build: wasm-deps + npm --prefix as run build + +wasm-testdata: wasm-deps $(WASM_TESTDATA) + +$(WASM_TESTDATA_DIR)/uniswap_strategy.wasm: $(WASM_TESTDATA_DIR)/as/uniswap_strategy.ts + $(ASC) $< --runtime stub --optimize --shrinkLevel 2 --noAssert -o $@ + +$(WASM_TESTDATA_DIR)/forbidden_import.wasm: $(WASM_TESTDATA_DIR)/as/forbidden_import.ts + $(ASC) $< --runtime stub --optimize --shrinkLevel 2 --noAssert -o $@ + +$(WASM_TESTDATA_DIR)/memory_import.wasm: $(WASM_TESTDATA_DIR)/as/memory_import.ts + $(ASC) $< --runtime stub --optimize --shrinkLevel 2 --noAssert -o $@ + +$(WASM_TESTDATA_DIR)/instruction_gas.wasm: $(WASM_TESTDATA_DIR)/as/instruction_gas.ts + $(ASC) $< --runtime stub --optimize --shrinkLevel 2 --noAssert -o $@ + +$(WASM_TESTDATA_DIR)/memory_grow.wasm: $(WASM_TESTDATA_DIR)/as/memory_grow.ts + $(ASC) $< --runtime stub --optimize --shrinkLevel 2 --noAssert -o $@ + tests: go test -short -timeout 20m -failfast -shuffle=on -v ./... $(params) diff --git a/application/buckets.go b/application/buckets.go index 18a439b..9eb554e 100644 --- a/application/buckets.go +++ b/application/buckets.go @@ -3,11 +3,13 @@ package application import "github.com/ledgerwatch/erigon-lib/kv" const ( - AccountsBucket = "appaccounts" // token+account -> value + AccountsBucket = "appaccounts" // token+account -> value + StrategyStateBucket = "strategykv" ) func Tables() kv.TableCfg { return kv.TableCfg{ - AccountsBucket: {}, + AccountsBucket: {}, + StrategyStateBucket: {}, } } diff --git a/as/asconfig.json b/as/asconfig.json new file mode 100644 index 0000000..c08b386 --- /dev/null +++ b/as/asconfig.json @@ -0,0 +1,22 @@ +{ + "targets": { + "debug": { + "optimizeLevel": 0, + "shrinkLevel": 0, + "debug": true, + "outFile": "../build/strategy.debug.wasm" + }, + "release": { + "optimizeLevel": 3, + "shrinkLevel": 2, + "noAssert": true, + "converge": false, + "outFile": "../build/strategy.wasm" + } + }, + "options": { + "runtime": "stub", + "exportTable": true, + "exportMemory": true + } +} diff --git a/as/package-lock.json b/as/package-lock.json new file mode 100644 index 0000000..2fc53a2 --- /dev/null +++ b/as/package-lock.json @@ -0,0 +1,53 @@ +{ + "name": "pelagos-strategy", + "version": "0.1.0", + "lockfileVersion": 3, + "requires": true, + "packages": { + "": { + "name": "pelagos-strategy", + "version": "0.1.0", + "devDependencies": { + "assemblyscript": "^0.27.0" + } + }, + "node_modules/assemblyscript": { + "version": "0.27.37", + "resolved": "https://registry.npmjs.org/assemblyscript/-/assemblyscript-0.27.37.tgz", + "integrity": "sha512-YtY5k3PiV3SyUQ6gRlR2OCn8dcVRwkpiG/k2T5buoL2ymH/Z/YbaYWbk/f9mO2HTgEtGWjPiAQrIuvA7G/63Gg==", + "dev": true, + "dependencies": { + "binaryen": "116.0.0-nightly.20240114", + "long": "^5.2.4" + }, + "bin": { + "asc": "bin/asc.js", + "asinit": "bin/asinit.js" + }, + "engines": { + "node": ">=18", + "npm": ">=10" + }, + "funding": { + "type": "opencollective", + "url": "https://opencollective.com/assemblyscript" + } + }, + "node_modules/binaryen": { + "version": "116.0.0-nightly.20240114", + "resolved": "https://registry.npmjs.org/binaryen/-/binaryen-116.0.0-nightly.20240114.tgz", + "integrity": "sha512-0GZrojJnuhoe+hiwji7QFaL3tBlJoA+KFUN7ouYSDGZLSo9CKM8swQX8n/UcbR0d1VuZKU+nhogNzv423JEu5A==", + "dev": true, + "bin": { + "wasm-opt": "bin/wasm-opt", + "wasm2js": "bin/wasm2js" + } + }, + "node_modules/long": { + "version": "5.3.2", + "resolved": "https://registry.npmjs.org/long/-/long-5.3.2.tgz", + "integrity": "sha512-mNAgZ1GmyNhD7AuqnTG3/VQ26o760+ZYBPKjPvugO8+nLbYfX6TVpJPseBvopbdY+qpZ/lKUnmEc1LeZYS3QAA==", + "dev": true + } + } +} diff --git a/as/package.json b/as/package.json new file mode 100644 index 0000000..9e43c0e --- /dev/null +++ b/as/package.json @@ -0,0 +1,11 @@ +{ + "name": "pelagos-strategy", + "version": "0.1.0", + "private": true, + "scripts": { + "build": "asc src/strategy.ts --target release --config asconfig.json" + }, + "devDependencies": { + "assemblyscript": "^0.27.0" + } +} diff --git a/as/src/sdk.ts b/as/src/sdk.ts new file mode 100644 index 0000000..5dccf7b --- /dev/null +++ b/as/src/sdk.ts @@ -0,0 +1,100 @@ +// SPDX-License-Identifier: MIT +// Pelagos AssemblyScript SDK shim exposing host functions provided by the Go runtime. + +// EventKind enumerates the event payloads that the Go host exposes to strategies. +export enum EventKind { + Unknown = 0, + ERC20Transfer = 1, +} + +// AddressId groups frequently used contracts so strategies can branch on a small enum. +export enum AddressId { + Unknown = 0, + UniswapV2Pair = 1, + UniswapV3Pool = 2, +} + +// DbSlot identifiers backed by the runtime key/value store. +export const SLOT_LAST_UNI_TRANSFER_BLOCK: i32 = 1; + +// LogLevel mirrors the numeric contract expected by env.log. +export enum LogLevel { + Debug = 10, + Info = 20, + Warn = 30, + Error = 40, +} + +@external("env", "get_block_number") +declare function hostGetBlockNumber(): i64; + +@external("env", "get_chain_id") +declare function hostGetChainId(): i64; + +@external("env", "get_event_count") +declare function hostGetEventCount(): i32; + +@external("env", "get_event_kind") +declare function hostGetEventKind(index: i32): i32; + +@external("env", "get_event_address_id") +declare function hostGetEventAddressId(index: i32): i32; + +@external("env", "db_get_u64") +declare function hostDbGet(slot: i32): i64; + +@external("env", "db_put_u64") +declare function hostDbPut(slot: i32, value: i64): void; + +@external("env", "log") +declare function hostLog(level: i32, ptr: usize, len: i32): void; + +// getBlockNumber returns the current Pelagos block number. +export function getBlockNumber(): i64 { + return hostGetBlockNumber(); +} + +// getChainId returns the appchain identifier. +export function getChainId(): i64 { + return hostGetChainId(); +} + +// getEventCount returns how many events are available for this block. +export function getEventCount(): i32 { + return hostGetEventCount(); +} + +// getEventKind exposes the strongly typed EventKind for the event at index. +export function getEventKind(index: i32): EventKind { + return changetype(hostGetEventKind(index)); +} + +// getEventAddressId returns the AddressId classification for the event at index. +export function getEventAddressId(index: i32): AddressId { + return changetype(hostGetEventAddressId(index)); +} + +// dbGetU64 reads a 64-bit slot from the strategy KV store. +export function dbGetU64(slot: i32): i64 { + return hostDbGet(slot); +} + +// dbPutU64 writes a 64-bit slot to the strategy KV store. +export function dbPutU64(slot: i32, value: i64): void { + hostDbPut(slot, value); +} + +function writeLog(level: LogLevel, message: string): void { + const encoded = String.UTF8.encode(message); + hostLog(level, changetype(encoded), encoded.byteLength); +} + +// logDebug emits a debugging log line. +export function logDebug(message: string): void { + writeLog(LogLevel.Debug, message); +} + +// logInfo emits an informational log line. +export function logInfo(message: string): void { + writeLog(LogLevel.Info, message); +} diff --git a/as/src/strategy.ts b/as/src/strategy.ts new file mode 100644 index 0000000..cfe50a1 --- /dev/null +++ b/as/src/strategy.ts @@ -0,0 +1,62 @@ +import { + AddressId, + EventKind, + SLOT_LAST_UNI_TRANSFER_BLOCK, + dbGetU64, + dbPutU64, + getBlockNumber, + getEventAddressId, + getEventCount, + getEventKind, + logInfo, +} from "./sdk"; + +const BLOCK_STALE_THRESHOLD: i64 = 6000; + +function hasUniswapTransfer(events: i32): bool { + for (let index = 0; index < events; index++) { + if (getEventKind(index) != EventKind.ERC20Transfer) { + continue; + } + + const addressId = getEventAddressId(index); + if ( + addressId == AddressId.UniswapV2Pair || + addressId == AddressId.UniswapV3Pool + ) { + return true; + } + } + + return false; +} + +// on_block is invoked by the Go runtime every Pelagos block. +export function on_block(): void { + const blockNumber = getBlockNumber(); + const eventCount = getEventCount(); + + if (hasUniswapTransfer(eventCount)) { + dbPutU64(SLOT_LAST_UNI_TRANSFER_BLOCK, blockNumber); + logInfo( + "Uniswap ERC20 transfer detected at block " + blockNumber.toString() + ); + + return; + } + + const lastSeen = dbGetU64(SLOT_LAST_UNI_TRANSFER_BLOCK); + if (lastSeen <= 0) { + return; + } + + const delta = blockNumber - lastSeen; + if (delta > BLOCK_STALE_THRESHOLD) { + logInfo( + "No Uniswap transfers observed since block " + + lastSeen.toString() + + ", delta=" + + delta.toString() + ); + } +} diff --git a/build/strategy.wasm b/build/strategy.wasm new file mode 100644 index 0000000..7178724 Binary files /dev/null and b/build/strategy.wasm differ diff --git a/cmd/main.go b/cmd/main.go index 25d1e19..4db1375 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -8,8 +8,10 @@ import ( "os" "os/signal" "syscall" + "time" "github.com/0xAtelerix/sdk/gosdk" + "github.com/0xAtelerix/sdk/gosdk/apptypes" "github.com/0xAtelerix/sdk/gosdk/rpc" "github.com/0xAtelerix/sdk/gosdk/txpool" "github.com/fxamacker/cbor/v2" @@ -21,6 +23,7 @@ import ( "github.com/0xAtelerix/example/application" "github.com/0xAtelerix/example/application/api" + "github.com/0xAtelerix/example/wasmstrategy" ) const ChainID = 42 @@ -34,6 +37,11 @@ type RuntimeArgs struct { RPCPort string MutlichainConfig gosdk.MultichainConfig LogLevel zerolog.Level + StrategyDir string + StrategyReload time.Duration + StrategyGasLimit uint64 + StrategyTimeout time.Duration + StrategyParallel int } func main() { @@ -59,6 +67,11 @@ func RunCLI(ctx context.Context) { rpcPort := fs.String("rpc-port", ":8080", "Port for the JSON-RPC server") multichainConfigJSON := fs.String("multichain-config", "", "Multichain config JSON path") logLevel := fs.Int("log-level", int(zerolog.InfoLevel), "Logging level") + strategyDir := fs.String("strategy-dir", "./build", "Directory containing strategy WASM modules") + strategyReload := fs.Duration("strategy-reload-interval", 5*time.Second, "Interval for rescanning the strategy directory") + strategyGasLimit := fs.Uint64("strategy-gas-limit", 100000, "Per-strategy gas limit when executing on_block") + strategyTimeout := fs.Duration("strategy-timeout", 50*time.Millisecond, "Per-strategy execution timeout") + strategyParallel := fs.Int("strategy-max-parallel", 4, "Maximum number of strategies executed in parallel") if *logLevel > int(zerolog.Disabled) { *logLevel = int(zerolog.DebugLevel) @@ -91,6 +104,11 @@ func RunCLI(ctx context.Context) { RPCPort: *rpcPort, LogLevel: zerolog.Level(*logLevel), MutlichainConfig: mcDbs, + StrategyDir: *strategyDir, + StrategyReload: *strategyReload, + StrategyGasLimit: *strategyGasLimit, + StrategyTimeout: *strategyTimeout, + StrategyParallel: *strategyParallel, } Run(ctx, args, nil) @@ -192,16 +210,61 @@ func Run(ctx context.Context, args RuntimeArgs, _ chan<- int) { log.Info().Msg("Starting appchain...") - appchainExample := gosdk.NewAppchain( - stateTransition, - application.BlockConstructor, - txPool, - config, - appchainDB, - subs, - msa, - txBatchDB, - ) + var strategyManager *wasmstrategy.Manager + if args.StrategyDir != "" { + if info, errStat := os.Stat(args.StrategyDir); errStat == nil && info.IsDir() { + manager, err := wasmstrategy.NewManager(ctx, wasmstrategy.ManagerConfig{ + Logger: &log.Logger, + DB: appchainDB, + Multichain: msa, + StrategyDir: args.StrategyDir, + ReloadInterval: args.StrategyReload, + AddressBook: wasmstrategy.DefaultAddressBook(), + ChainID: apptypes.ChainType(ChainID), + Limits: wasmstrategy.StrategyLimits{ + GasLimit: args.StrategyGasLimit, + Timeout: args.StrategyTimeout, + }, + MaxParallel: args.StrategyParallel, + }) + if err != nil { + log.Warn().Err(err).Msg("Failed to initialize WASM strategy manager") + } else { + strategyManager = manager + defer strategyManager.Close(ctx) + } + } else if errStat != nil { + log.Warn().Err(errStat).Str("path", args.StrategyDir).Msg("Strategy directory unavailable, skipping WASM runtime") + } + } else { + log.Debug().Msg("No strategy directory configured, skipping WASM runtime") + } + + var appchainExample gosdk.Appchain[*gosdk.BatchProcesser[application.Transaction[application.Receipt], application.Receipt], application.Transaction[application.Receipt], application.Receipt, *application.Block] + if strategyManager != nil { + appchainExample = gosdk.NewAppchain( + stateTransition, + application.BlockConstructor, + txPool, + config, + appchainDB, + subs, + msa, + txBatchDB, + gosdk.WithBlockObservers[*gosdk.BatchProcesser[application.Transaction[application.Receipt], application.Receipt], application.Transaction[application.Receipt], application.Receipt, *application.Block](strategyManager), + ) + } else { + appchainExample = gosdk.NewAppchain( + stateTransition, + application.BlockConstructor, + txPool, + config, + appchainDB, + subs, + msa, + txBatchDB, + ) + } if err != nil { log.Fatal().Err(err).Msg("Failed to start appchain") diff --git a/cmd/main_test.go b/cmd/main_test.go index fe0aa9c..32e9f4a 100644 --- a/cmd/main_test.go +++ b/cmd/main_test.go @@ -80,17 +80,19 @@ func TestEndToEnd(t *testing.T) { defer cancel() if err = waitUntil(ctx, func() bool { - // GET is fine; we only care the port is bound. - var req *http.Request - req, err = http.NewRequestWithContext(ctx, http.MethodGet, rpcURL, nil) - require.NoError(t, err, "GET req /rpc") + req, reqErr := http.NewRequestWithContext(ctx, http.MethodGet, rpcURL, nil) + if reqErr != nil { + t.Fatalf("GET req /rpc: %v", reqErr) + } - var resp *http.Response - resp, err = http.DefaultClient.Do(req) - require.NoError(t, err, "GET res /rpc") + resp, respErr := http.DefaultClient.Do(req) + if respErr != nil { + return false + } - err = resp.Body.Close() - require.NoError(t, err) + if closeErr := resp.Body.Close(); closeErr != nil { + t.Fatalf("close resp: %v", closeErr) + } return true }); err != nil { diff --git a/go.mod b/go.mod index 532d499..15ff614 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/ledgerwatch/log/v3 v3.9.0 github.com/rs/zerolog v1.34.0 github.com/stretchr/testify v1.11.1 + github.com/tetratelabs/wazero v1.10.1 ) require ( diff --git a/go.sum b/go.sum index 735fda3..8a5b64d 100644 --- a/go.sum +++ b/go.sum @@ -144,6 +144,8 @@ github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/supranational/blst v0.3.16 h1:bTDadT+3fK497EvLdWRQEjiGnUtzJ7jjIUMF0jqwYhE= github.com/supranational/blst v0.3.16/go.mod h1:jZJtfjgudtNl4en1tzwPIV3KjUnQUvG3/j+w+fVonLw= +github.com/tetratelabs/wazero v1.10.1 h1:2DugeJf6VVk58KTPszlNfeeN8AhhpwcZqkJj2wwFuH8= +github.com/tetratelabs/wazero v1.10.1/go.mod h1:DRm5twOQ5Gr1AoEdSi0CLjDQF1J9ZAuyqFIjl1KKfQU= github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU= github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI= github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk= diff --git a/wasmstrategy/addressbook.go b/wasmstrategy/addressbook.go new file mode 100644 index 0000000..3558f1a --- /dev/null +++ b/wasmstrategy/addressbook.go @@ -0,0 +1,11 @@ +package wasmstrategy + +import "github.com/0xAtelerix/example/application" + +// DefaultAddressBook seeds the runtime with known pools so the example strategy can react. +func DefaultAddressBook() map[string]AddressID { + return map[string]AddressID{ + normalizeAddress(application.ExampleContractAddress): AddressIDUniswapV2Pair, + "0x8ad599c3a0ff1de082011efddc58f1908eb6e6d8": AddressIDUniswapV3Pool, // Uniswap v3 USDC/ETH pool + } +} diff --git a/wasmstrategy/host.go b/wasmstrategy/host.go new file mode 100644 index 0000000..eb715ee --- /dev/null +++ b/wasmstrategy/host.go @@ -0,0 +1,472 @@ +package wasmstrategy + +import ( + "context" + "encoding/binary" + "errors" + "fmt" + "strings" + "sync" + "time" + + "github.com/ledgerwatch/erigon-lib/kv" + "github.com/rs/zerolog" + "github.com/tetratelabs/wazero" + "github.com/tetratelabs/wazero/api" + "github.com/tetratelabs/wazero/experimental" + "github.com/tetratelabs/wazero/sys" + + "github.com/0xAtelerix/example/application" +) + +const ( + gasExitCode uint32 = 0xfffffff0 + hostAbortExitCode uint32 = 0xfffffff1 + defaultGasLimit = 50_000 + defaultFunctionCost = 50 + costCheap uint64 = 1 + costEvent uint64 = 2 + costDbRead uint64 = 15 + costDbWrite uint64 = 40 + costLog uint64 = 5 +) + +var ( + errGasLimitExceeded = errors.New("strategy gas limit exceeded") + errNoEntryPoint = errors.New("strategy does not export on_block") +) + +var allowedEnvImports = map[string]struct{}{ + "get_block_number": {}, + "get_chain_id": {}, + "get_event_count": {}, + "get_event_kind": {}, + "get_event_address_id": {}, + "db_get_u64": {}, + "db_put_u64": {}, + "log": {}, + "abort": {}, + "gas": {}, +} + +// StrategyLimits configures execution guard rails for a single strategy. +type StrategyLimits struct { + GasLimit uint64 + Timeout time.Duration + FunctionCallCost uint64 +} + +// hostEnv backs the env.* host functions consumed by AssemblyScript strategies. +type hostEnv struct { + logger *zerolog.Logger + db kv.RwDB + + mu sync.RWMutex + blockCtx BlockContext + gasLimit uint64 + gasRemaining uint64 +} + +func newHostEnv(logger *zerolog.Logger, db kv.RwDB) *hostEnv { + return &hostEnv{logger: logger, db: db} +} + +func (h *hostEnv) prepare(ctx BlockContext, gasLimit uint64) { + h.mu.Lock() + h.blockCtx = ctx + h.gasLimit = gasLimit + h.gasRemaining = gasLimit + h.mu.Unlock() +} + +func (h *hostEnv) register(ctx context.Context, runtime wazero.Runtime) error { + builder := runtime.NewHostModuleBuilder("env") + + builder.NewFunctionBuilder(). + WithGoModuleFunction(api.GoModuleFunc(h.getBlockNumber), []api.ValueType{}, []api.ValueType{api.ValueTypeI64}). + Export("get_block_number") + + builder.NewFunctionBuilder(). + WithGoModuleFunction(api.GoModuleFunc(h.getChainID), []api.ValueType{}, []api.ValueType{api.ValueTypeI64}). + Export("get_chain_id") + + builder.NewFunctionBuilder(). + WithGoModuleFunction(api.GoModuleFunc(h.getEventCount), []api.ValueType{}, []api.ValueType{api.ValueTypeI32}). + Export("get_event_count") + + builder.NewFunctionBuilder(). + WithGoModuleFunction(api.GoModuleFunc(h.getEventKind), []api.ValueType{api.ValueTypeI32}, []api.ValueType{api.ValueTypeI32}). + Export("get_event_kind") + + builder.NewFunctionBuilder(). + WithGoModuleFunction(api.GoModuleFunc(h.getEventAddressID), []api.ValueType{api.ValueTypeI32}, []api.ValueType{api.ValueTypeI32}). + Export("get_event_address_id") + + builder.NewFunctionBuilder(). + WithGoModuleFunction(api.GoModuleFunc(h.dbGetU64), []api.ValueType{api.ValueTypeI32}, []api.ValueType{api.ValueTypeI64}). + Export("db_get_u64") + + builder.NewFunctionBuilder(). + WithGoModuleFunction(api.GoModuleFunc(h.dbPutU64), []api.ValueType{api.ValueTypeI32, api.ValueTypeI64}, []api.ValueType{}). + Export("db_put_u64") + + builder.NewFunctionBuilder(). + WithGoModuleFunction(api.GoModuleFunc(h.log), []api.ValueType{api.ValueTypeI32, api.ValueTypeI32, api.ValueTypeI32}, []api.ValueType{}). + Export("log") + + builder.NewFunctionBuilder(). + WithGoModuleFunction(api.GoModuleFunc(h.gas), []api.ValueType{api.ValueTypeI64}, []api.ValueType{}). + Export("gas") + + builder.NewFunctionBuilder(). + WithGoModuleFunction(api.GoModuleFunc(h.abort), []api.ValueType{ + api.ValueTypeI32, + api.ValueTypeI32, + api.ValueTypeI32, + api.ValueTypeI32, + }, []api.ValueType{}). + Export("abort") + + _, err := builder.Instantiate(ctx) + + return err +} + +func (h *hostEnv) snapshot() BlockContext { + h.mu.RLock() + defer h.mu.RUnlock() + + return h.blockCtx +} + +func (h *hostEnv) charge(ctx context.Context, mod api.Module, cost uint64, reason string) { + if cost == 0 || h.gasLimit == 0 { + return + } + + h.mu.Lock() + if h.gasRemaining >= cost { + h.gasRemaining -= cost + h.mu.Unlock() + return + } + h.gasRemaining = 0 + h.mu.Unlock() + + h.logger.Warn(). + Uint64("cost", cost). + Uint64("limit", h.gasLimit). + Str("reason", reason). + Msg("strategy gas limit exceeded") + + h.trap(ctx, mod, gasExitCode) +} + +func (h *hostEnv) getBlockNumber(ctx context.Context, mod api.Module, stack []uint64) { + h.charge(ctx, mod, costCheap, "get_block_number") + stack[0] = h.snapshot().BlockNumber +} + +func (h *hostEnv) getChainID(ctx context.Context, mod api.Module, stack []uint64) { + h.charge(ctx, mod, costCheap, "get_chain_id") + stack[0] = uint64(h.snapshot().ChainID) +} + +func (h *hostEnv) getEventCount(ctx context.Context, mod api.Module, stack []uint64) { + h.charge(ctx, mod, costCheap, "get_event_count") + stack[0] = uint64(uint32(len(h.snapshot().Events))) +} + +func (h *hostEnv) getEventKind(ctx context.Context, mod api.Module, stack []uint64) { + h.charge(ctx, mod, costEvent, "get_event_kind") + idx := int32(int64(stack[0])) + events := h.snapshot().Events + if idx < 0 || int(idx) >= len(events) { + stack[0] = 0 + + return + } + + stack[0] = uint64(uint32(events[idx].Kind)) +} + +func (h *hostEnv) getEventAddressID(ctx context.Context, mod api.Module, stack []uint64) { + h.charge(ctx, mod, costEvent, "get_event_address_id") + idx := int32(int64(stack[0])) + events := h.snapshot().Events + if idx < 0 || int(idx) >= len(events) { + stack[0] = 0 + + return + } + + stack[0] = uint64(uint32(events[idx].Target)) +} + +func (h *hostEnv) slotKey(slot int32) []byte { + var k [4]byte + binary.BigEndian.PutUint32(k[:], uint32(slot)) + + return k[:] +} + +func (h *hostEnv) dbGetU64(ctx context.Context, mod api.Module, stack []uint64) { + h.charge(ctx, mod, costDbRead, "db_get_u64") + slot := int32(int64(stack[0])) + key := h.slotKey(slot) + + var val uint64 + err := h.db.View(ctx, func(tx kv.Tx) error { + v, err := tx.GetOne(application.StrategyStateBucket, key) + if err != nil { + return err + } + + if len(v) == 8 { + val = binary.BigEndian.Uint64(v) + } + + return nil + }) + if err != nil { + h.logger.Warn().Err(err).Msg("db_get_u64 failed") + stack[0] = 0 + + return + } + + stack[0] = val +} + +func (h *hostEnv) dbPutU64(ctx context.Context, mod api.Module, stack []uint64) { + h.charge(ctx, mod, costDbWrite, "db_put_u64") + slot := int32(int64(stack[0])) + value := stack[1] + key := h.slotKey(slot) + var buf [8]byte + binary.BigEndian.PutUint64(buf[:], value) + + err := h.db.Update(ctx, func(tx kv.RwTx) error { + return tx.Put(application.StrategyStateBucket, key, buf[:]) + }) + if err != nil { + h.logger.Warn().Err(err).Msg("db_put_u64 failed") + } +} + +func (h *hostEnv) log(ctx context.Context, mod api.Module, stack []uint64) { + h.charge(ctx, mod, costLog, "log") + level := int32(int64(stack[0])) + ptr := uint32(stack[1]) + length := uint32(stack[2]) + + memory := mod.Memory() + if memory == nil { + return + } + + msg := "" + if length > 0 { + if data, ok := memory.Read(ptr, length); ok { + msg = string(data) + } else { + h.logger.Warn().Uint32("ptr", ptr).Uint32("len", length).Msg("log read failed") + } + } + + switch level { + case 10: + h.logger.Debug().Msg(msg) + case 20: + h.logger.Info().Msg(msg) + case 30: + h.logger.Warn().Msg(msg) + case 40: + h.logger.Error().Msg(msg) + default: + h.logger.Info().Int32("level", level).Msg(msg) + } +} + +func (h *hostEnv) abort(ctx context.Context, mod api.Module, stack []uint64) { + messagePtr := uint32(stack[0]) + filePtr := uint32(stack[1]) + line := uint32(stack[2]) + column := uint32(stack[3]) + + h.logger.Error(). + Uint32("message_ptr", messagePtr). + Uint32("file_ptr", filePtr). + Uint32("line", line). + Uint32("column", column). + Msg("wasm abort invoked") + + h.trap(ctx, mod, hostAbortExitCode) +} + +func (h *hostEnv) trap(ctx context.Context, mod api.Module, code uint32) { + _ = mod.CloseWithExitCode(ctx, code) + panic(sys.NewExitError(code)) +} + +func (h *hostEnv) gas(ctx context.Context, mod api.Module, stack []uint64) { + cost := uint64(api.DecodeU32(stack[0])) + h.charge(ctx, mod, cost, "guest") +} + +// StrategyModule wraps a single WASM module instance plus host wiring. +type StrategyModule struct { + id string + runtime wazero.Runtime + module api.Module + host *hostEnv + limits StrategyLimits +} + +// ModuleConfig configures a StrategyModule instance. +type ModuleConfig struct { + ID string + Wasm []byte + DB kv.RwDB + Logger *zerolog.Logger + Limits StrategyLimits +} + +// NewStrategyModule compiles and instantiates the WASM strategy and host functions. +func NewStrategyModule(ctx context.Context, cfg ModuleConfig) (*StrategyModule, error) { + if len(cfg.Wasm) == 0 { + return nil, fmt.Errorf("strategy wasm is empty") + } + + logger := cfg.Logger + if logger == nil { + logger = zerolog.Ctx(ctx) + } + + runtime := wazero.NewRuntime(ctx) + host := newHostEnv(logger, cfg.DB) + + if err := host.register(ctx, runtime); err != nil { + return nil, fmt.Errorf("register host env: %w", err) + } + + limits := cfg.Limits + if limits.GasLimit == 0 { + limits.GasLimit = defaultGasLimit + } + if limits.FunctionCallCost == 0 { + limits.FunctionCallCost = defaultFunctionCost + } + + listenerCtx := experimental.WithFunctionListenerFactory(ctx, &gasListenerFactory{ + host: host, + cost: limits.FunctionCallCost, + }) + + compiled, err := runtime.CompileModule(listenerCtx, cfg.Wasm) + if err != nil { + return nil, fmt.Errorf("compile strategy wasm: %w", err) + } + + if err := validateModule(compiled); err != nil { + return nil, err + } + + moduleConfig := wazero.NewModuleConfig().WithName(fmt.Sprintf("strategy-%s", cfg.ID)) + module, err := runtime.InstantiateModule(listenerCtx, compiled, moduleConfig) + if err != nil { + return nil, fmt.Errorf("instantiate strategy wasm: %w", err) + } + + return &StrategyModule{ + id: cfg.ID, + runtime: runtime, + module: module, + host: host, + limits: limits, + }, nil +} + +// Close releases the underlying runtime resources. +func (s *StrategyModule) Close(ctx context.Context) error { + if s.module != nil { + _ = s.module.Close(ctx) + } + + if s.runtime != nil { + return s.runtime.Close(ctx) + } + + return nil +} + +// OnBlock invokes the exported on_block handler with the provided context. +func (s *StrategyModule) OnBlock(ctx context.Context, block BlockContext) error { + s.host.prepare(block, s.limits.GasLimit) + + execCtx := ctx + cancel := func() {} + if s.limits.Timeout > 0 { + execCtx, cancel = context.WithTimeout(ctx, s.limits.Timeout) + } + defer cancel() + + exported := s.module.ExportedFunction("on_block") + if exported == nil { + return fmt.Errorf("%s: %w", s.id, errNoEntryPoint) + } + + _, err := exported.Call(execCtx) + if err == nil { + return nil + } + + var exitErr *sys.ExitError + if errors.As(err, &exitErr) { + switch exitErr.ExitCode() { + case gasExitCode: + return errGasLimitExceeded + case hostAbortExitCode: + return fmt.Errorf("strategy %s aborted execution", s.id) + case sys.ExitCodeDeadlineExceeded: + return fmt.Errorf("strategy %s timed out: %w", s.id, context.DeadlineExceeded) + case sys.ExitCodeContextCanceled: + return fmt.Errorf("strategy %s canceled: %w", s.id, context.Canceled) + default: + return exitErr + } + } + + if errors.Is(err, context.DeadlineExceeded) { + return fmt.Errorf("strategy %s timed out: %w", s.id, err) + } + + return err +} + +func normalizeAddress(addr string) string { + return strings.ToLower(addr) +} + +func validateModule(compiled wazero.CompiledModule) error { + for _, fn := range compiled.ImportedFunctions() { + if _, _, imported := fn.Import(); !imported { + continue + } + + moduleName, funcName, _ := fn.Import() + if moduleName != "env" { + return fmt.Errorf("imports from module %q are not allowed", moduleName) + } + + if _, ok := allowedEnvImports[funcName]; !ok { + return fmt.Errorf("import env.%s is not allowed", funcName) + } + } + + if mems := compiled.ImportedMemories(); len(mems) > 0 { + return fmt.Errorf("imported memories are not supported") + } + + return nil +} diff --git a/wasmstrategy/host_test.go b/wasmstrategy/host_test.go new file mode 100644 index 0000000..49d017b --- /dev/null +++ b/wasmstrategy/host_test.go @@ -0,0 +1,302 @@ +package wasmstrategy + +import ( + "bytes" + "context" + _ "embed" + "encoding/binary" + "fmt" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/erigon-lib/kv/mdbx" + mdbxlog "github.com/ledgerwatch/log/v3" + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" + "github.com/tetratelabs/wazero" + + "github.com/0xAtelerix/example/application" +) + +//go:embed testdata/uniswap_strategy.wasm +var testStrategyWasm []byte + +//go:embed testdata/forbidden_import.wasm +var testForbiddenImport []byte + +//go:embed testdata/memory_import.wasm +var testMemoryImport []byte + +//go:embed testdata/instruction_gas.wasm +var instructionGasTestWasm []byte + +//go:embed testdata/memory_grow.wasm +var memoryGrowGasTestWasm []byte + +func newTestDB(t *testing.T) kv.RwDB { + t.Helper() + + dbPath := filepath.Join(t.TempDir(), "strategy.mdbx") + + db, err := mdbx.NewMDBX(mdbxlog.New()). + Path(dbPath). + WithTableCfg(func(_ kv.TableCfg) kv.TableCfg { + return kv.TableCfg{ + application.StrategyStateBucket: {}, + } + }).Open() + require.NoError(t, err) + + t.Cleanup(func() { + db.Close() + }) + + return db +} + +func readSlot(t *testing.T, ctx context.Context, db kv.RwDB, slot int32) uint64 { + t.Helper() + + key := make([]byte, 4) + binary.BigEndian.PutUint32(key, uint32(slot)) + + var out uint64 + + require.NoError(t, db.View(ctx, func(tx kv.Tx) error { + data, err := tx.GetOne(application.StrategyStateBucket, key) + if err != nil { + return err + } + + if len(data) == 8 { + out = binary.BigEndian.Uint64(data) + } + + return nil + })) + + return out +} + +func writeSlot(t *testing.T, ctx context.Context, db kv.RwDB, slot int32, value uint64) { + t.Helper() + + key := make([]byte, 4) + binary.BigEndian.PutUint32(key, uint32(slot)) + + var buf [8]byte + binary.BigEndian.PutUint64(buf[:], value) + + require.NoError(t, db.Update(ctx, func(tx kv.RwTx) error { + return tx.Put(application.StrategyStateBucket, key, buf[:]) + })) +} + +func newStrategyModule(t *testing.T, ctx context.Context, db kv.RwDB, sink *bytes.Buffer) *StrategyModule { + logger := zerolog.New(sink).Level(zerolog.DebugLevel) + + module, err := NewStrategyModule(ctx, ModuleConfig{ + ID: "test-strategy", + Wasm: testStrategyWasm, + DB: db, + Logger: &logger, + Limits: StrategyLimits{ + GasLimit: 5000, + Timeout: time.Second, + FunctionCallCost: defaultFunctionCost, + }, + }) + require.NoError(t, err) + + t.Cleanup(func() { + require.NoError(t, module.Close(ctx)) + }) + + return module +} + +func newStrategyModuleFromBytes(t *testing.T, ctx context.Context, db kv.RwDB, wasmBytes []byte, limits StrategyLimits) *StrategyModule { + var sink bytes.Buffer + logger := zerolog.New(&sink).Level(zerolog.DebugLevel) + + module, err := NewStrategyModule(ctx, ModuleConfig{ + ID: "test-raw", + Wasm: wasmBytes, + DB: db, + Logger: &logger, + Limits: limits, + }) + require.NoError(t, err) + + t.Cleanup(func() { + require.NoError(t, module.Close(ctx)) + }) + + return module +} + +func TestStrategyModuleWritesSlotOnEvent(t *testing.T) { + ctx := context.Background() + db := newTestDB(t) + var logs bytes.Buffer + + module := newStrategyModule(t, ctx, db, &logs) + + blockNumber := uint64(42_000) + err := module.OnBlock(ctx, BlockContext{ + BlockNumber: blockNumber, + ChainID: 42, + Events: []StrategyEvent{ + {Kind: EventKindERC20Transfer, Target: AddressIDUniswapV2Pair}, + }, + }) + require.NoError(t, err) + + value := readSlot(t, ctx, db, SlotLastUniTransferBlock) + require.Equal(t, blockNumber, value, "strategy should persist last observed block number") + require.Contains(t, logs.String(), "Uniswap transfer at block", "expected info log from WASM strategy") +} + +func TestStrategyModuleLogsStaleTransferWarning(t *testing.T) { + ctx := context.Background() + db := newTestDB(t) + var logs bytes.Buffer + + module := newStrategyModule(t, ctx, db, &logs) + + lastSeen := uint64(1) + writeSlot(t, ctx, db, SlotLastUniTransferBlock, lastSeen) + + blockNumber := lastSeen + 7000 + err := module.OnBlock(ctx, BlockContext{ + BlockNumber: blockNumber, + ChainID: 42, + }) + require.NoError(t, err) + + value := readSlot(t, ctx, db, SlotLastUniTransferBlock) + require.Equal(t, lastSeen, value, "slot should remain unchanged when no transfer is seen") + + expected := fmt.Sprintf("No Uniswap transfers since block %d", lastSeen) + require.Truef(t, strings.Contains(logs.String(), expected), "expected stale warning log containing %q", expected) +} + +func TestStrategyModuleGasLimitExceeded(t *testing.T) { + ctx := context.Background() + db := newTestDB(t) + var logs bytes.Buffer + + module := newStrategyModule(t, ctx, db, &logs) + module.limits = StrategyLimits{ + GasLimit: 10, + Timeout: time.Second, + FunctionCallCost: 20, + } + + err := module.OnBlock(ctx, BlockContext{ + BlockNumber: 1, + ChainID: 42, + }) + require.ErrorIs(t, err, errGasLimitExceeded) +} + +func TestValidateModuleRejectsUnknownImports(t *testing.T) { + ctx := context.Background() + rt := wazero.NewRuntime(ctx) + t.Cleanup(func() { + _ = rt.Close(ctx) + }) + + module, err := rt.CompileModule(ctx, testForbiddenImport) + require.NoError(t, err) + + err = validateModule(module) + require.EqualError(t, err, "import env.evil is not allowed") +} + +func TestValidateModuleRejectsMemoryImports(t *testing.T) { + ctx := context.Background() + rt := wazero.NewRuntime(ctx) + t.Cleanup(func() { + _ = rt.Close(ctx) + }) + + module, err := rt.CompileModule(ctx, testMemoryImport) + require.NoError(t, err) + + err = validateModule(module) + require.EqualError(t, err, "imported memories are not supported") +} + +func TestGasExecutionDeterministic(t *testing.T) { + ctx := context.Background() + db := newTestDB(t) + + module := newStrategyModuleFromBytes(t, ctx, db, instructionGasTestWasm, StrategyLimits{ + GasLimit: 60, + Timeout: time.Second, + FunctionCallCost: 1, + }) + + block := BlockContext{BlockNumber: 1, ChainID: 42} + + require.NoError(t, module.OnBlock(ctx, block)) + require.NoError(t, module.OnBlock(ctx, block)) +} + +func TestGasLimitExceededOnInstructionCalls(t *testing.T) { + ctx := context.Background() + db := newTestDB(t) + + module := newStrategyModuleFromBytes(t, ctx, db, instructionGasTestWasm, StrategyLimits{ + GasLimit: 20, + Timeout: time.Second, + FunctionCallCost: 1, + }) + + err := module.OnBlock(ctx, BlockContext{BlockNumber: 1, ChainID: 42}) + require.ErrorIs(t, err, errGasLimitExceeded) +} + +func TestGasLimitAllowsExactUsage(t *testing.T) { + ctx := context.Background() + db := newTestDB(t) + + module := newStrategyModuleFromBytes(t, ctx, db, instructionGasTestWasm, StrategyLimits{ + GasLimit: costDbRead*2 + 2, + Timeout: time.Second, + FunctionCallCost: 1, + }) + + require.NoError(t, module.OnBlock(ctx, BlockContext{BlockNumber: 1, ChainID: 42})) +} + +func TestGasLimitExceededOnMemoryGrowth(t *testing.T) { + ctx := context.Background() + db := newTestDB(t) + + module := newStrategyModuleFromBytes(t, ctx, db, memoryGrowGasTestWasm, StrategyLimits{ + GasLimit: 20, + Timeout: time.Second, + FunctionCallCost: 1, + }) + + err := module.OnBlock(ctx, BlockContext{BlockNumber: 1, ChainID: 42}) + require.ErrorIs(t, err, errGasLimitExceeded) +} + +func TestGasLimitMemoryGrowthPass(t *testing.T) { + ctx := context.Background() + db := newTestDB(t) + + module := newStrategyModuleFromBytes(t, ctx, db, memoryGrowGasTestWasm, StrategyLimits{ + GasLimit: 30, + Timeout: time.Second, + FunctionCallCost: 1, + }) + + require.NoError(t, module.OnBlock(ctx, BlockContext{BlockNumber: 1, ChainID: 42})) +} diff --git a/wasmstrategy/listener.go b/wasmstrategy/listener.go new file mode 100644 index 0000000..713365b --- /dev/null +++ b/wasmstrategy/listener.go @@ -0,0 +1,40 @@ +package wasmstrategy + +import ( + "context" + + "github.com/tetratelabs/wazero/api" + "github.com/tetratelabs/wazero/experimental" +) + +type gasListenerFactory struct { + host *hostEnv + cost uint64 +} + +func (f *gasListenerFactory) NewFunctionListener(definition api.FunctionDefinition) experimental.FunctionListener { + // If no gas accounting is configured, skip installing listeners. + if f.cost == 0 || f.host == nil { + return nil + } + + return &gasFunctionListener{ + host: f.host, + cost: f.cost, + name: definition.DebugName(), + } +} + +type gasFunctionListener struct { + host *hostEnv + cost uint64 + name string +} + +func (l *gasFunctionListener) Before(ctx context.Context, mod api.Module, _ api.FunctionDefinition, _ []uint64, _ experimental.StackIterator) { + l.host.charge(ctx, mod, l.cost, l.name) +} + +func (l *gasFunctionListener) After(context.Context, api.Module, api.FunctionDefinition, []uint64) {} + +func (l *gasFunctionListener) Abort(context.Context, api.Module, api.FunctionDefinition, error) {} diff --git a/wasmstrategy/manager.go b/wasmstrategy/manager.go new file mode 100644 index 0000000..0d5c2ac --- /dev/null +++ b/wasmstrategy/manager.go @@ -0,0 +1,320 @@ +package wasmstrategy + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "fmt" + "os" + "path/filepath" + "strings" + "sync" + "time" + + "github.com/ledgerwatch/erigon-lib/kv" + "github.com/rs/zerolog" + + "github.com/0xAtelerix/example/application" + "github.com/0xAtelerix/sdk/gosdk" + "github.com/0xAtelerix/sdk/gosdk/apptypes" +) + +// ManagerConfig wires the WASM runtime into the Pelagos example node. +type ManagerConfig struct { + Logger *zerolog.Logger + DB kv.RwDB + Multichain *gosdk.MultichainStateAccess + StrategyDir string + ReloadInterval time.Duration + AddressBook map[string]AddressID + ChainID apptypes.ChainType + Limits StrategyLimits + MaxParallel int +} + +type strategyHandle struct { + module *StrategyModule + hash string +} + +// Manager owns the wasm modules and implements gosdk.BlockObserver. +type Manager struct { + logger *zerolog.Logger + multichain *gosdk.MultichainStateAccess + addressBook map[string]AddressID + chainID apptypes.ChainType + + strategyDir string + reloadInterval time.Duration + limits StrategyLimits + maxParallel int + + mu sync.RWMutex + modules map[string]*strategyHandle + lastReload time.Time + db kv.RwDB +} + +// NewManager loads all WASM artifacts available in StrategyDir and keeps them in sync. +func NewManager(ctx context.Context, cfg ManagerConfig) (*Manager, error) { + if cfg.Multichain == nil { + return nil, fmt.Errorf("multichain state access is required") + } + + if cfg.DB == nil { + return nil, fmt.Errorf("database handle is required") + } + + if cfg.StrategyDir == "" { + return nil, fmt.Errorf("strategy directory must be set") + } + + if _, err := os.Stat(cfg.StrategyDir); err != nil { + return nil, fmt.Errorf("strategy dir %q: %w", cfg.StrategyDir, err) + } + + logger := cfg.Logger + if logger == nil { + logger = zerolog.Ctx(ctx) + } + + manager := &Manager{ + logger: logger, + multichain: cfg.Multichain, + addressBook: normalizeAddressBook(cfg.AddressBook), + chainID: cfg.ChainID, + strategyDir: cfg.StrategyDir, + reloadInterval: cfg.ReloadInterval, + limits: cfg.Limits, + maxParallel: cfg.MaxParallel, + modules: make(map[string]*strategyHandle), + db: cfg.DB, + } + + if manager.maxParallel <= 0 { + manager.maxParallel = 4 + } + + if manager.reloadInterval <= 0 { + manager.reloadInterval = 5 * time.Second + } + + if manager.limits.GasLimit == 0 { + manager.limits.GasLimit = defaultGasLimit + } + if manager.limits.FunctionCallCost == 0 { + manager.limits.FunctionCallCost = defaultFunctionCost + } + if manager.limits.FunctionCallCost == 0 { + manager.limits.FunctionCallCost = defaultFunctionCost + } + + if err := manager.reload(ctx); err != nil { + return nil, err + } + + return manager, nil +} + +// Close frees all wasm runtimes. +func (m *Manager) Close(ctx context.Context) { + m.mu.Lock() + defer m.mu.Unlock() + + for id, handle := range m.modules { + if handle.module != nil { + _ = handle.module.Close(ctx) + } + + delete(m.modules, id) + } +} + +// OnBlock wires events and context into all configured WASM strategies. +func (m *Manager) OnBlock(ctx context.Context, meta gosdk.BlockObserverContext[application.Transaction[application.Receipt], application.Receipt]) error { + if err := m.maybeReload(ctx); err != nil { + m.logger.Error().Err(err).Msg("failed to reload strategies") + } + + modules := m.snapshotModules() + if len(modules) == 0 { + return nil + } + + blockCtx := BlockContext{ + BlockNumber: meta.BlockNumber, + ChainID: m.chainID, + Events: m.collectEvents(ctx, meta.Batch.ExternalBlocks), + } + + sem := make(chan struct{}, m.maxParallel) + var wg sync.WaitGroup + + for _, handle := range modules { + wg.Add(1) + go func(h *strategyHandle) { + defer wg.Done() + + sem <- struct{}{} + defer func() { <-sem }() + + if err := h.module.OnBlock(ctx, blockCtx); err != nil { + m.logger.Error().Err(err).Str("strategy", h.module.id).Msg("strategy execution failed") + } + }(handle) + } + + wg.Wait() + + return nil +} + +func (m *Manager) maybeReload(ctx context.Context) error { + m.mu.RLock() + last := m.lastReload + m.mu.RUnlock() + + if time.Since(last) < m.reloadInterval { + return nil + } + + return m.reload(ctx) +} + +func (m *Manager) reload(ctx context.Context) error { + entries, err := os.ReadDir(m.strategyDir) + if err != nil { + return fmt.Errorf("scan strategies dir: %w", err) + } + + newHandles := make(map[string]*strategyHandle, len(entries)) + seen := make(map[string]struct{}, len(entries)) + + for _, entry := range entries { + if entry.IsDir() || filepath.Ext(entry.Name()) != ".wasm" { + continue + } + + id := strings.TrimSuffix(entry.Name(), filepath.Ext(entry.Name())) + path := filepath.Join(m.strategyDir, entry.Name()) + + bytes, readErr := os.ReadFile(path) + if readErr != nil { + m.logger.Error().Err(readErr).Str("strategy", id).Msg("failed to read strategy file") + continue + } + + hash := sha256.Sum256(bytes) + hashHex := hex.EncodeToString(hash[:]) + seen[id] = struct{}{} + + if existing, ok := m.modules[id]; ok && existing.hash == hashHex { + newHandles[id] = existing + continue + } + + module, instErr := NewStrategyModule(ctx, ModuleConfig{ + ID: id, + Wasm: bytes, + DB: m.db, + Logger: m.logger, + Limits: m.limits, + }) + if instErr != nil { + m.logger.Error().Err(instErr).Str("strategy", id).Msg("failed to instantiate strategy") + continue + } + + newHandles[id] = &strategyHandle{ + module: module, + hash: hashHex, + } + + m.logger.Info().Str("strategy", id).Msg("loaded strategy module") + } + + m.mu.Lock() + defer m.mu.Unlock() + + for id, handle := range m.modules { + if _, ok := seen[id]; ok && newHandles[id] == handle { + continue + } + + if handle.module != nil { + _ = handle.module.Close(ctx) + } + } + + m.modules = newHandles + m.lastReload = time.Now() + + return nil +} + +func (m *Manager) snapshotModules() []*strategyHandle { + m.mu.RLock() + defer m.mu.RUnlock() + + out := make([]*strategyHandle, 0, len(m.modules)) + for _, handle := range m.modules { + out = append(out, handle) + } + + return out +} + +func (m *Manager) collectEvents(ctx context.Context, blocks []*apptypes.ExternalBlock) []StrategyEvent { + if len(blocks) == 0 || len(m.addressBook) == 0 { + return nil + } + + var events []StrategyEvent + + for _, blk := range blocks { + if blk == nil { + continue + } + + chainID := apptypes.ChainType(blk.ChainID) + if !gosdk.IsEvmChain(chainID) { + continue + } + + receipts, err := m.multichain.EthReceipts(ctx, *blk) + if err != nil { + m.logger.Debug().Err(err).Uint64("chain", blk.ChainID).Uint64("block", blk.BlockNumber).Msg("missing receipts") + continue + } + + for _, receipt := range receipts { + for _, vlog := range receipt.Logs { + if len(vlog.Topics) == 0 || vlog.Topics[0].Hex() != erc20TransferTopic { + continue + } + + addrID, ok := m.addressBook[normalizeAddress(vlog.Address.Hex())] + if !ok { + continue + } + + events = append(events, StrategyEvent{Kind: EventKindERC20Transfer, Target: addrID}) + } + } + } + + return events +} + +func normalizeAddressBook(in map[string]AddressID) map[string]AddressID { + if len(in) == 0 { + return in + } + + out := make(map[string]AddressID, len(in)) + for k, v := range in { + out[normalizeAddress(k)] = v + } + + return out +} diff --git a/wasmstrategy/testdata/as/forbidden_import.ts b/wasmstrategy/testdata/as/forbidden_import.ts new file mode 100644 index 0000000..8d45f5a --- /dev/null +++ b/wasmstrategy/testdata/as/forbidden_import.ts @@ -0,0 +1,6 @@ +@external("env", "evil") +declare function evil(): void; + +export function on_block(): void { + evil(); +} diff --git a/wasmstrategy/testdata/as/instruction_gas.ts b/wasmstrategy/testdata/as/instruction_gas.ts new file mode 100644 index 0000000..2aaf918 --- /dev/null +++ b/wasmstrategy/testdata/as/instruction_gas.ts @@ -0,0 +1,7 @@ +@external("env", "db_get_u64") +declare function dbGet(slot: i32): i64; + +export function on_block(): void { + dbGet(0); + dbGet(0); +} diff --git a/wasmstrategy/testdata/as/memory_grow.ts b/wasmstrategy/testdata/as/memory_grow.ts new file mode 100644 index 0000000..291676c --- /dev/null +++ b/wasmstrategy/testdata/as/memory_grow.ts @@ -0,0 +1,7 @@ +@external("env", "gas") +declare function useGas(cost: i64): void; + +export function on_block(): void { + useGas(25); + memory.grow(1); +} diff --git a/wasmstrategy/testdata/as/memory_import.ts b/wasmstrategy/testdata/as/memory_import.ts new file mode 100644 index 0000000..c6296fb --- /dev/null +++ b/wasmstrategy/testdata/as/memory_import.ts @@ -0,0 +1 @@ +export function on_block(): void {} diff --git a/wasmstrategy/testdata/as/uniswap_strategy.ts b/wasmstrategy/testdata/as/uniswap_strategy.ts new file mode 100644 index 0000000..24d114f --- /dev/null +++ b/wasmstrategy/testdata/as/uniswap_strategy.ts @@ -0,0 +1,67 @@ +@external("env", "get_block_number") +declare function hostGetBlockNumber(): i64; + +@external("env", "get_event_count") +declare function hostGetEventCount(): i32; + +@external("env", "get_event_kind") +declare function hostGetEventKind(index: i32): i32; + +@external("env", "get_event_address_id") +declare function hostGetEventAddressId(index: i32): i32; + +@external("env", "db_get_u64") +declare function hostDbGet(slot: i32): i64; + +@external("env", "db_put_u64") +declare function hostDbPut(slot: i32, value: i64): void; + +@external("env", "log") +declare function hostLog(level: i32, ptr: usize, len: i32): void; + +const SLOT_LAST_UNI_TRANSFER_BLOCK: i32 = 1; + +enum EventKind { + Transfer = 1, +} + +enum AddressId { + UniswapV2Pair = 1, + UniswapV3Pool = 2, +} + +function logInfo(message: string): void { + const encoded = String.UTF8.encode(message); + hostLog(20, changetype(encoded), encoded.byteLength); +} + +function hasUniswapTransfer(events: i32): bool { + for (let index = 0; index < events; index++) { + if (hostGetEventKind(index) != EventKind.Transfer) { + continue; + } + + const address = hostGetEventAddressId(index); + if (address == AddressId.UniswapV2Pair || address == AddressId.UniswapV3Pool) { + return true; + } + } + + return false; +} + +export function on_block(): void { + const block = hostGetBlockNumber(); + const events = hostGetEventCount(); + + if (hasUniswapTransfer(events)) { + hostDbPut(SLOT_LAST_UNI_TRANSFER_BLOCK, block); + logInfo("Uniswap transfer at block " + block.toString()); + return; + } + + const lastSeen = hostDbGet(SLOT_LAST_UNI_TRANSFER_BLOCK); + if (lastSeen > 0) { + logInfo("No Uniswap transfers since block " + lastSeen.toString()); + } +} diff --git a/wasmstrategy/testdata/forbidden_import.wasm b/wasmstrategy/testdata/forbidden_import.wasm new file mode 100644 index 0000000..705b2e0 Binary files /dev/null and b/wasmstrategy/testdata/forbidden_import.wasm differ diff --git a/wasmstrategy/testdata/instruction_gas.wasm b/wasmstrategy/testdata/instruction_gas.wasm new file mode 100644 index 0000000..fe119b9 Binary files /dev/null and b/wasmstrategy/testdata/instruction_gas.wasm differ diff --git a/wasmstrategy/testdata/memory_grow.wasm b/wasmstrategy/testdata/memory_grow.wasm new file mode 100644 index 0000000..3dd2d13 Binary files /dev/null and b/wasmstrategy/testdata/memory_grow.wasm differ diff --git a/wasmstrategy/testdata/memory_import.wasm b/wasmstrategy/testdata/memory_import.wasm new file mode 100644 index 0000000..f3ae50a Binary files /dev/null and b/wasmstrategy/testdata/memory_import.wasm differ diff --git a/wasmstrategy/testdata/uniswap_strategy.wasm b/wasmstrategy/testdata/uniswap_strategy.wasm new file mode 100644 index 0000000..bcdc62d Binary files /dev/null and b/wasmstrategy/testdata/uniswap_strategy.wasm differ diff --git a/wasmstrategy/types.go b/wasmstrategy/types.go new file mode 100644 index 0000000..d57b4f4 --- /dev/null +++ b/wasmstrategy/types.go @@ -0,0 +1,41 @@ +package wasmstrategy + +import "github.com/0xAtelerix/sdk/gosdk/apptypes" + +// EventKind mirrors AssemblyScript's enum for events surfaced to strategies. +type EventKind int32 + +const ( + EventKindUnknown EventKind = 0 + EventKindERC20Transfer EventKind = 1 +) + +// AddressID classifies contracts and venues that emit events. +type AddressID int32 + +const ( + AddressIDUnknown AddressID = 0 + AddressIDUniswapV2Pair AddressID = 1 + AddressIDUniswapV3Pool AddressID = 2 +) + +// StrategyEvent captures the minimal event metadata sent to WASM. +type StrategyEvent struct { + Kind EventKind + Target AddressID +} + +// BlockContext is stored inside the host functions before invoking WASM. +type BlockContext struct { + BlockNumber uint64 + ChainID apptypes.ChainType + Events []StrategyEvent +} + +const ( + // SlotLastUniTransferBlock mirrors SLOT_LAST_UNI_TRANSFER_BLOCK in AssemblyScript. + SlotLastUniTransferBlock int32 = 1 +) + +// erc20TransferTopic is the Keccak hash of Transfer(address,address,uint256). +const erc20TransferTopic = "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"