summaryrefslogtreecommitdiff
path: root/node/node.go
diff options
context:
space:
mode:
Diffstat (limited to 'node/node.go')
-rw-r--r--node/node.go195
1 files changed, 187 insertions, 8 deletions
diff --git a/node/node.go b/node/node.go
index e0d2f437f..e5a2f7e6a 100644
--- a/node/node.go
+++ b/node/node.go
@@ -19,10 +19,12 @@ package node
import (
"context"
+ "errors"
"fmt"
"io/ioutil"
"os"
"path/filepath"
+ "strings"
"sync"
"time"
@@ -53,6 +55,7 @@ import (
"github.com/algorand/go-algorand/util/metrics"
"github.com/algorand/go-algorand/util/timers"
"github.com/algorand/go-deadlock"
+ uuid "github.com/satori/go.uuid"
)
// StatusReport represents the current basic status of the node
@@ -177,7 +180,6 @@ func MakeFull(log logging.Logger, rootDir string, cfg config.Local, phonebookAdd
}
p2pNode.SetPrioScheme(node)
node.net = p2pNode
- node.accountManager = data.MakeAccountManager(log)
accountListener := makeTopAccountListener(log)
@@ -267,6 +269,13 @@ func MakeFull(log logging.Logger, rootDir string, cfg config.Local, phonebookAdd
node.catchupService = catchup.MakeService(node.log, node.config, p2pNode, node.ledger, node.catchupBlockAuth, agreementLedger.UnmatchedPendingCertificates, node.lowPriorityCryptoVerificationPool)
node.txPoolSyncerService = rpcs.MakeTxSyncer(node.transactionPool, node.net, node.txHandler.SolicitedTxHandler(), time.Duration(cfg.TxSyncIntervalSeconds)*time.Second, time.Duration(cfg.TxSyncTimeoutSeconds)*time.Second, cfg.TxSyncServeResponseSize)
+ registry, err := ensureParticipationDB(genesisDir, node.log)
+ if err != nil {
+ log.Errorf("unable to initialize the participation registry database: %v", err)
+ return nil, err
+ }
+ node.accountManager = data.MakeAccountManager(log, registry)
+
err = node.loadParticipationKeys()
if err != nil {
log.Errorf("Cannot load participation keys: %v", err)
@@ -394,6 +403,7 @@ func (node *AlgorandFullNode) Start() {
func (node *AlgorandFullNode) startMonitoringRoutines() {
node.monitoringRoutinesWaitGroup.Add(3)
+ // PKI TODO: Remove this with #2596
// Periodically check for new participation keys
go node.checkForParticipationKeys()
@@ -473,7 +483,7 @@ func (node *AlgorandFullNode) Ledger() *data.Ledger {
// writeDevmodeBlock generates a new block for a devmode, and write it to the ledger.
func (node *AlgorandFullNode) writeDevmodeBlock() (err error) {
- var vb *ledger.ValidatedBlock
+ var vb *ledgercore.ValidatedBlock
vb, err = node.transactionPool.AssembleDevModeBlock()
if err != nil || vb == nil {
return
@@ -742,6 +752,16 @@ func (node *AlgorandFullNode) GetPendingTxnsFromPool() ([]transactions.SignedTxn
return bookkeeping.SignedTxnGroupsFlatten(node.transactionPool.PendingTxGroups()), nil
}
+// ensureParticipationDB opens or creates a participation DB.
+func ensureParticipationDB(genesisDir string, log logging.Logger) (account.ParticipationRegistry, error) {
+ accessorFile := filepath.Join(genesisDir, config.ParticipationRegistryFilename)
+ accessor, err := db.OpenPair(accessorFile, false)
+ if err != nil {
+ return nil, err
+ }
+ return account.MakeParticipationRegistry(accessor, log)
+}
+
// Reload participation keys from disk periodically
func (node *AlgorandFullNode) checkForParticipationKeys() {
defer node.monitoringRoutinesWaitGroup.Done()
@@ -760,6 +780,149 @@ func (node *AlgorandFullNode) checkForParticipationKeys() {
}
}
+// ListParticipationKeys returns all participation keys currently installed on the node
+func (node *AlgorandFullNode) ListParticipationKeys() (partKeys []account.ParticipationRecord, err error) {
+ return node.accountManager.Registry().GetAll(), nil
+}
+
+// GetParticipationKey retries the information of a participation id from the node
+func (node *AlgorandFullNode) GetParticipationKey(partKey account.ParticipationID) (account.ParticipationRecord, error) {
+ rval := node.accountManager.Registry().Get(partKey)
+
+ if rval.IsZero() {
+ return account.ParticipationRecord{}, account.ErrParticipationIDNotFound
+ }
+
+ return node.accountManager.Registry().Get(partKey), nil
+}
+
+// RemoveParticipationKey given a participation id, remove the records from the node
+func (node *AlgorandFullNode) RemoveParticipationKey(partKey account.ParticipationID) error {
+
+ // Need to remove the file and then remove the entry in the registry
+ // Let's first get the recorded information from the registry so we can lookup the file
+
+ partRecord := node.accountManager.Registry().Get(partKey)
+
+ if partRecord.IsZero() {
+ return account.ErrParticipationIDNotFound
+ }
+
+ genID := node.GenesisID()
+
+ outDir := filepath.Join(node.rootDir, genID)
+
+ filename := config.PartKeyFilename(partRecord.ParticipationID.String(), uint64(partRecord.FirstValid), uint64(partRecord.LastValid))
+ fullyQualifiedFilename := filepath.Join(outDir, filepath.Base(filename))
+
+ err := node.accountManager.Registry().Delete(partKey)
+ if err != nil {
+ return err
+ }
+
+ // PKI TODO: pick a better timeout, this is just something short. This could also be removed if we change
+ // POST /v2/participation and DELETE /v2/participation to return "202 OK Accepted" instead of waiting and getting
+ // the error message.
+ err = node.accountManager.Registry().Flush(500 * time.Millisecond)
+ if err != nil {
+ return err
+ }
+
+ // Only after deleting and flushing do we want to remove the file
+ _ = os.Remove(fullyQualifiedFilename)
+
+ return nil
+}
+
+func createTemporaryParticipationKey(outDir string, partKeyBinary []byte) (string, error) {
+ var sb strings.Builder
+
+ // Create a temporary filename with a UUID so that we can call this function twice
+ // in a row without worrying about collisions
+ sb.WriteString("tempPartKeyBinary.")
+ sb.WriteString(uuid.NewV4().String())
+ sb.WriteString(".bin")
+
+ tempFile := filepath.Join(outDir, filepath.Base(sb.String()))
+
+ file, err := os.Create(tempFile)
+
+ if err != nil {
+ return "", err
+ }
+
+ _, err = file.Write(partKeyBinary)
+
+ file.Close()
+
+ if err != nil {
+ os.Remove(tempFile)
+ return "", err
+ }
+
+ return tempFile, nil
+}
+
+// InstallParticipationKey Given a participation key binary stream install the participation key.
+func (node *AlgorandFullNode) InstallParticipationKey(partKeyBinary []byte) (account.ParticipationID, error) {
+ genID := node.GenesisID()
+
+ outDir := filepath.Join(node.rootDir, genID)
+
+ fullyQualifiedTempFile, err := createTemporaryParticipationKey(outDir, partKeyBinary)
+ // We need to make sure no tempfile is created/remains if there is an error
+ // However, we will eventually rename this file but if we fail in-between
+ // this point and the rename we want to ensure that we remove the temporary file
+ // After we rename, this will fail anyway since the file will not exist
+
+ // Explicitly ignore the error with a closure
+ defer func(name string) {
+ _ = os.Remove(name)
+ }(fullyQualifiedTempFile)
+
+ if err != nil {
+ return account.ParticipationID{}, err
+ }
+
+ inputdb, err := db.MakeErasableAccessor(fullyQualifiedTempFile)
+ if err != nil {
+ return account.ParticipationID{}, err
+ }
+ defer inputdb.Close()
+
+ partkey, err := account.RestoreParticipation(inputdb)
+ if err != nil {
+ return account.ParticipationID{}, err
+ }
+ defer partkey.Close()
+
+ if partkey.Parent == (basics.Address{}) {
+ return account.ParticipationID{}, fmt.Errorf("cannot install partkey with missing (zero) parent address")
+ }
+
+ // Tell the AccountManager about the Participation (dupes don't matter) so we ignore the return value
+ _ = node.accountManager.AddParticipation(partkey)
+
+ // PKI TODO: pick a better timeout, this is just something short. This could also be removed if we change
+ // POST /v2/participation and DELETE /v2/participation to return "202 OK Accepted" instead of waiting and getting
+ // the error message.
+ err = node.accountManager.Registry().Flush(500 * time.Millisecond)
+ if err != nil {
+ return account.ParticipationID{}, err
+ }
+
+ newFilename := config.PartKeyFilename(partkey.ID().String(), uint64(partkey.FirstValid), uint64(partkey.LastValid))
+ newFullyQualifiedFilename := filepath.Join(outDir, filepath.Base(newFilename))
+
+ err = os.Rename(fullyQualifiedTempFile, newFullyQualifiedFilename)
+
+ if err != nil {
+ return account.ParticipationID{}, nil
+ }
+
+ return partkey.ID(), nil
+}
+
func (node *AlgorandFullNode) loadParticipationKeys() error {
// Generate a list of all potential participation key files
genesisDir := filepath.Join(node.rootDir, node.genesisID)
@@ -781,7 +944,7 @@ func (node *AlgorandFullNode) loadParticipationKeys() error {
if err != nil {
if db.IsErrBusy(err) {
// this is a special case:
- // we might get "database is locked" when we attempt to access a database that is conurrently updates it's participation keys.
+ // we might get "database is locked" when we attempt to access a database that is concurrently updating its participation keys.
// that database is clearly already on the account manager, and doesn't need to be processed through this logic, and therefore
// we can safely ignore that fail case.
continue
@@ -913,6 +1076,10 @@ func (node *AlgorandFullNode) oldKeyDeletionThread() {
node.mu.Lock()
node.accountManager.DeleteOldKeys(latestHdr, ccSigs, agreementProto)
node.mu.Unlock()
+
+ // PKI TODO: Maybe we don't even need to flush the registry.
+ // Persist participation registry metrics.
+ node.accountManager.FlushRegistry(2 * time.Second)
}
}
@@ -1067,7 +1234,7 @@ func (node *AlgorandFullNode) SetCatchpointCatchupMode(catchpointCatchupMode boo
// validatedBlock satisfies agreement.ValidatedBlock
type validatedBlock struct {
- vb *ledger.ValidatedBlock
+ vb *ledgercore.ValidatedBlock
}
// WithSeed satisfies the agreement.ValidatedBlock interface.
@@ -1083,10 +1250,11 @@ func (vb validatedBlock) Block() bookkeeping.Block {
}
// AssembleBlock implements Ledger.AssembleBlock.
-func (node *AlgorandFullNode) AssembleBlock(round basics.Round, deadline time.Time) (agreement.ValidatedBlock, error) {
+func (node *AlgorandFullNode) AssembleBlock(round basics.Round) (agreement.ValidatedBlock, error) {
+ deadline := time.Now().Add(node.config.ProposalAssemblyTime)
lvb, err := node.transactionPool.AssembleBlock(round, deadline)
if err != nil {
- if err == pools.ErrStaleBlockAssemblyRequest {
+ if errors.Is(err, pools.ErrStaleBlockAssemblyRequest) {
// convert specific error to one that would have special handling in the agreement code.
err = agreement.ErrAssembleBlockRoundStale
@@ -1112,7 +1280,7 @@ func (node *AlgorandFullNode) VotingKeys(votingRound, keysRound basics.Round) []
keys := node.accountManager.Keys(votingRound)
participations := make([]account.Participation, 0, len(keys))
- accountsData := make(map[basics.Address]basics.AccountData, len(keys))
+ accountsData := make(map[basics.Address]basics.OnlineAccountData, len(keys))
matchingAccountsKeys := make(map[basics.Address]bool)
mismatchingAccountsKeys := make(map[basics.Address]int)
const bitMismatchingVotingKey = 1
@@ -1121,7 +1289,7 @@ func (node *AlgorandFullNode) VotingKeys(votingRound, keysRound basics.Round) []
acctData, hasAccountData := accountsData[part.Parent]
if !hasAccountData {
var err error
- acctData, _, err = node.ledger.LookupWithoutRewards(keysRound, part.Parent)
+ acctData, err = node.ledger.LookupAgreement(keysRound, part.Parent)
if err != nil {
node.log.Warnf("node.VotingKeys: Account %v not participating: cannot locate account for round %d : %v", part.Address(), keysRound, err)
continue
@@ -1139,6 +1307,12 @@ func (node *AlgorandFullNode) VotingKeys(votingRound, keysRound basics.Round) []
}
participations = append(participations, part)
matchingAccountsKeys[part.Address()] = true
+
+ // Make sure the key is registered.
+ err := node.accountManager.Registry().Register(part.ID(), votingRound)
+ if err != nil {
+ node.log.Warnf("Failed to register participation key (%s) with participation registry: %v\n", part.ID(), err)
+ }
}
// write the warnings per account only if we couldn't find a single valid key for that account.
for mismatchingAddr, warningFlags := range mismatchingAccountsKeys {
@@ -1156,3 +1330,8 @@ func (node *AlgorandFullNode) VotingKeys(votingRound, keysRound basics.Round) []
}
return participations
}
+
+// Record forwards participation record calls to the participation registry.
+func (node *AlgorandFullNode) Record(account basics.Address, round basics.Round, participationType account.ParticipationAction) {
+ node.accountManager.Record(account, round, participationType)
+}