49 changes: 19 additions & 30 deletions entries/epoch.go
@@ -14,9 +14,8 @@ type EpochEntry struct {
InitialBlockHeight uint64
InitialView uint64
FinalBlockHeight uint64
CreatedAtBlockTimestampNanoSecs uint64

BadgerKey []byte `pg:",pk,use_zero"`
CreatedAtBlockTimestampNanoSecs int64
SnapshotAtEpochNumber uint64
}

type PGEpochEntry struct {
@@ -33,12 +32,19 @@ type PGEpochUtxoOps struct {

// Convert the EpochEntry DeSo encoder to the PGEpochEntry struct used by bun.
func EpochEntryEncoderToPGStruct(epochEntry *lib.EpochEntry, keyBytes []byte, params *lib.DeSoParams) EpochEntry {

var snapshotAtEpochNumber uint64
// Epochs use data snapshotted from two epochs ago. Epochs 0 and 1 use data from epoch 0.
if epochEntry.EpochNumber >= 2 {
snapshotAtEpochNumber = epochEntry.EpochNumber - 2
}
return EpochEntry{
EpochNumber: epochEntry.EpochNumber,
InitialBlockHeight: epochEntry.InitialBlockHeight,
InitialView: epochEntry.InitialView,
FinalBlockHeight: epochEntry.FinalBlockHeight,
BadgerKey: keyBytes,
EpochNumber: epochEntry.EpochNumber,
InitialBlockHeight: epochEntry.InitialBlockHeight,
InitialView: epochEntry.InitialView,
FinalBlockHeight: epochEntry.FinalBlockHeight,
CreatedAtBlockTimestampNanoSecs: epochEntry.CreatedAtBlockTimestampNanoSecs,
SnapshotAtEpochNumber: snapshotAtEpochNumber,
}
}
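To pin down the lookback rule, here is a minimal sketch (a hypothetical test, not part of this PR) of the mapping the conversion above produces; it assumes only the EpochEntryEncoderToPGStruct signature shown here:

// Hypothetical test: each epoch reads data snapshotted two epochs back,
// and epochs 0 and 1 clamp to epoch 0.
func TestSnapshotLookback(t *testing.T) {
	for _, tc := range []struct{ epoch, want uint64 }{
		{0, 0}, {1, 0}, {2, 0}, {3, 1}, {10, 8},
	} {
		entry := EpochEntryEncoderToPGStruct(&lib.EpochEntry{EpochNumber: tc.epoch}, nil, nil)
		if entry.SnapshotAtEpochNumber != tc.want {
			t.Errorf("epoch %d: got %d, want %d", tc.epoch, entry.SnapshotAtEpochNumber, tc.want)
		}
	}
}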

@@ -49,8 +55,11 @@ func EpochEntryBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, param
// We also ensure before this that all entries have the same operation type.
operationType := entries[0].OperationType
var err error
// Core only tracks the current epoch entry and never deletes them.
// In order to track all historical epoch entries, we don't use the badger
// key to uniquely identify them, but rather the epoch number.
if operationType == lib.DbOperationTypeDelete {
err = bulkDeleteEpochEntry(entries, db, operationType)
return errors.Errorf("entries.EpochEntryBatchOperation: Delete operation type not supported")
} else {
err = bulkInsertEpochEntry(entries, db, operationType, params)
}
@@ -76,31 +85,11 @@ func bulkInsertEpochEntry(entries []*lib.StateChangeEntry, db *bun.DB, operation
query := db.NewInsert().Model(&pgEntrySlice)

if operationType == lib.DbOperationTypeUpsert {
query = query.On("CONFLICT (badger_key) DO UPDATE")
query = query.On("CONFLICT (epoch_number) DO UPDATE")
}

if _, err := query.Returning("").Exec(context.Background()); err != nil {
return errors.Wrapf(err, "entries.bulkInsertEpochEntry: Error inserting entries")
}
return nil
}

// bulkDeleteEpochEntry deletes a batch of epoch entries from the database.
func bulkDeleteEpochEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error {
// Track the unique entries we've inserted so we don't insert the same entry twice.
uniqueEntries := consumer.UniqueEntries(entries)

// Transform the entries into a list of keys to delete.
keysToDelete := consumer.KeysToDelete(uniqueEntries)

// Execute the delete query.
if _, err := db.NewDelete().
Model(&PGEpochEntry{}).
Where("badger_key IN (?)", bun.In(keysToDelete)).
Returning("").
Exec(context.Background()); err != nil {
return errors.Wrapf(err, "entries.bulkDeleteEpochEntry: Error deleting entries")
}

return nil
}
109 changes: 106 additions & 3 deletions entries/pkid.go
@@ -4,6 +4,7 @@ import (
"context"
"github.com/deso-protocol/core/lib"
"github.com/deso-protocol/state-consumer/consumer"
"github.com/golang/glog"
"github.com/pkg/errors"
"github.com/uptrace/bun"
)
@@ -25,18 +26,48 @@ type PGPkidEntryUtxoOps struct {
UtxoOperation
}

type LeaderScheduleEntry struct {
SnapshotAtEpochNumber uint64 `pg:",use_zero"`
LeaderIndex uint16 `pg:",use_zero"`
ValidatorPKID string `pg:",use_zero"`
BadgerKey []byte `pg:",pk,use_zero"`
}

type PGLeaderScheduleEntry struct {
bun.BaseModel `bun:"table:leader_schedule_entry"`
LeaderScheduleEntry
}

// Convert the PKIDEntry DeSo encoder to the PG struct used by bun.
func PkidEncoderToPGStruct(pkidEntry *lib.PKIDEntry, keyBytes []byte, params *lib.DeSoParams) PkidEntry {
func PkidEntryEncoderToPGStruct(pkidEntry *lib.PKIDEntry, keyBytes []byte, params *lib.DeSoParams) PkidEntry {
return PkidEntry{
Pkid: consumer.PublicKeyBytesToBase58Check(pkidEntry.PKID[:], params),
PublicKey: consumer.PublicKeyBytesToBase58Check(pkidEntry.PublicKey[:], params),
BadgerKey: keyBytes,
}
}

// Convert the leader schedule entry to the PG struct used by bun.
func LeaderScheduleEncoderToPGStruct(validatorPKID *lib.PKID, keyBytes []byte, params *lib.DeSoParams,
) *LeaderScheduleEntry {
// The key is 1 prefix byte followed by an 8-byte big-endian epoch number
// and a 2-byte leader index. Validate the full length before slicing so a
// malformed key can't cause a panic.
if len(keyBytes) != 11 {
glog.Errorf("LeaderScheduleEncoderToPGStruct: Invalid key length: %d", len(keyBytes))
return nil
}
prefixRemovedKeyBytes := keyBytes[1:]
epochNumber := lib.DecodeUint64(prefixRemovedKeyBytes[:8])
leaderIndex := lib.DecodeUint16(prefixRemovedKeyBytes[8:10])
return &LeaderScheduleEntry{
ValidatorPKID: consumer.PublicKeyBytesToBase58Check(validatorPKID[:], params),
SnapshotAtEpochNumber: epochNumber,
LeaderIndex: leaderIndex,
BadgerKey: keyBytes,
}
}
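As a worked example of the key layout this parser expects (1 prefix byte, an 8-byte big-endian epoch number, then a 2-byte leader index), here is a hypothetical round-trip sketch; it assumes lib.EncodeUint64/lib.EncodeUint16 mirror the decoders used above:

// Hypothetical: build a leader schedule key and parse it back.
key := append([]byte{}, lib.Prefixes.PrefixSnapshotLeaderSchedule...)
key = append(key, lib.EncodeUint64(7)...) // SnapshotAtEpochNumber = 7
key = append(key, lib.EncodeUint16(3)...) // LeaderIndex = 3
entry := LeaderScheduleEncoderToPGStruct(validatorPKID, key, params)
// entry.SnapshotAtEpochNumber == 7, entry.LeaderIndex == 3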

// PkidEntryBatchOperation is the entry point for processing a batch of PKID entries. It determines the appropriate
// handler based on the operation type and executes it.
func PkidBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error {
func PkidEntryBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error {
// We check before we call this function that there is at least one operation type.
// We also ensure before this that all entries have the same operation type.
operationType := entries[0].OperationType
@@ -61,7 +92,7 @@ func bulkInsertPkidEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationT

// Loop through the entries and convert them to PGPkidEntry.
for ii, entry := range uniqueEntries {
pgEntrySlice[ii] = &PGPkidEntry{PkidEntry: PkidEncoderToPGStruct(entry.Encoder.(*lib.PKIDEntry), entry.KeyBytes, params)}
pgEntrySlice[ii] = &PGPkidEntry{PkidEntry: PkidEntryEncoderToPGStruct(entry.Encoder.(*lib.PKIDEntry), entry.KeyBytes, params)}
}

query := db.NewInsert().Model(&pgEntrySlice)
@@ -95,3 +126,75 @@ func bulkDeletePkidEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationT

return nil
}

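// PkidBatchOperation is the entry point for processing a batch of bare PKID entries
// (currently only the snapshot leader schedule index). It determines the appropriate
// handler based on the operation type and executes it.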
func PkidBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error {
// We check before we call this function that there is at least one operation type.
// We also ensure before this that all entries have the same operation type.
operationType := entries[0].OperationType
var err error
if operationType == lib.DbOperationTypeDelete {
err = bulkDeletePkid(entries, db, operationType)
} else {
err = bulkInsertPkid(entries, db, operationType, params)
}
if err != nil {
return errors.Wrapf(err, "entries.PostBatchOperation: Problem with operation type %v", operationType)
}
return nil
}

// bulkInsertPkid inserts a batch of PKID-keyed entries (currently leader schedule entries) into the database.
func bulkInsertPkid(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error {
// Track the unique entries we've inserted so we don't insert the same entry twice.
uniqueEntries := consumer.UniqueEntries(entries)

uniqueLeaderScheduleEntries := consumer.FilterEntriesByPrefix(
uniqueEntries, lib.Prefixes.PrefixSnapshotLeaderSchedule)
// NOTE: if we need to support parsing other indexes for PKIDs beyond LeaderSchedule,
// we will need to filter the uniqueEntries by the appropriate prefix and then convert
// the entries to the appropriate PG struct.
// Create a new slice to hold the bun structs. Append entries as they
// convert so a failed conversion can't leave a nil hole in the slice.
pgEntrySlice := make([]*PGLeaderScheduleEntry, 0, len(uniqueLeaderScheduleEntries))

// Loop through the entries and convert them to PGLeaderScheduleEntry.
for _, entry := range uniqueLeaderScheduleEntries {
leaderScheduleEntry := LeaderScheduleEncoderToPGStruct(entry.Encoder.(*lib.PKID), entry.KeyBytes, params)
if leaderScheduleEntry == nil {
glog.Errorf("bulkInsertPkid: Error converting LeaderScheduleEntry to PG struct")
continue
}
pgEntrySlice = append(pgEntrySlice, &PGLeaderScheduleEntry{LeaderScheduleEntry: *leaderScheduleEntry})
}

query := db.NewInsert().Model(&pgEntrySlice)

if operationType == lib.DbOperationTypeUpsert {
query = query.On("CONFLICT (badger_key) DO UPDATE")
}

if _, err := query.Returning("").Exec(context.Background()); err != nil {
return errors.Wrapf(err, "entries.bulkInsertPkid: Error inserting entries")
}
return nil
}

// bulkDeletePkid deletes a batch of PKID-keyed entries (currently leader schedule entries) from the database.
func bulkDeletePkid(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error {
// Track the unique entries we've inserted so we don't insert the same entry twice.
uniqueEntries := consumer.UniqueEntries(entries)

// Transform the entries into a list of keys to delete.
keysToDelete := consumer.KeysToDelete(uniqueEntries)
leaderSchedKeysToDelete := consumer.FilterKeysByPrefix(keysToDelete, lib.Prefixes.PrefixSnapshotLeaderSchedule)

// Execute the delete query.
if _, err := db.NewDelete().
Model(&LeaderScheduleEntry{}).
Where("badger_key IN (?)", bun.In(leaderSchedKeysToDelete)).
Returning("").
Exec(context.Background()); err != nil {
return errors.Wrapf(err, "entries.bulkDeletePkid: Error deleting entries")
}

return nil
}
4 changes: 3 additions & 1 deletion handler/data_handler.go
@@ -63,7 +63,7 @@ func (postgresDataHandler *PostgresDataHandler) HandleEntryBatch(batchedEntries
case lib.EncoderTypePostAssociationEntry:
err = entries.PostAssociationBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params)
case lib.EncoderTypePKIDEntry:
err = entries.PkidBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params)
err = entries.PkidEntryBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params)
case lib.EncoderTypeDeSoBalanceEntry:
err = entries.DesoBalanceBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params)
case lib.EncoderTypeDAOCoinLimitOrderEntry:
@@ -86,6 +86,8 @@
err = entries.LockupYieldCurvePointBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params)
case lib.EncoderTypeEpochEntry:
err = entries.EpochEntryBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params)
case lib.EncoderTypePKID:
err = entries.PkidBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params)
}

if err != nil {
@@ -12,14 +12,18 @@ import (
func createEpochEntryTable(db *bun.DB, tableName string) error {
_, err := db.Exec(strings.Replace(`
CREATE TABLE {tableName} (
epoch_number BIGINT NOT NULL,
epoch_number BIGINT PRIMARY KEY NOT NULL,
initial_block_height BIGINT NOT NULL,
initial_view BIGINT NOT NULL,
final_block_height BIGINT NOT NULL,
created_at_block_timestamp_nano_secs BIGINT NOT NULL,

badger_key BYTEA PRIMARY KEY
snapshot_at_epoch_number BIGINT NOT NULL
);

CREATE INDEX {tableName}_epoch_number_idx ON {tableName} (epoch_number);
CREATE INDEX {tableName}_initial_block_height_idx ON {tableName} (initial_block_height);
CREATE INDEX {tableName}_final_block_height_idx ON {tableName} (final_block_height);
CREATE INDEX {tableName}_snapshot_at_epoch_number_idx ON {tableName} (snapshot_at_epoch_number);
`, "{tableName}", tableName, -1))
// TODO: What other fields do we need indexed?
return err
@@ -0,0 +1,37 @@
package initial_migrations

import (
"context"
"strings"

"github.com/uptrace/bun"
)

func createLeaderScheduleTable(db *bun.DB, tableName string) error {
_, err := db.Exec(strings.Replace(`
CREATE TABLE {tableName} (
validator_pkid VARCHAR NOT NULL,
snapshot_at_epoch_number BIGINT NOT NULL,
leader_index INTEGER NOT NULL,
badger_key BYTEA PRIMARY KEY NOT NULL
);
CREATE INDEX {tableName}_validator_pkid_idx ON {tableName} (validator_pkid);
CREATE INDEX {tableName}_snapshot_at_epoch_number_idx ON {tableName} (snapshot_at_epoch_number);
CREATE INDEX {tableName}_snapshot_at_epoch_number_leader_index_idx ON {tableName} (snapshot_at_epoch_number, leader_index);
`, "{tableName}", tableName, -1))
return err
}
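Read-side, the snapshot_at_epoch_number columns created by this migration and the epoch_entry migration above are what tie a leader schedule to the epoch that runs against it. A minimal bun sketch of that join (a hypothetical query, not part of this PR):

// Hypothetical read query: the leader schedule for a given epoch,
// in leader order, via the snapshot_at_epoch_number linkage.
var leaders []entries.PGLeaderScheduleEntry
err := db.NewSelect().
	Model(&leaders).
	Join("JOIN epoch_entry e ON e.snapshot_at_epoch_number = leader_schedule_entry.snapshot_at_epoch_number").
	Where("e.epoch_number = ?", epochNumber).
	Order("leader_index ASC").
	Scan(context.Background())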

func init() {
Migrations.MustRegister(func(ctx context.Context, db *bun.DB) error {
return createLeaderScheduleTable(db, "leader_schedule_entry")
}, func(ctx context.Context, db *bun.DB) error {
_, err := db.Exec(`
DROP TABLE IF EXISTS leader_schedule_entry;
`)
if err != nil {
return err
}
return nil
})
}
@@ -22,7 +22,6 @@ func init() {
comment on column locked_stake_entry.badger_key is E'@omit';
comment on column yield_curve_point.badger_key is E'@omit';
comment on column locked_balance_entry.badger_key is E'@omit';
comment on column epoch_entry.badger_key is E'@omit';
comment on table transaction_partition_34 is E'@omit';
comment on table transaction_partition_35 is E'@omit';
comment on table transaction_partition_36 is E'@omit';
@@ -0,0 +1,31 @@
package post_sync_migrations

import (
"context"

"github.com/uptrace/bun"
)

func init() {
Migrations.MustRegister(func(ctx context.Context, db *bun.DB) error {
_, err := db.Exec(`
comment on table leader_schedule_entry is E'@foreignKey (validator_pkid) references account (pkid)|@foreignFieldName leaderScheduleEntries|@fieldName leaderAccount\n@foreignKey (validator_pkid) references validator_entry (validator_pkid)|@foreignFieldName leaderScheduleEntries|@fieldName validatorEntry\n@foreignKey (snapshot_at_epoch_number) references epoch_entry (snapshot_at_epoch_number)|@foreignFieldName leaderScheduleEntries|@fieldName epochEntryBySnapshot';
comment on column leader_schedule_entry.badger_key is E'@omit';
`)
if err != nil {
return err
}

return nil
}, func(ctx context.Context, db *bun.DB) error {
_, err := db.Exec(`
comment on table leader_schedule_entry is NULL;
comment on column leader_schedule_entry.badger_key is NULL;
`)
if err != nil {
return err
}

return nil
})
}