diff options
Diffstat (limited to 'node/node.go')
-rw-r--r-- | node/node.go | 251 |
1 files changed, 200 insertions, 51 deletions
diff --git a/node/node.go b/node/node.go index 62256b348..e5a2f7e6a 100644 --- a/node/node.go +++ b/node/node.go @@ -24,6 +24,7 @@ import ( "io/ioutil" "os" "path/filepath" + "strings" "sync" "time" @@ -38,7 +39,6 @@ import ( "github.com/algorand/go-algorand/data/basics" "github.com/algorand/go-algorand/data/bookkeeping" "github.com/algorand/go-algorand/data/committee" - "github.com/algorand/go-algorand/data/pooldata" "github.com/algorand/go-algorand/data/pools" "github.com/algorand/go-algorand/data/transactions" "github.com/algorand/go-algorand/data/transactions/verify" @@ -50,12 +50,12 @@ import ( "github.com/algorand/go-algorand/node/indexer" "github.com/algorand/go-algorand/protocol" "github.com/algorand/go-algorand/rpcs" - "github.com/algorand/go-algorand/txnsync" "github.com/algorand/go-algorand/util/db" "github.com/algorand/go-algorand/util/execpool" "github.com/algorand/go-algorand/util/metrics" "github.com/algorand/go-algorand/util/timers" "github.com/algorand/go-deadlock" + uuid "github.com/satori/go.uuid" ) // StatusReport represents the current basic status of the node @@ -108,7 +108,6 @@ type AlgorandFullNode struct { blockService *rpcs.BlockService ledgerService *rpcs.LedgerService txPoolSyncerService *rpcs.TxSyncer - txnSyncService *txnsync.Service indexer *indexer.Indexer @@ -136,8 +135,6 @@ type AlgorandFullNode struct { tracer messagetracer.MessageTracer compactCert *compactcert.Worker - - txnSyncConnector *transactionSyncNodeConnector } // TxnWithStatus represents information about a single transaction, @@ -183,7 +180,6 @@ func MakeFull(log logging.Logger, rootDir string, cfg config.Local, phonebookAdd } p2pNode.SetPrioScheme(node) node.net = p2pNode - node.accountManager = data.MakeAccountManager(log) accountListener := makeTopAccountListener(log) @@ -215,6 +211,15 @@ func MakeFull(log logging.Logger, rootDir string, cfg config.Local, phonebookAdd node.transactionPool = pools.MakeTransactionPool(node.ledger.Ledger, cfg, node.log) + 
blockListeners := []ledger.BlockListener{ + node.transactionPool, + node, + } + + if node.config.EnableTopAccountsReporting { + blockListeners = append(blockListeners, &accountListener) + } + node.ledger.RegisterBlockListeners(blockListeners) node.txHandler = data.MakeTxHandler(node.transactionPool, node.ledger, node.net, node.genesisID, node.genesisHash, node.lowPriorityCryptoVerificationPool) // Indexer setup @@ -263,8 +268,13 @@ func MakeFull(log logging.Logger, rootDir string, cfg config.Local, phonebookAdd node.catchupBlockAuth = blockAuthenticatorImpl{Ledger: node.ledger, AsyncVoteVerifier: agreement.MakeAsyncVoteVerifier(node.lowPriorityCryptoVerificationPool)} node.catchupService = catchup.MakeService(node.log, node.config, p2pNode, node.ledger, node.catchupBlockAuth, agreementLedger.UnmatchedPendingCertificates, node.lowPriorityCryptoVerificationPool) node.txPoolSyncerService = rpcs.MakeTxSyncer(node.transactionPool, node.net, node.txHandler.SolicitedTxHandler(), time.Duration(cfg.TxSyncIntervalSeconds)*time.Second, time.Duration(cfg.TxSyncTimeoutSeconds)*time.Second, cfg.TxSyncServeResponseSize) - node.txnSyncConnector = makeTransactionSyncNodeConnector(node) - node.txnSyncService = txnsync.MakeTransactionSyncService(node.log, node.txnSyncConnector, cfg.NetAddress != "", node.genesisID, node.genesisHash, node.config, node.lowPriorityCryptoVerificationPool) + + registry, err := ensureParticipationDB(genesisDir, node.log) + if err != nil { + log.Errorf("unable to initialize the participation registry database: %v", err) + return nil, err + } + node.accountManager = data.MakeAccountManager(log, registry) err = node.loadParticipationKeys() if err != nil { @@ -298,17 +308,6 @@ func MakeFull(log logging.Logger, rootDir string, cfg config.Local, phonebookAdd } node.compactCert = compactcert.NewWorker(compactCertAccess, node.log, node.accountManager, node.ledger.Ledger, node.net, node) - blockListeners := []ledger.BlockListener{ - node.txnSyncConnector, - 
node.transactionPool, - node, - } - - if node.config.EnableTopAccountsReporting { - blockListeners = append(blockListeners, &accountListener) - } - node.ledger.RegisterBlockListeners(blockListeners) - return node, err } @@ -379,12 +378,9 @@ func (node *AlgorandFullNode) Start() { node.txPoolSyncerService.Start(node.catchupService.InitialSyncDone) node.blockService.Start() node.ledgerService.Start() + node.txHandler.Start() node.compactCert.Start() - node.txnSyncService.Start() - node.txnSyncConnector.start() - startNetwork() - // start indexer if idx, err := node.Indexer(); err == nil { err := idx.Start() @@ -407,6 +403,7 @@ func (node *AlgorandFullNode) Start() { func (node *AlgorandFullNode) startMonitoringRoutines() { node.monitoringRoutinesWaitGroup.Add(3) + // PKI TODO: Remove this with #2596 // Periodically check for new participation keys go node.checkForParticipationKeys() @@ -451,8 +448,7 @@ func (node *AlgorandFullNode) Stop() { if node.catchpointCatchupService != nil { node.catchpointCatchupService.Stop() } else { - node.txnSyncService.Stop() - node.txnSyncConnector.stop() + node.txHandler.Stop() node.agreementService.Shutdown() node.catchupService.Stop() node.txPoolSyncerService.Stop() @@ -487,7 +483,7 @@ func (node *AlgorandFullNode) Ledger() *data.Ledger { // writeDevmodeBlock generates a new block for a devmode, and write it to the ledger. 
func (node *AlgorandFullNode) writeDevmodeBlock() (err error) { - var vb *ledger.ValidatedBlock + var vb *ledgercore.ValidatedBlock vb, err = node.transactionPool.AssembleDevModeBlock() if err != nil || vb == nil { return @@ -528,7 +524,7 @@ func (node *AlgorandFullNode) BroadcastSignedTxGroup(txgroup []transactions.Sign return err } - err = node.transactionPool.Remember(pooldata.SignedTxGroup{Transactions: txgroup, LocallyOriginated: true}) + err = node.transactionPool.Remember(txgroup) if err != nil { node.log.Infof("rejected by local pool: %v - transaction group was %+v", err, txgroup) return err @@ -539,8 +535,6 @@ func (node *AlgorandFullNode) BroadcastSignedTxGroup(txgroup []transactions.Sign logging.Base().Infof("unable to pin transaction: %v", err) } - node.txnSyncConnector.onNewTransactionPoolEntry(node.transactionPool.PendingCount()) - var enc []byte var txids []transactions.Txid for _, tx := range txgroup { @@ -755,12 +749,17 @@ func (node *AlgorandFullNode) SuggestedFee() basics.MicroAlgos { // GetPendingTxnsFromPool returns a snapshot of every pending transactions from the node's transaction pool in a slice. // Transactions are sorted in decreasing order. If no transactions, returns an empty slice. func (node *AlgorandFullNode) GetPendingTxnsFromPool() ([]transactions.SignedTxn, error) { - poolGroups, _ := node.transactionPool.PendingTxGroups() - txnGroups := make([][]transactions.SignedTxn, len(poolGroups)) - for i := range txnGroups { - txnGroups[i] = poolGroups[i].Transactions + return bookkeeping.SignedTxnGroupsFlatten(node.transactionPool.PendingTxGroups()), nil +} + +// ensureParticipationDB opens or creates a participation DB. 
+func ensureParticipationDB(genesisDir string, log logging.Logger) (account.ParticipationRegistry, error) { + accessorFile := filepath.Join(genesisDir, config.ParticipationRegistryFilename) + accessor, err := db.OpenPair(accessorFile, false) + if err != nil { + return nil, err } - return bookkeeping.SignedTxnGroupsFlatten(txnGroups), nil + return account.MakeParticipationRegistry(accessor, log) } // Reload participation keys from disk periodically @@ -781,6 +780,149 @@ func (node *AlgorandFullNode) checkForParticipationKeys() { } } +// ListParticipationKeys returns all participation keys currently installed on the node +func (node *AlgorandFullNode) ListParticipationKeys() (partKeys []account.ParticipationRecord, err error) { + return node.accountManager.Registry().GetAll(), nil +} + +// GetParticipationKey retrieves the information of a participation id from the node +func (node *AlgorandFullNode) GetParticipationKey(partKey account.ParticipationID) (account.ParticipationRecord, error) { + rval := node.accountManager.Registry().Get(partKey) + + if rval.IsZero() { + return account.ParticipationRecord{}, account.ErrParticipationIDNotFound + } + + return node.accountManager.Registry().Get(partKey), nil +} + +// RemoveParticipationKey given a participation id, remove the records from the node +func (node *AlgorandFullNode) RemoveParticipationKey(partKey account.ParticipationID) error { + + // Need to remove the entry in the registry and then remove the file + // Let's first get the recorded information from the registry so we can lookup the file + + partRecord := node.accountManager.Registry().Get(partKey) + + if partRecord.IsZero() { + return account.ErrParticipationIDNotFound + } + + genID := node.GenesisID() + + outDir := filepath.Join(node.rootDir, genID) + + filename := config.PartKeyFilename(partRecord.ParticipationID.String(), uint64(partRecord.FirstValid), uint64(partRecord.LastValid)) + fullyQualifiedFilename := filepath.Join(outDir, filepath.Base(filename)) + 
+ err := node.accountManager.Registry().Delete(partKey) + if err != nil { + return err + } + + // PKI TODO: pick a better timeout, this is just something short. This could also be removed if we change + // POST /v2/participation and DELETE /v2/participation to return "202 Accepted" instead of waiting and getting + // the error message. + err = node.accountManager.Registry().Flush(500 * time.Millisecond) + if err != nil { + return err + } + + // Only after deleting and flushing do we want to remove the file + _ = os.Remove(fullyQualifiedFilename) + + return nil +} + +func createTemporaryParticipationKey(outDir string, partKeyBinary []byte) (string, error) { + var sb strings.Builder + + // Create a temporary filename with a UUID so that we can call this function twice + // in a row without worrying about collisions + sb.WriteString("tempPartKeyBinary.") + sb.WriteString(uuid.NewV4().String()) + sb.WriteString(".bin") + + tempFile := filepath.Join(outDir, filepath.Base(sb.String())) + + file, err := os.Create(tempFile) + + if err != nil { + return "", err + } + + _, err = file.Write(partKeyBinary) + + file.Close() + + if err != nil { + os.Remove(tempFile) + return "", err + } + + return tempFile, nil +} + +// InstallParticipationKey installs the participation key from the given participation key binary stream. 
+func (node *AlgorandFullNode) InstallParticipationKey(partKeyBinary []byte) (account.ParticipationID, error) { + genID := node.GenesisID() + + outDir := filepath.Join(node.rootDir, genID) + + fullyQualifiedTempFile, err := createTemporaryParticipationKey(outDir, partKeyBinary) + // We need to make sure no tempfile is created/remains if there is an error + // However, we will eventually rename this file but if we fail in-between + // this point and the rename we want to ensure that we remove the temporary file + // After we rename, this will fail anyway since the file will not exist + + // Explicitly ignore the error with a closure + defer func(name string) { + _ = os.Remove(name) + }(fullyQualifiedTempFile) + + if err != nil { + return account.ParticipationID{}, err + } + + inputdb, err := db.MakeErasableAccessor(fullyQualifiedTempFile) + if err != nil { + return account.ParticipationID{}, err + } + defer inputdb.Close() + + partkey, err := account.RestoreParticipation(inputdb) + if err != nil { + return account.ParticipationID{}, err + } + defer partkey.Close() + + if partkey.Parent == (basics.Address{}) { + return account.ParticipationID{}, fmt.Errorf("cannot install partkey with missing (zero) parent address") + } + + // Tell the AccountManager about the Participation (dupes don't matter) so we ignore the return value + _ = node.accountManager.AddParticipation(partkey) + + // PKI TODO: pick a better timeout, this is just something short. This could also be removed if we change + // POST /v2/participation and DELETE /v2/participation to return "202 Accepted" instead of waiting and getting + // the error message. 
+ err = node.accountManager.Registry().Flush(500 * time.Millisecond) + if err != nil { + return account.ParticipationID{}, err + } + + newFilename := config.PartKeyFilename(partkey.ID().String(), uint64(partkey.FirstValid), uint64(partkey.LastValid)) + newFullyQualifiedFilename := filepath.Join(outDir, filepath.Base(newFilename)) + + err = os.Rename(fullyQualifiedTempFile, newFullyQualifiedFilename) + + if err != nil { + return account.ParticipationID{}, nil + } + + return partkey.ID(), nil +} + func (node *AlgorandFullNode) loadParticipationKeys() error { // Generate a list of all potential participation key files genesisDir := filepath.Join(node.rootDir, node.genesisID) @@ -802,7 +944,7 @@ func (node *AlgorandFullNode) loadParticipationKeys() error { if err != nil { if db.IsErrBusy(err) { // this is a special case: - // we might get "database is locked" when we attempt to access a database that is conurrently updates it's participation keys. + // we might get "database is locked" when we attempt to access a database that is concurrently updating its participation keys. // that database is clearly already on the account manager, and doesn't need to be processed through this logic, and therefore // we can safely ignore that fail case. continue @@ -862,17 +1004,10 @@ func (node *AlgorandFullNode) IsArchival() bool { } // OnNewBlock implements the BlockListener interface so we're notified after each block is written to the ledger -// The method is being called *after* the transaction pool received it's OnNewBlock call. func (node *AlgorandFullNode) OnNewBlock(block bookkeeping.Block, delta ledgercore.StateDelta) { - blkRound := block.Round() - if node.ledger.Latest() > blkRound { + if node.ledger.Latest() > block.Round() { return } - - // the transaction pool already updated its transactions (dumping out old and invalid transactions). At this point, - // we need to let the txnsync know about the size of the transaction pool. 
- node.txnSyncConnector.onNewTransactionPoolEntry(node.transactionPool.PendingCount()) - node.syncStatusMu.Lock() node.lastRoundTimestamp = time.Now() node.hasSyncedSinceStartup = true @@ -941,6 +1076,10 @@ func (node *AlgorandFullNode) oldKeyDeletionThread() { node.mu.Lock() node.accountManager.DeleteOldKeys(latestHdr, ccSigs, agreementProto) node.mu.Unlock() + + // PKI TODO: Maybe we don't even need to flush the registry. + // Persist participation registry metrics. + node.accountManager.FlushRegistry(2 * time.Second) } } @@ -1040,8 +1179,7 @@ func (node *AlgorandFullNode) SetCatchpointCatchupMode(catchpointCatchupMode boo node.waitMonitoringRoutines() }() node.net.ClearHandlers() - node.txnSyncConnector.stop() - node.txnSyncService.Stop() + node.txHandler.Stop() node.agreementService.Shutdown() node.catchupService.Stop() node.txPoolSyncerService.Stop() @@ -1065,8 +1203,7 @@ func (node *AlgorandFullNode) SetCatchpointCatchupMode(catchpointCatchupMode boo node.txPoolSyncerService.Start(node.catchupService.InitialSyncDone) node.blockService.Start() node.ledgerService.Start() - node.txnSyncService.Start() - node.txnSyncConnector.start() + node.txHandler.Start() // start indexer if idx, err := node.Indexer(); err == nil { @@ -1097,7 +1234,7 @@ func (node *AlgorandFullNode) SetCatchpointCatchupMode(catchpointCatchupMode boo // validatedBlock satisfies agreement.ValidatedBlock type validatedBlock struct { - vb *ledger.ValidatedBlock + vb *ledgercore.ValidatedBlock } // WithSeed satisfies the agreement.ValidatedBlock interface. @@ -1113,7 +1250,8 @@ func (vb validatedBlock) Block() bookkeeping.Block { } // AssembleBlock implements Ledger.AssembleBlock. 
-func (node *AlgorandFullNode) AssembleBlock(round basics.Round, deadline time.Time) (agreement.ValidatedBlock, error) { +func (node *AlgorandFullNode) AssembleBlock(round basics.Round) (agreement.ValidatedBlock, error) { + deadline := time.Now().Add(node.config.ProposalAssemblyTime) lvb, err := node.transactionPool.AssembleBlock(round, deadline) if err != nil { if errors.Is(err, pools.ErrStaleBlockAssemblyRequest) { @@ -1142,7 +1280,7 @@ func (node *AlgorandFullNode) VotingKeys(votingRound, keysRound basics.Round) [] keys := node.accountManager.Keys(votingRound) participations := make([]account.Participation, 0, len(keys)) - accountsData := make(map[basics.Address]basics.AccountData, len(keys)) + accountsData := make(map[basics.Address]basics.OnlineAccountData, len(keys)) matchingAccountsKeys := make(map[basics.Address]bool) mismatchingAccountsKeys := make(map[basics.Address]int) const bitMismatchingVotingKey = 1 @@ -1151,7 +1289,7 @@ func (node *AlgorandFullNode) VotingKeys(votingRound, keysRound basics.Round) [] acctData, hasAccountData := accountsData[part.Parent] if !hasAccountData { var err error - acctData, _, err = node.ledger.LookupWithoutRewards(keysRound, part.Parent) + acctData, err = node.ledger.LookupAgreement(keysRound, part.Parent) if err != nil { node.log.Warnf("node.VotingKeys: Account %v not participating: cannot locate account for round %d : %v", part.Address(), keysRound, err) continue @@ -1169,6 +1307,12 @@ func (node *AlgorandFullNode) VotingKeys(votingRound, keysRound basics.Round) [] } participations = append(participations, part) matchingAccountsKeys[part.Address()] = true + + // Make sure the key is registered. + err := node.accountManager.Registry().Register(part.ID(), votingRound) + if err != nil { + node.log.Warnf("Failed to register participation key (%s) with participation registry: %v\n", part.ID(), err) + } } // write the warnings per account only if we couldn't find a single valid key for that account. 
for mismatchingAddr, warningFlags := range mismatchingAccountsKeys { @@ -1186,3 +1330,8 @@ func (node *AlgorandFullNode) VotingKeys(votingRound, keysRound basics.Round) [] } return participations } + +// Record forwards participation record calls to the participation registry. +func (node *AlgorandFullNode) Record(account basics.Address, round basics.Round, participationType account.ParticipationAction) { + node.accountManager.Record(account, round, participationType) +} |