summaryrefslogtreecommitdiff
path: root/node/node.go
diff options
context:
space:
mode:
Diffstat (limited to 'node/node.go')
-rw-r--r--node/node.go251
1 files changed, 200 insertions, 51 deletions
diff --git a/node/node.go b/node/node.go
index 62256b348..e5a2f7e6a 100644
--- a/node/node.go
+++ b/node/node.go
@@ -24,6 +24,7 @@ import (
"io/ioutil"
"os"
"path/filepath"
+ "strings"
"sync"
"time"
@@ -38,7 +39,6 @@ import (
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/data/committee"
- "github.com/algorand/go-algorand/data/pooldata"
"github.com/algorand/go-algorand/data/pools"
"github.com/algorand/go-algorand/data/transactions"
"github.com/algorand/go-algorand/data/transactions/verify"
@@ -50,12 +50,12 @@ import (
"github.com/algorand/go-algorand/node/indexer"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/rpcs"
- "github.com/algorand/go-algorand/txnsync"
"github.com/algorand/go-algorand/util/db"
"github.com/algorand/go-algorand/util/execpool"
"github.com/algorand/go-algorand/util/metrics"
"github.com/algorand/go-algorand/util/timers"
"github.com/algorand/go-deadlock"
+ uuid "github.com/satori/go.uuid"
)
// StatusReport represents the current basic status of the node
@@ -108,7 +108,6 @@ type AlgorandFullNode struct {
blockService *rpcs.BlockService
ledgerService *rpcs.LedgerService
txPoolSyncerService *rpcs.TxSyncer
- txnSyncService *txnsync.Service
indexer *indexer.Indexer
@@ -136,8 +135,6 @@ type AlgorandFullNode struct {
tracer messagetracer.MessageTracer
compactCert *compactcert.Worker
-
- txnSyncConnector *transactionSyncNodeConnector
}
// TxnWithStatus represents information about a single transaction,
@@ -183,7 +180,6 @@ func MakeFull(log logging.Logger, rootDir string, cfg config.Local, phonebookAdd
}
p2pNode.SetPrioScheme(node)
node.net = p2pNode
- node.accountManager = data.MakeAccountManager(log)
accountListener := makeTopAccountListener(log)
@@ -215,6 +211,15 @@ func MakeFull(log logging.Logger, rootDir string, cfg config.Local, phonebookAdd
node.transactionPool = pools.MakeTransactionPool(node.ledger.Ledger, cfg, node.log)
+ blockListeners := []ledger.BlockListener{
+ node.transactionPool,
+ node,
+ }
+
+ if node.config.EnableTopAccountsReporting {
+ blockListeners = append(blockListeners, &accountListener)
+ }
+ node.ledger.RegisterBlockListeners(blockListeners)
node.txHandler = data.MakeTxHandler(node.transactionPool, node.ledger, node.net, node.genesisID, node.genesisHash, node.lowPriorityCryptoVerificationPool)
// Indexer setup
@@ -263,8 +268,13 @@ func MakeFull(log logging.Logger, rootDir string, cfg config.Local, phonebookAdd
node.catchupBlockAuth = blockAuthenticatorImpl{Ledger: node.ledger, AsyncVoteVerifier: agreement.MakeAsyncVoteVerifier(node.lowPriorityCryptoVerificationPool)}
node.catchupService = catchup.MakeService(node.log, node.config, p2pNode, node.ledger, node.catchupBlockAuth, agreementLedger.UnmatchedPendingCertificates, node.lowPriorityCryptoVerificationPool)
node.txPoolSyncerService = rpcs.MakeTxSyncer(node.transactionPool, node.net, node.txHandler.SolicitedTxHandler(), time.Duration(cfg.TxSyncIntervalSeconds)*time.Second, time.Duration(cfg.TxSyncTimeoutSeconds)*time.Second, cfg.TxSyncServeResponseSize)
- node.txnSyncConnector = makeTransactionSyncNodeConnector(node)
- node.txnSyncService = txnsync.MakeTransactionSyncService(node.log, node.txnSyncConnector, cfg.NetAddress != "", node.genesisID, node.genesisHash, node.config, node.lowPriorityCryptoVerificationPool)
+
+ registry, err := ensureParticipationDB(genesisDir, node.log)
+ if err != nil {
+ log.Errorf("unable to initialize the participation registry database: %v", err)
+ return nil, err
+ }
+ node.accountManager = data.MakeAccountManager(log, registry)
err = node.loadParticipationKeys()
if err != nil {
@@ -298,17 +308,6 @@ func MakeFull(log logging.Logger, rootDir string, cfg config.Local, phonebookAdd
}
node.compactCert = compactcert.NewWorker(compactCertAccess, node.log, node.accountManager, node.ledger.Ledger, node.net, node)
- blockListeners := []ledger.BlockListener{
- node.txnSyncConnector,
- node.transactionPool,
- node,
- }
-
- if node.config.EnableTopAccountsReporting {
- blockListeners = append(blockListeners, &accountListener)
- }
- node.ledger.RegisterBlockListeners(blockListeners)
-
return node, err
}
@@ -379,12 +378,9 @@ func (node *AlgorandFullNode) Start() {
node.txPoolSyncerService.Start(node.catchupService.InitialSyncDone)
node.blockService.Start()
node.ledgerService.Start()
+ node.txHandler.Start()
node.compactCert.Start()
- node.txnSyncService.Start()
- node.txnSyncConnector.start()
-
startNetwork()
-
// start indexer
if idx, err := node.Indexer(); err == nil {
err := idx.Start()
@@ -407,6 +403,7 @@ func (node *AlgorandFullNode) Start() {
func (node *AlgorandFullNode) startMonitoringRoutines() {
node.monitoringRoutinesWaitGroup.Add(3)
+ // PKI TODO: Remove this with #2596
// Periodically check for new participation keys
go node.checkForParticipationKeys()
@@ -451,8 +448,7 @@ func (node *AlgorandFullNode) Stop() {
if node.catchpointCatchupService != nil {
node.catchpointCatchupService.Stop()
} else {
- node.txnSyncService.Stop()
- node.txnSyncConnector.stop()
+ node.txHandler.Stop()
node.agreementService.Shutdown()
node.catchupService.Stop()
node.txPoolSyncerService.Stop()
@@ -487,7 +483,7 @@ func (node *AlgorandFullNode) Ledger() *data.Ledger {
// writeDevmodeBlock generates a new block for a devmode, and write it to the ledger.
func (node *AlgorandFullNode) writeDevmodeBlock() (err error) {
- var vb *ledger.ValidatedBlock
+ var vb *ledgercore.ValidatedBlock
vb, err = node.transactionPool.AssembleDevModeBlock()
if err != nil || vb == nil {
return
@@ -528,7 +524,7 @@ func (node *AlgorandFullNode) BroadcastSignedTxGroup(txgroup []transactions.Sign
return err
}
- err = node.transactionPool.Remember(pooldata.SignedTxGroup{Transactions: txgroup, LocallyOriginated: true})
+ err = node.transactionPool.Remember(txgroup)
if err != nil {
node.log.Infof("rejected by local pool: %v - transaction group was %+v", err, txgroup)
return err
@@ -539,8 +535,6 @@ func (node *AlgorandFullNode) BroadcastSignedTxGroup(txgroup []transactions.Sign
logging.Base().Infof("unable to pin transaction: %v", err)
}
- node.txnSyncConnector.onNewTransactionPoolEntry(node.transactionPool.PendingCount())
-
var enc []byte
var txids []transactions.Txid
for _, tx := range txgroup {
@@ -755,12 +749,17 @@ func (node *AlgorandFullNode) SuggestedFee() basics.MicroAlgos {
// GetPendingTxnsFromPool returns a snapshot of every pending transactions from the node's transaction pool in a slice.
// Transactions are sorted in decreasing order. If no transactions, returns an empty slice.
func (node *AlgorandFullNode) GetPendingTxnsFromPool() ([]transactions.SignedTxn, error) {
- poolGroups, _ := node.transactionPool.PendingTxGroups()
- txnGroups := make([][]transactions.SignedTxn, len(poolGroups))
- for i := range txnGroups {
- txnGroups[i] = poolGroups[i].Transactions
+ return bookkeeping.SignedTxnGroupsFlatten(node.transactionPool.PendingTxGroups()), nil
+}
+
+// ensureParticipationDB opens or creates a participation DB.
+func ensureParticipationDB(genesisDir string, log logging.Logger) (account.ParticipationRegistry, error) {
+ accessorFile := filepath.Join(genesisDir, config.ParticipationRegistryFilename)
+ accessor, err := db.OpenPair(accessorFile, false)
+ if err != nil {
+ return nil, err
}
- return bookkeeping.SignedTxnGroupsFlatten(txnGroups), nil
+ return account.MakeParticipationRegistry(accessor, log)
}
// Reload participation keys from disk periodically
@@ -781,6 +780,149 @@ func (node *AlgorandFullNode) checkForParticipationKeys() {
}
}
+// ListParticipationKeys returns all participation keys currently installed on the node
+func (node *AlgorandFullNode) ListParticipationKeys() (partKeys []account.ParticipationRecord, err error) {
+ return node.accountManager.Registry().GetAll(), nil
+}
+
+// GetParticipationKey retries the information of a participation id from the node
+func (node *AlgorandFullNode) GetParticipationKey(partKey account.ParticipationID) (account.ParticipationRecord, error) {
+ rval := node.accountManager.Registry().Get(partKey)
+
+ if rval.IsZero() {
+ return account.ParticipationRecord{}, account.ErrParticipationIDNotFound
+ }
+
+ return node.accountManager.Registry().Get(partKey), nil
+}
+
+// RemoveParticipationKey given a participation id, remove the records from the node
+func (node *AlgorandFullNode) RemoveParticipationKey(partKey account.ParticipationID) error {
+
+ // Need to remove the file and then remove the entry in the registry
+ // Let's first get the recorded information from the registry so we can lookup the file
+
+ partRecord := node.accountManager.Registry().Get(partKey)
+
+ if partRecord.IsZero() {
+ return account.ErrParticipationIDNotFound
+ }
+
+ genID := node.GenesisID()
+
+ outDir := filepath.Join(node.rootDir, genID)
+
+ filename := config.PartKeyFilename(partRecord.ParticipationID.String(), uint64(partRecord.FirstValid), uint64(partRecord.LastValid))
+ fullyQualifiedFilename := filepath.Join(outDir, filepath.Base(filename))
+
+ err := node.accountManager.Registry().Delete(partKey)
+ if err != nil {
+ return err
+ }
+
+ // PKI TODO: pick a better timeout, this is just something short. This could also be removed if we change
+ // POST /v2/participation and DELETE /v2/participation to return "202 OK Accepted" instead of waiting and getting
+ // the error message.
+ err = node.accountManager.Registry().Flush(500 * time.Millisecond)
+ if err != nil {
+ return err
+ }
+
+ // Only after deleting and flushing do we want to remove the file
+ _ = os.Remove(fullyQualifiedFilename)
+
+ return nil
+}
+
+func createTemporaryParticipationKey(outDir string, partKeyBinary []byte) (string, error) {
+ var sb strings.Builder
+
+ // Create a temporary filename with a UUID so that we can call this function twice
+ // in a row without worrying about collisions
+ sb.WriteString("tempPartKeyBinary.")
+ sb.WriteString(uuid.NewV4().String())
+ sb.WriteString(".bin")
+
+ tempFile := filepath.Join(outDir, filepath.Base(sb.String()))
+
+ file, err := os.Create(tempFile)
+
+ if err != nil {
+ return "", err
+ }
+
+ _, err = file.Write(partKeyBinary)
+
+ file.Close()
+
+ if err != nil {
+ os.Remove(tempFile)
+ return "", err
+ }
+
+ return tempFile, nil
+}
+
+// InstallParticipationKey Given a participation key binary stream install the participation key.
+func (node *AlgorandFullNode) InstallParticipationKey(partKeyBinary []byte) (account.ParticipationID, error) {
+ genID := node.GenesisID()
+
+ outDir := filepath.Join(node.rootDir, genID)
+
+ fullyQualifiedTempFile, err := createTemporaryParticipationKey(outDir, partKeyBinary)
+ // We need to make sure no tempfile is created/remains if there is an error
+ // However, we will eventually rename this file but if we fail in-between
+ // this point and the rename we want to ensure that we remove the temporary file
+ // After we rename, this will fail anyway since the file will not exist
+
+ // Explicitly ignore the error with a closure
+ defer func(name string) {
+ _ = os.Remove(name)
+ }(fullyQualifiedTempFile)
+
+ if err != nil {
+ return account.ParticipationID{}, err
+ }
+
+ inputdb, err := db.MakeErasableAccessor(fullyQualifiedTempFile)
+ if err != nil {
+ return account.ParticipationID{}, err
+ }
+ defer inputdb.Close()
+
+ partkey, err := account.RestoreParticipation(inputdb)
+ if err != nil {
+ return account.ParticipationID{}, err
+ }
+ defer partkey.Close()
+
+ if partkey.Parent == (basics.Address{}) {
+ return account.ParticipationID{}, fmt.Errorf("cannot install partkey with missing (zero) parent address")
+ }
+
+ // Tell the AccountManager about the Participation (dupes don't matter) so we ignore the return value
+ _ = node.accountManager.AddParticipation(partkey)
+
+ // PKI TODO: pick a better timeout, this is just something short. This could also be removed if we change
+ // POST /v2/participation and DELETE /v2/participation to return "202 OK Accepted" instead of waiting and getting
+ // the error message.
+ err = node.accountManager.Registry().Flush(500 * time.Millisecond)
+ if err != nil {
+ return account.ParticipationID{}, err
+ }
+
+ newFilename := config.PartKeyFilename(partkey.ID().String(), uint64(partkey.FirstValid), uint64(partkey.LastValid))
+ newFullyQualifiedFilename := filepath.Join(outDir, filepath.Base(newFilename))
+
+ err = os.Rename(fullyQualifiedTempFile, newFullyQualifiedFilename)
+
+ if err != nil {
+ return account.ParticipationID{}, nil
+ }
+
+ return partkey.ID(), nil
+}
+
func (node *AlgorandFullNode) loadParticipationKeys() error {
// Generate a list of all potential participation key files
genesisDir := filepath.Join(node.rootDir, node.genesisID)
@@ -802,7 +944,7 @@ func (node *AlgorandFullNode) loadParticipationKeys() error {
if err != nil {
if db.IsErrBusy(err) {
// this is a special case:
- // we might get "database is locked" when we attempt to access a database that is conurrently updates it's participation keys.
+ // we might get "database is locked" when we attempt to access a database that is concurrently updating its participation keys.
// that database is clearly already on the account manager, and doesn't need to be processed through this logic, and therefore
// we can safely ignore that fail case.
continue
@@ -862,17 +1004,10 @@ func (node *AlgorandFullNode) IsArchival() bool {
}
// OnNewBlock implements the BlockListener interface so we're notified after each block is written to the ledger
-// The method is being called *after* the transaction pool received it's OnNewBlock call.
func (node *AlgorandFullNode) OnNewBlock(block bookkeeping.Block, delta ledgercore.StateDelta) {
- blkRound := block.Round()
- if node.ledger.Latest() > blkRound {
+ if node.ledger.Latest() > block.Round() {
return
}
-
- // the transaction pool already updated its transactions (dumping out old and invalid transactions). At this point,
- // we need to let the txnsync know about the size of the transaction pool.
- node.txnSyncConnector.onNewTransactionPoolEntry(node.transactionPool.PendingCount())
-
node.syncStatusMu.Lock()
node.lastRoundTimestamp = time.Now()
node.hasSyncedSinceStartup = true
@@ -941,6 +1076,10 @@ func (node *AlgorandFullNode) oldKeyDeletionThread() {
node.mu.Lock()
node.accountManager.DeleteOldKeys(latestHdr, ccSigs, agreementProto)
node.mu.Unlock()
+
+ // PKI TODO: Maybe we don't even need to flush the registry.
+ // Persist participation registry metrics.
+ node.accountManager.FlushRegistry(2 * time.Second)
}
}
@@ -1040,8 +1179,7 @@ func (node *AlgorandFullNode) SetCatchpointCatchupMode(catchpointCatchupMode boo
node.waitMonitoringRoutines()
}()
node.net.ClearHandlers()
- node.txnSyncConnector.stop()
- node.txnSyncService.Stop()
+ node.txHandler.Stop()
node.agreementService.Shutdown()
node.catchupService.Stop()
node.txPoolSyncerService.Stop()
@@ -1065,8 +1203,7 @@ func (node *AlgorandFullNode) SetCatchpointCatchupMode(catchpointCatchupMode boo
node.txPoolSyncerService.Start(node.catchupService.InitialSyncDone)
node.blockService.Start()
node.ledgerService.Start()
- node.txnSyncService.Start()
- node.txnSyncConnector.start()
+ node.txHandler.Start()
// start indexer
if idx, err := node.Indexer(); err == nil {
@@ -1097,7 +1234,7 @@ func (node *AlgorandFullNode) SetCatchpointCatchupMode(catchpointCatchupMode boo
// validatedBlock satisfies agreement.ValidatedBlock
type validatedBlock struct {
- vb *ledger.ValidatedBlock
+ vb *ledgercore.ValidatedBlock
}
// WithSeed satisfies the agreement.ValidatedBlock interface.
@@ -1113,7 +1250,8 @@ func (vb validatedBlock) Block() bookkeeping.Block {
}
// AssembleBlock implements Ledger.AssembleBlock.
-func (node *AlgorandFullNode) AssembleBlock(round basics.Round, deadline time.Time) (agreement.ValidatedBlock, error) {
+func (node *AlgorandFullNode) AssembleBlock(round basics.Round) (agreement.ValidatedBlock, error) {
+ deadline := time.Now().Add(node.config.ProposalAssemblyTime)
lvb, err := node.transactionPool.AssembleBlock(round, deadline)
if err != nil {
if errors.Is(err, pools.ErrStaleBlockAssemblyRequest) {
@@ -1142,7 +1280,7 @@ func (node *AlgorandFullNode) VotingKeys(votingRound, keysRound basics.Round) []
keys := node.accountManager.Keys(votingRound)
participations := make([]account.Participation, 0, len(keys))
- accountsData := make(map[basics.Address]basics.AccountData, len(keys))
+ accountsData := make(map[basics.Address]basics.OnlineAccountData, len(keys))
matchingAccountsKeys := make(map[basics.Address]bool)
mismatchingAccountsKeys := make(map[basics.Address]int)
const bitMismatchingVotingKey = 1
@@ -1151,7 +1289,7 @@ func (node *AlgorandFullNode) VotingKeys(votingRound, keysRound basics.Round) []
acctData, hasAccountData := accountsData[part.Parent]
if !hasAccountData {
var err error
- acctData, _, err = node.ledger.LookupWithoutRewards(keysRound, part.Parent)
+ acctData, err = node.ledger.LookupAgreement(keysRound, part.Parent)
if err != nil {
node.log.Warnf("node.VotingKeys: Account %v not participating: cannot locate account for round %d : %v", part.Address(), keysRound, err)
continue
@@ -1169,6 +1307,12 @@ func (node *AlgorandFullNode) VotingKeys(votingRound, keysRound basics.Round) []
}
participations = append(participations, part)
matchingAccountsKeys[part.Address()] = true
+
+ // Make sure the key is registered.
+ err := node.accountManager.Registry().Register(part.ID(), votingRound)
+ if err != nil {
+ node.log.Warnf("Failed to register participation key (%s) with participation registry: %v\n", part.ID(), err)
+ }
}
// write the warnings per account only if we couldn't find a single valid key for that account.
for mismatchingAddr, warningFlags := range mismatchingAccountsKeys {
@@ -1186,3 +1330,8 @@ func (node *AlgorandFullNode) VotingKeys(votingRound, keysRound basics.Round) []
}
return participations
}
+
+// Record forwards participation record calls to the participation registry.
+func (node *AlgorandFullNode) Record(account basics.Address, round basics.Round, participationType account.ParticipationAction) {
+ node.accountManager.Record(account, round, participationType)
+}