Skip to content
2 changes: 1 addition & 1 deletion bft/bft.go
Original file line number Diff line number Diff line change
Expand Up @@ -943,7 +943,7 @@ type (
CommitCertificate(qc *lib.QuorumCertificate, block *lib.Block, blockResult *lib.BlockResult, ts uint64) (err lib.ErrorI)
// GossipBlock() is a P2P call to gossip a completed Quorum Certificate with a Proposal
GossipBlock(certificate *lib.QuorumCertificate, sender []byte, timestamp uint64)
// GossipConsensus() is a P2P call to gossip a completed Quorum Certificate with a Proposal
// GossipConsensus() is a P2P call to gossip a consensus message
GossipConsensus(message *Message, senderPubExclude []byte)
// SendToSelf() is a P2P call to directly send a completed Quorum Certificate to self
SelfSendBlock(qc *lib.QuorumCertificate, timestamp uint64)
Expand Down
12 changes: 8 additions & 4 deletions cmd/rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,14 +399,18 @@ func (c *Client) Transaction(tx lib.TransactionI) (hash *string, err lib.ErrorI)
return
}

func (c *Client) Transactions(txs []lib.TransactionI) (hash *string, err lib.ErrorI) {
func (c *Client) Transactions(txs []lib.TransactionI) (hashes []*string, err lib.ErrorI) {
bz, err := lib.MarshalJSON(txs)
if err != nil {
return nil, err
}
hash = new(string)
err = c.post(TxsRouteName, bz, hash)
return
// a single transaction returns a single string hash
if len(txs) == 1 {
hash := new(string)
err = c.post(TxsRouteName, bz, hash)
return []*string{hash}, err
}
return hashes, c.post(TxsRouteName, bz, &hashes)
}

func (c *Client) Keystore() (keystore *crypto.Keystore, err lib.ErrorI) {
Expand Down
9 changes: 7 additions & 2 deletions cmd/rpc/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,18 @@ func (s *Server) Transaction(w http.ResponseWriter, r *http.Request, _ httproute
// Transactions handles multiple transactions in a single request
func (s *Server) Transactions(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
// create a slice to hold the incoming transactions
var txs []lib.TransactionI
var txs []lib.Transaction
// unmarshal the HTTP request body into the transactions slice
if ok := unmarshal(w, r, &txs); !ok {
return
}
// cast txs to lib.TransactionI
txsI := make([]lib.TransactionI, len(txs))
for i := range txs {
txsI[i] = &txs[i]
}
// submit transactions to RPC server
s.submitTxs(w, txs)
s.submitTxs(w, txsI)
}

// Height responds with the next block version
Expand Down
7 changes: 4 additions & 3 deletions cmd/rpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func (s *Server) startHeapProfiler() {
}
}

// submitTxs submits transactions to the controller and writes http response
// submitTx submits transactions to the controller and writes http response
func (s *Server) submitTxs(w http.ResponseWriter, txs []lib.TransactionI) (ok bool) {
// marshal each transaction to bytes
var txBytes [][]byte
Expand All @@ -215,9 +215,10 @@ func (s *Server) submitTxs(w http.ResponseWriter, txs []lib.TransactionI) (ok bo
return
}
// return hashes of all submitted transactions
var hashes []string
var hashes []*string
for _, bz := range txBytes {
hashes = append(hashes, crypto.HashString(bz))
hash := crypto.HashString(bz)
hashes = append(hashes, &hash)
}
// if only one transaction was submitted, return the hash as a string
if len(hashes) == 1 {
Expand Down
2 changes: 1 addition & 1 deletion controller/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,7 @@ func (c *Controller) HandlePeerBlock(msg *lib.BlockMessage, syncing bool) (*lib.
result = nil
}
// attempts to commit the QC to persistence of chain by playing it against the state machine
if err = c.CommitCertificate(qc, block, result, msg.Time); err != nil {
if err = c.CommitCertificateParallel(qc, block, result, msg.Time); err != nil {
// exit with error
return nil, err
}
Expand Down
14 changes: 7 additions & 7 deletions controller/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@ func (c *Controller) ListenForTx() {
// exit
continue
}
func() {
// check and add the message to the cache to prevent duplicates
if ok := cache.Add(msg); !ok {
// if duplicate, exit
return
}
c.log.Debug("Handling transaction")
// check and add the message to the cache to prevent duplicates
if ok := cache.Add(msg); !ok {
// if duplicate, exit
continue
}
go func() {
// c.log.Debug("Handling transaction async")
// create a convenience variable for the identity of the sender
senderID := msg.Sender.Address.PublicKey
// try to unmarshal the p2p message as a tx message
Expand Down
14 changes: 10 additions & 4 deletions fsm/gov.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,10 +382,16 @@ func (s *StateMachine) ParsePollTransactions(b *lib.BlockResult) {
}
// for each transaction in the block
for _, tx := range b.Transactions {
// get the public key object
pub, e := crypto.NewPublicKeyFromBytes(tx.Transaction.Signature.PublicKey)
if e != nil {
return
// get the public key object from the cache
pub, ok := s.cache.publicKey.Get(string(tx.Transaction.Signature.PublicKey))
if !ok {
var e error
pub, e = crypto.NewPublicKeyFromBytes(tx.Transaction.Signature.PublicKey)
if e != nil {
return
}
// add the public key to the cache
s.cache.publicKey.Add(string(tx.Transaction.Signature.PublicKey), pub)
}
// check for a poll transaction
if err := ap.CheckForPollTransaction(pub.Address(), tx.Transaction.Memo, s.Height()); err != nil {
Expand Down
15 changes: 10 additions & 5 deletions fsm/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/canopy-network/canopy/lib"
"github.com/canopy-network/canopy/lib/crypto"
lru "github.com/hashicorp/golang-lru/v2"
)

const (
Expand Down Expand Up @@ -39,14 +40,16 @@ type StateMachine struct {

// cache is the set of items to be cached used by the state machine
type cache struct {
accounts map[uint64]*Account // cache of accounts accessed
feeParams *FeeParams // fee params for the current block
valParams *ValidatorParams // validator params for the current block
accounts map[uint64]*Account // cache of accounts accessed
feeParams *FeeParams // fee params for the current block
valParams *ValidatorParams // validator params for the current block
publicKey *lru.Cache[string, crypto.PublicKeyI] // public keys for block processing
}

// New() creates a new instance of a StateMachine
func New(c lib.Config, store lib.StoreI, plugin *lib.Plugin, metrics *lib.Metrics, log lib.LoggerI) (*StateMachine, lib.ErrorI) {
// create the state machine object reference
publicKeyCache, _ := lru.New[string, crypto.PublicKeyI](10_000)
sm := &StateMachine{
store: nil,
ProtocolVersion: CurrentProtocolVersion,
Expand All @@ -59,7 +62,8 @@ func New(c lib.Config, store lib.StoreI, plugin *lib.Plugin, metrics *lib.Metric
log: log,
events: new(lib.EventsTracker),
cache: &cache{
accounts: make(map[uint64]*Account),
accounts: make(map[uint64]*Account),
publicKey: publicKeyCache,
},
}
// initialize the state machine
Expand Down Expand Up @@ -520,7 +524,8 @@ func (s *StateMachine) Copy() (*StateMachine, lib.ErrorI) {
Plugin: s.Plugin,
log: s.log,
cache: &cache{
accounts: make(map[uint64]*Account),
accounts: make(map[uint64]*Account),
publicKey: s.cache.publicKey,
},
LastValidatorSet: s.LastValidatorSet,
}, nil
Expand Down
5 changes: 4 additions & 1 deletion fsm/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/canopy-network/canopy/lib"
"github.com/canopy-network/canopy/lib/crypto"
"github.com/canopy-network/canopy/store"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -335,6 +336,7 @@ func newTestStateMachine(t *testing.T) StateMachine {
log := lib.NewDefaultLogger()
db, err := store.NewStoreInMemory(log)
require.NoError(t, err)
publicKeyCache, _ := lru.New[string, crypto.PublicKeyI](10_000)
sm := StateMachine{
store: db,
ProtocolVersion: 0,
Expand All @@ -350,7 +352,8 @@ func newTestStateMachine(t *testing.T) StateMachine {
events: new(lib.EventsTracker),
log: log,
cache: &cache{
accounts: make(map[uint64]*Account),
accounts: make(map[uint64]*Account),
publicKey: publicKeyCache,
},
}
require.NoError(t, sm.SetParams(DefaultParams()))
Expand Down
5 changes: 3 additions & 2 deletions fsm/swap.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package fsm
import (
"bytes"
"encoding/json"
"sort"

"github.com/canopy-network/canopy/lib"
"github.com/canopy-network/canopy/lib/crypto"
"sort"
)

/* This file contains state machine changes related to 'token swapping' */
Expand Down Expand Up @@ -77,7 +78,7 @@ func (s *StateMachine) ParseCloseOrder(tx *lib.Transaction) (co *lib.CloseOrder,
// ProcessRootChainOrderBook() processes the order book from the root-chain and cross-references blocks on this chain to determine
// actions that warrant committee level changes to the root-chain order book like: LockOrder, ResetOrder and CloseOrder
func (s *StateMachine) ProcessRootChainOrderBook(book *lib.OrderBook, proposalBlock *lib.BlockResult) (lockOrders []*lib.LockOrder, closedOrders, resetOrders [][]byte) {
if book == nil {
if book == nil || len(book.Orders) == 0 {
return
}
blocks := []*lib.BlockResult{proposalBlock}
Expand Down
18 changes: 13 additions & 5 deletions fsm/transaction.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package fsm

import (
"time"

"github.com/canopy-network/canopy/lib"
"github.com/canopy-network/canopy/lib/crypto"
"google.golang.org/protobuf/types/known/anypb"
"time"
)

/* This file contains transaction handling logic - for the payload handling check message.go */
Expand Down Expand Up @@ -151,10 +152,17 @@ func (s *StateMachine) CheckSignature(tx *lib.Transaction, authorizedSigners [][
if err != nil {
return nil, ErrTxSignBytes(err)
}
// convert signature bytes to public key object
publicKey, e := crypto.NewPublicKeyFromBytes(tx.Signature.PublicKey)
if e != nil {
return nil, ErrInvalidPublicKey(e)
// try to obtain the public key from the cache
var e error
publicKey, ok := s.cache.publicKey.Get(string(tx.Signature.PublicKey))
if !ok {
// convert signature bytes to public key object
publicKey, e = crypto.NewPublicKeyFromBytes(tx.Signature.PublicKey)
if e != nil {
return nil, ErrInvalidPublicKey(e)
}
// add the public key to the cache
s.cache.publicKey.Add(string(tx.Signature.PublicKey), publicKey)
}
// special case: check for a special RLP transaction
if _, hasEthPubKey := publicKey.(*crypto.ETHSECP256K1PublicKey); hasEthPubKey && tx.Memo == RLPIndicator {
Expand Down
3 changes: 2 additions & 1 deletion lib/mempool.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package lib

import (
"github.com/canopy-network/canopy/lib/crypto"
"maps"
"math"
"sort"
"sync"
"time"

"github.com/canopy-network/canopy/lib/crypto"
)

/* This file defines and implements a mempool that maintains an ordered list of 'valid, pending to be included' transactions in memory */
Expand Down
2 changes: 1 addition & 1 deletion store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ func (s *Store) Root() (root []byte, err lib.ErrorI) {
// set up the state commit store
s.sc = NewDefaultSMT(NewTxn(s.ss.reader, s.ss.writer, stateCommitIDPrefix, false, false, true, nextVersion))
// commit the SMT directly using the txn ops
if err = s.sc.Commit(s.ss.txn.ops); err != nil {
if err = s.sc.CommitParallel(s.ss.txn.ops); err != nil {
return nil, err
}
}
Expand Down
Loading