diff --git a/bft/bft.go b/bft/bft.go index 9f5c4984..fd01f219 100644 --- a/bft/bft.go +++ b/bft/bft.go @@ -943,7 +943,7 @@ type ( CommitCertificate(qc *lib.QuorumCertificate, block *lib.Block, blockResult *lib.BlockResult, ts uint64) (err lib.ErrorI) // GossipBlock() is a P2P call to gossip a completed Quorum Certificate with a Proposal GossipBlock(certificate *lib.QuorumCertificate, sender []byte, timestamp uint64) - // GossipConsensus() is a P2P call to gossip a completed Quorum Certificate with a Proposal + // GossipConsensus() is a P2P call to gossip a consensus message GossipConsensus(message *Message, senderPubExclude []byte) // SendToSelf() is a P2P call to directly send a completed Quorum Certificate to self SelfSendBlock(qc *lib.QuorumCertificate, timestamp uint64) diff --git a/cmd/rpc/client.go b/cmd/rpc/client.go index 9abb925b..7341164f 100644 --- a/cmd/rpc/client.go +++ b/cmd/rpc/client.go @@ -399,14 +399,18 @@ func (c *Client) Transaction(tx lib.TransactionI) (hash *string, err lib.ErrorI) return } -func (c *Client) Transactions(txs []lib.TransactionI) (hash *string, err lib.ErrorI) { +func (c *Client) Transactions(txs []lib.TransactionI) (hashes []*string, err lib.ErrorI) { bz, err := lib.MarshalJSON(txs) if err != nil { return nil, err } - hash = new(string) - err = c.post(TxsRouteName, bz, hash) - return + // a single transaction returns a single string hash + if len(txs) == 1 { + hash := new(string) + err = c.post(TxsRouteName, bz, hash) + return []*string{hash}, err + } + return hashes, c.post(TxsRouteName, bz, &hashes) } func (c *Client) Keystore() (keystore *crypto.Keystore, err lib.ErrorI) { diff --git a/cmd/rpc/query.go b/cmd/rpc/query.go index 59740775..79878843 100644 --- a/cmd/rpc/query.go +++ b/cmd/rpc/query.go @@ -38,13 +38,18 @@ func (s *Server) Transaction(w http.ResponseWriter, r *http.Request, _ httproute // Transactions handles multiple transactions in a single request func (s *Server) Transactions(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { // create a slice to hold the incoming transactions - var txs []lib.TransactionI + var txs []lib.Transaction // unmarshal the HTTP request body into the transactions slice if ok := unmarshal(w, r, &txs); !ok { return } + // cast txs to lib.TransactionI + txsI := make([]lib.TransactionI, len(txs)) + for i := range txs { + txsI[i] = &txs[i] + } // submit transactions to RPC server - s.submitTxs(w, txs) + s.submitTxs(w, txsI) } // Height responds with the next block version diff --git a/cmd/rpc/server.go b/cmd/rpc/server.go index 7f47afd9..65280a71 100644 --- a/cmd/rpc/server.go +++ b/cmd/rpc/server.go @@ -197,7 +197,7 @@ func (s *Server) startHeapProfiler() { } } -// submitTxs submits transactions to the controller and writes http response +// submitTx submits transactions to the controller and writes http response func (s *Server) submitTxs(w http.ResponseWriter, txs []lib.TransactionI) (ok bool) { // marshal each transaction to bytes var txBytes [][]byte @@ -215,9 +215,10 @@ func (s *Server) submitTxs(w http.ResponseWriter, txs []lib.TransactionI) (ok bo return } // return hashes of all submitted transactions - var hashes []string + var hashes []*string for _, bz := range txBytes { - hashes = append(hashes, crypto.HashString(bz)) + hash := crypto.HashString(bz) + hashes = append(hashes, &hash) } // if only one transaction was submitted, return the hash as a string if len(hashes) == 1 { diff --git a/controller/block.go b/controller/block.go index 5b49b2bd..418cd983 100644 --- a/controller/block.go +++ b/controller/block.go @@ -573,7 +573,7 @@ func (c *Controller) HandlePeerBlock(msg *lib.BlockMessage, syncing bool) (*lib. result = nil } // attempts to commit the QC to persistence of chain by playing it against the state machine - if err = c.CommitCertificate(qc, block, result, msg.Time); err != nil { + if err = c.CommitCertificateParallel(qc, block, result, msg.Time); err != nil { // exit with error return nil, err } diff --git a/controller/tx.go b/controller/tx.go index c500a09c..c1907ed3 100644 --- a/controller/tx.go +++ b/controller/tx.go @@ -38,13 +38,13 @@ func (c *Controller) ListenForTx() { // exit continue } - func() { - // check and add the message to the cache to prevent duplicates - if ok := cache.Add(msg); !ok { - // if duplicate, exit - return - } - c.log.Debug("Handling transaction") + // check and add the message to the cache to prevent duplicates + if ok := cache.Add(msg); !ok { + // if duplicate, exit + continue + } + go func() { + // c.log.Debug("Handling transaction async") // create a convenience variable for the identity of the sender senderID := msg.Sender.Address.PublicKey // try to unmarshal the p2p message as a tx message diff --git a/fsm/gov.go b/fsm/gov.go index 444c3c8d..9880436f 100644 --- a/fsm/gov.go +++ b/fsm/gov.go @@ -382,10 +382,16 @@ func (s *StateMachine) ParsePollTransactions(b *lib.BlockResult) { } // for each transaction in the block for _, tx := range b.Transactions { - // get the public key object - pub, e := crypto.NewPublicKeyFromBytes(tx.Transaction.Signature.PublicKey) - if e != nil { - return + // get the public key object from the cache + pub, ok := s.cache.publicKey.Get(string(tx.Transaction.Signature.PublicKey)) + if !ok { + var e error + pub, e = crypto.NewPublicKeyFromBytes(tx.Transaction.Signature.PublicKey) + if e != nil { + return + } + // add the public key to the cache + s.cache.publicKey.Add(string(tx.Transaction.Signature.PublicKey), pub) } // check for a poll transaction if err := ap.CheckForPollTransaction(pub.Address(), tx.Transaction.Memo, s.Height()); err != nil { diff --git a/fsm/state.go b/fsm/state.go index 23aa68a1..6f545b8f 100644 --- a/fsm/state.go +++ b/fsm/state.go @@ -9,6 +9,7 @@ import ( "github.com/canopy-network/canopy/lib" "github.com/canopy-network/canopy/lib/crypto" + lru "github.com/hashicorp/golang-lru/v2" ) const ( @@ -39,14 +40,16 @@ type StateMachine struct { // cache is the set of items to be cached used by the state machine type cache struct { - accounts map[uint64]*Account // cache of accounts accessed - feeParams *FeeParams // fee params for the current block - valParams *ValidatorParams // validator params for the current block + accounts map[uint64]*Account // cache of accounts accessed + feeParams *FeeParams // fee params for the current block + valParams *ValidatorParams // validator params for the current block + publicKey *lru.Cache[string, crypto.PublicKeyI] // public keys for block processing } // New() creates a new instance of a StateMachine func New(c lib.Config, store lib.StoreI, plugin *lib.Plugin, metrics *lib.Metrics, log lib.LoggerI) (*StateMachine, lib.ErrorI) { // create the state machine object reference + publicKeyCache, _ := lru.New[string, crypto.PublicKeyI](10_000) sm := &StateMachine{ store: nil, ProtocolVersion: CurrentProtocolVersion, @@ -59,7 +62,8 @@ func New(c lib.Config, store lib.StoreI, plugin *lib.Plugin, metrics *lib.Metric log: log, events: new(lib.EventsTracker), cache: &cache{ - accounts: make(map[uint64]*Account), + accounts: make(map[uint64]*Account), + publicKey: publicKeyCache, }, } // initialize the state machine @@ -520,7 +524,8 @@ func (s *StateMachine) Copy() (*StateMachine, lib.ErrorI) { Plugin: s.Plugin, log: s.log, cache: &cache{ - accounts: make(map[uint64]*Account), + accounts: make(map[uint64]*Account), + publicKey: s.cache.publicKey, }, LastValidatorSet: s.LastValidatorSet, }, nil diff --git a/fsm/state_test.go b/fsm/state_test.go index 7b296ddb..68986024 100644 --- a/fsm/state_test.go +++ b/fsm/state_test.go @@ -10,6 +10,7 @@ import ( "github.com/canopy-network/canopy/lib" "github.com/canopy-network/canopy/lib/crypto" "github.com/canopy-network/canopy/store" + lru "github.com/hashicorp/golang-lru/v2" "github.com/stretchr/testify/require" ) @@ -335,6 +336,7 @@ func newTestStateMachine(t *testing.T) StateMachine { log := lib.NewDefaultLogger() db, err := store.NewStoreInMemory(log) require.NoError(t, err) + publicKeyCache, _ := lru.New[string, crypto.PublicKeyI](10_000) sm := StateMachine{ store: db, ProtocolVersion: 0, @@ -350,7 +352,8 @@ func newTestStateMachine(t *testing.T) StateMachine { events: new(lib.EventsTracker), log: log, cache: &cache{ - accounts: make(map[uint64]*Account), + accounts: make(map[uint64]*Account), + publicKey: publicKeyCache, }, } require.NoError(t, sm.SetParams(DefaultParams())) diff --git a/fsm/swap.go b/fsm/swap.go index 2ed27020..c8e09638 100644 --- a/fsm/swap.go +++ b/fsm/swap.go @@ -3,9 +3,10 @@ package fsm import ( "bytes" "encoding/json" + "sort" + "github.com/canopy-network/canopy/lib" "github.com/canopy-network/canopy/lib/crypto" - "sort" ) /* This file contains state machine changes related to 'token swapping' */ @@ -77,7 +78,7 @@ func (s *StateMachine) ParseCloseOrder(tx *lib.Transaction) (co *lib.CloseOrder, // ProcessRootChainOrderBook() processes the order book from the root-chain and cross-references blocks on this chain to determine // actions that warrant committee level changes to the root-chain order book like: LockOrder, ResetOrder and CloseOrder func (s *StateMachine) ProcessRootChainOrderBook(book *lib.OrderBook, proposalBlock *lib.BlockResult) (lockOrders []*lib.LockOrder, closedOrders, resetOrders [][]byte) { - if book == nil { + if book == nil || len(book.Orders) == 0 { return } blocks := []*lib.BlockResult{proposalBlock} diff --git a/fsm/transaction.go b/fsm/transaction.go index cf297839..1c1f47de 100644 --- a/fsm/transaction.go +++ b/fsm/transaction.go @@ -1,10 +1,11 @@ package fsm import ( + "time" + "github.com/canopy-network/canopy/lib" "github.com/canopy-network/canopy/lib/crypto" "google.golang.org/protobuf/types/known/anypb" - "time" ) /* This file contains transaction handling logic - for the payload handling check message.go */ @@ -151,10 +152,17 @@ func (s *StateMachine) CheckSignature(tx *lib.Transaction, authorizedSigners [][ if err != nil { return nil, ErrTxSignBytes(err) } - // convert signature bytes to public key object - publicKey, e := crypto.NewPublicKeyFromBytes(tx.Signature.PublicKey) - if e != nil { - return nil, ErrInvalidPublicKey(e) + // try to obtain the public key from the cache + var e error + publicKey, ok := s.cache.publicKey.Get(string(tx.Signature.PublicKey)) + if !ok { + // convert signature bytes to public key object + publicKey, e = crypto.NewPublicKeyFromBytes(tx.Signature.PublicKey) + if e != nil { + return nil, ErrInvalidPublicKey(e) + } + // add the public key to the cache + s.cache.publicKey.Add(string(tx.Signature.PublicKey), publicKey) } // special case: check for a special RLP transaction if _, hasEthPubKey := publicKey.(*crypto.ETHSECP256K1PublicKey); hasEthPubKey && tx.Memo == RLPIndicator { diff --git a/lib/mempool.go b/lib/mempool.go index 387f979d..c22a2b26 100644 --- a/lib/mempool.go +++ b/lib/mempool.go @@ -1,12 +1,13 @@ package lib import ( - "github.com/canopy-network/canopy/lib/crypto" "maps" "math" "sort" "sync" "time" + + "github.com/canopy-network/canopy/lib/crypto" ) /* This file defines and implements a mempool that maintains an ordered list of 'valid, pending to be included' transactions in memory */ diff --git a/store/store.go b/store/store.go index fb3e68d6..88e8707b 100644 --- a/store/store.go +++ b/store/store.go @@ -353,7 +353,7 @@ func (s *Store) Root() (root []byte, err lib.ErrorI) { // set up the state commit store s.sc = NewDefaultSMT(NewTxn(s.ss.reader, s.ss.writer, stateCommitIDPrefix, false, false, true, nextVersion)) // commit the SMT directly using the txn ops - if err = s.sc.Commit(s.ss.txn.ops); err != nil { + if err = s.sc.CommitParallel(s.ss.txn.ops); err != nil { return nil, err } }