diff options
Diffstat (limited to 'node/node.go')
-rw-r--r-- | node/node.go | 195 |
1 files changed, 187 insertions, 8 deletions
diff --git a/node/node.go b/node/node.go index e0d2f437f..e5a2f7e6a 100644 --- a/node/node.go +++ b/node/node.go @@ -19,10 +19,12 @@ package node import ( "context" + "errors" "fmt" "io/ioutil" "os" "path/filepath" + "strings" "sync" "time" @@ -53,6 +55,7 @@ import ( "github.com/algorand/go-algorand/util/metrics" "github.com/algorand/go-algorand/util/timers" "github.com/algorand/go-deadlock" + uuid "github.com/satori/go.uuid" ) // StatusReport represents the current basic status of the node @@ -177,7 +180,6 @@ func MakeFull(log logging.Logger, rootDir string, cfg config.Local, phonebookAdd } p2pNode.SetPrioScheme(node) node.net = p2pNode - node.accountManager = data.MakeAccountManager(log) accountListener := makeTopAccountListener(log) @@ -267,6 +269,13 @@ func MakeFull(log logging.Logger, rootDir string, cfg config.Local, phonebookAdd node.catchupService = catchup.MakeService(node.log, node.config, p2pNode, node.ledger, node.catchupBlockAuth, agreementLedger.UnmatchedPendingCertificates, node.lowPriorityCryptoVerificationPool) node.txPoolSyncerService = rpcs.MakeTxSyncer(node.transactionPool, node.net, node.txHandler.SolicitedTxHandler(), time.Duration(cfg.TxSyncIntervalSeconds)*time.Second, time.Duration(cfg.TxSyncTimeoutSeconds)*time.Second, cfg.TxSyncServeResponseSize) + registry, err := ensureParticipationDB(genesisDir, node.log) + if err != nil { + log.Errorf("unable to initialize the participation registry database: %v", err) + return nil, err + } + node.accountManager = data.MakeAccountManager(log, registry) + err = node.loadParticipationKeys() if err != nil { log.Errorf("Cannot load participation keys: %v", err) @@ -394,6 +403,7 @@ func (node *AlgorandFullNode) Start() { func (node *AlgorandFullNode) startMonitoringRoutines() { node.monitoringRoutinesWaitGroup.Add(3) + // PKI TODO: Remove this with #2596 // Periodically check for new participation keys go node.checkForParticipationKeys() @@ -473,7 +483,7 @@ func (node *AlgorandFullNode) Ledger() *data.Ledger { // writeDevmodeBlock generates a new block for a devmode, and write it to the ledger. func (node *AlgorandFullNode) writeDevmodeBlock() (err error) { - var vb *ledger.ValidatedBlock + var vb *ledgercore.ValidatedBlock vb, err = node.transactionPool.AssembleDevModeBlock() if err != nil || vb == nil { return @@ -742,6 +752,16 @@ func (node *AlgorandFullNode) GetPendingTxnsFromPool() ([]transactions.SignedTxn return bookkeeping.SignedTxnGroupsFlatten(node.transactionPool.PendingTxGroups()), nil } +// ensureParticipationDB opens or creates a participation DB. +func ensureParticipationDB(genesisDir string, log logging.Logger) (account.ParticipationRegistry, error) { + accessorFile := filepath.Join(genesisDir, config.ParticipationRegistryFilename) + accessor, err := db.OpenPair(accessorFile, false) + if err != nil { + return nil, err + } + return account.MakeParticipationRegistry(accessor, log) +} + // Reload participation keys from disk periodically func (node *AlgorandFullNode) checkForParticipationKeys() { defer node.monitoringRoutinesWaitGroup.Done() @@ -760,6 +780,149 @@ func (node *AlgorandFullNode) checkForParticipationKeys() { } } +// ListParticipationKeys returns all participation keys currently installed on the node +func (node *AlgorandFullNode) ListParticipationKeys() (partKeys []account.ParticipationRecord, err error) { + return node.accountManager.Registry().GetAll(), nil +} + +// GetParticipationKey retries the information of a participation id from the node +func (node *AlgorandFullNode) GetParticipationKey(partKey account.ParticipationID) (account.ParticipationRecord, error) { + rval := node.accountManager.Registry().Get(partKey) + + if rval.IsZero() { + return account.ParticipationRecord{}, account.ErrParticipationIDNotFound + } + + return node.accountManager.Registry().Get(partKey), nil +} + +// RemoveParticipationKey given a participation id, remove the records from the node +func (node *AlgorandFullNode) RemoveParticipationKey(partKey account.ParticipationID) error { + + // Need to remove the file and then remove the entry in the registry + // Let's first get the recorded information from the registry so we can lookup the file + + partRecord := node.accountManager.Registry().Get(partKey) + + if partRecord.IsZero() { + return account.ErrParticipationIDNotFound + } + + genID := node.GenesisID() + + outDir := filepath.Join(node.rootDir, genID) + + filename := config.PartKeyFilename(partRecord.ParticipationID.String(), uint64(partRecord.FirstValid), uint64(partRecord.LastValid)) + fullyQualifiedFilename := filepath.Join(outDir, filepath.Base(filename)) + + err := node.accountManager.Registry().Delete(partKey) + if err != nil { + return err + } + + // PKI TODO: pick a better timeout, this is just something short. This could also be removed if we change + // POST /v2/participation and DELETE /v2/participation to return "202 OK Accepted" instead of waiting and getting + // the error message. + err = node.accountManager.Registry().Flush(500 * time.Millisecond) + if err != nil { + return err + } + + // Only after deleting and flushing do we want to remove the file + _ = os.Remove(fullyQualifiedFilename) + + return nil +} + +func createTemporaryParticipationKey(outDir string, partKeyBinary []byte) (string, error) { + var sb strings.Builder + + // Create a temporary filename with a UUID so that we can call this function twice + // in a row without worrying about collisions + sb.WriteString("tempPartKeyBinary.") + sb.WriteString(uuid.NewV4().String()) + sb.WriteString(".bin") + + tempFile := filepath.Join(outDir, filepath.Base(sb.String())) + + file, err := os.Create(tempFile) + + if err != nil { + return "", err + } + + _, err = file.Write(partKeyBinary) + + file.Close() + + if err != nil { + os.Remove(tempFile) + return "", err + } + + return tempFile, nil +} + +// InstallParticipationKey Given a participation key binary stream install the participation key. +func (node *AlgorandFullNode) InstallParticipationKey(partKeyBinary []byte) (account.ParticipationID, error) { + genID := node.GenesisID() + + outDir := filepath.Join(node.rootDir, genID) + + fullyQualifiedTempFile, err := createTemporaryParticipationKey(outDir, partKeyBinary) + // We need to make sure no tempfile is created/remains if there is an error + // However, we will eventually rename this file but if we fail in-between + // this point and the rename we want to ensure that we remove the temporary file + // After we rename, this will fail anyway since the file will not exist + + // Explicitly ignore the error with a closure + defer func(name string) { + _ = os.Remove(name) + }(fullyQualifiedTempFile) + + if err != nil { + return account.ParticipationID{}, err + } + + inputdb, err := db.MakeErasableAccessor(fullyQualifiedTempFile) + if err != nil { + return account.ParticipationID{}, err + } + defer inputdb.Close() + + partkey, err := account.RestoreParticipation(inputdb) + if err != nil { + return account.ParticipationID{}, err + } + defer partkey.Close() + + if partkey.Parent == (basics.Address{}) { + return account.ParticipationID{}, fmt.Errorf("cannot install partkey with missing (zero) parent address") + } + + // Tell the AccountManager about the Participation (dupes don't matter) so we ignore the return value + _ = node.accountManager.AddParticipation(partkey) + + // PKI TODO: pick a better timeout, this is just something short. This could also be removed if we change + // POST /v2/participation and DELETE /v2/participation to return "202 OK Accepted" instead of waiting and getting + // the error message. + err = node.accountManager.Registry().Flush(500 * time.Millisecond) + if err != nil { + return account.ParticipationID{}, err + } + + newFilename := config.PartKeyFilename(partkey.ID().String(), uint64(partkey.FirstValid), uint64(partkey.LastValid)) + newFullyQualifiedFilename := filepath.Join(outDir, filepath.Base(newFilename)) + + err = os.Rename(fullyQualifiedTempFile, newFullyQualifiedFilename) + + if err != nil { + return account.ParticipationID{}, nil + } + + return partkey.ID(), nil +} + func (node *AlgorandFullNode) loadParticipationKeys() error { // Generate a list of all potential participation key files genesisDir := filepath.Join(node.rootDir, node.genesisID) @@ -781,7 +944,7 @@ func (node *AlgorandFullNode) loadParticipationKeys() error { if err != nil { if db.IsErrBusy(err) { // this is a special case: - // we might get "database is locked" when we attempt to access a database that is conurrently updates it's participation keys. + // we might get "database is locked" when we attempt to access a database that is concurrently updating its participation keys. // that database is clearly already on the account manager, and doesn't need to be processed through this logic, and therefore // we can safely ignore that fail case. continue @@ -913,6 +1076,10 @@ func (node *AlgorandFullNode) oldKeyDeletionThread() { node.mu.Lock() node.accountManager.DeleteOldKeys(latestHdr, ccSigs, agreementProto) node.mu.Unlock() + + // PKI TODO: Maybe we don't even need to flush the registry. + // Persist participation registry metrics. + node.accountManager.FlushRegistry(2 * time.Second) } } @@ -1067,7 +1234,7 @@ func (node *AlgorandFullNode) SetCatchpointCatchupMode(catchpointCatchupMode boo // validatedBlock satisfies agreement.ValidatedBlock type validatedBlock struct { - vb *ledger.ValidatedBlock + vb *ledgercore.ValidatedBlock } // WithSeed satisfies the agreement.ValidatedBlock interface. @@ -1083,10 +1250,11 @@ func (vb validatedBlock) Block() bookkeeping.Block { } // AssembleBlock implements Ledger.AssembleBlock. -func (node *AlgorandFullNode) AssembleBlock(round basics.Round, deadline time.Time) (agreement.ValidatedBlock, error) { +func (node *AlgorandFullNode) AssembleBlock(round basics.Round) (agreement.ValidatedBlock, error) { + deadline := time.Now().Add(node.config.ProposalAssemblyTime) lvb, err := node.transactionPool.AssembleBlock(round, deadline) if err != nil { - if err == pools.ErrStaleBlockAssemblyRequest { + if errors.Is(err, pools.ErrStaleBlockAssemblyRequest) { // convert specific error to one that would have special handling in the agreement code. err = agreement.ErrAssembleBlockRoundStale @@ -1112,7 +1280,7 @@ func (node *AlgorandFullNode) VotingKeys(votingRound, keysRound basics.Round) [] keys := node.accountManager.Keys(votingRound) participations := make([]account.Participation, 0, len(keys)) - accountsData := make(map[basics.Address]basics.AccountData, len(keys)) + accountsData := make(map[basics.Address]basics.OnlineAccountData, len(keys)) matchingAccountsKeys := make(map[basics.Address]bool) mismatchingAccountsKeys := make(map[basics.Address]int) const bitMismatchingVotingKey = 1 @@ -1121,7 +1289,7 @@ func (node *AlgorandFullNode) VotingKeys(votingRound, keysRound basics.Round) [] acctData, hasAccountData := accountsData[part.Parent] if !hasAccountData { var err error - acctData, _, err = node.ledger.LookupWithoutRewards(keysRound, part.Parent) + acctData, err = node.ledger.LookupAgreement(keysRound, part.Parent) if err != nil { node.log.Warnf("node.VotingKeys: Account %v not participating: cannot locate account for round %d : %v", part.Address(), keysRound, err) continue @@ -1139,6 +1307,12 @@ func (node *AlgorandFullNode) VotingKeys(votingRound, keysRound basics.Round) [] } participations = append(participations, part) matchingAccountsKeys[part.Address()] = true + + // Make sure the key is registered. + err := node.accountManager.Registry().Register(part.ID(), votingRound) + if err != nil { + node.log.Warnf("Failed to register participation key (%s) with participation registry: %v\n", part.ID(), err) + } } // write the warnings per account only if we couldn't find a single valid key for that account. for mismatchingAddr, warningFlags := range mismatchingAccountsKeys { @@ -1156,3 +1330,8 @@ func (node *AlgorandFullNode) VotingKeys(votingRound, keysRound basics.Round) [] } return participations } + +// Record forwards participation record calls to the participation registry. +func (node *AlgorandFullNode) Record(account basics.Address, round basics.Round, participationType account.ParticipationAction) { + node.accountManager.Record(account, round, participationType) +} |