summaryrefslogtreecommitdiff
path: root/agreement/pseudonode.go
diff options
context:
space:
mode:
Diffstat (limited to 'agreement/pseudonode.go')
-rw-r--r--agreement/pseudonode.go115
1 files changed, 79 insertions, 36 deletions
diff --git a/agreement/pseudonode.go b/agreement/pseudonode.go
index e2809fb13..2589028cb 100644
--- a/agreement/pseudonode.go
+++ b/agreement/pseudonode.go
@@ -18,11 +18,11 @@ package agreement
import (
"context"
+ "errors"
"fmt"
"sync"
"time"
- "github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/data/account"
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/logging"
@@ -33,13 +33,14 @@ import (
// TODO put these in config
const (
- pseudonodeVerificationBacklog = 32
+ pseudonodeVerificationBacklog = 32
+ maxPseudonodeOutputWaitDuration = 2 * time.Second
)
var errPseudonodeBacklogFull = fmt.Errorf("pseudonode input channel is full")
-var errPseudonodeVerifierClosedChannel = fmt.Errorf("crypto verifier closed the output channel prematurely")
-var errPseudonodeNoVotes = fmt.Errorf("no valid participation keys to generate votes for given round")
-var errPseudonodeNoProposals = fmt.Errorf("no valid participation keys to generate proposals for given round")
+var errPseudonodeVerifierClosedChannel = errors.New("crypto verifier closed the output channel prematurely")
+var errPseudonodeNoVotes = errors.New("no valid participation keys to generate votes for given round")
+var errPseudonodeNoProposals = errors.New("no valid participation keys to generate proposals for given round")
// A pseudonode creates proposals and votes with a KeyManager which holds participation keys.
//
@@ -174,7 +175,7 @@ func (n asyncPseudonode) MakeProposals(ctx context.Context, r round, p period) (
return proposalTask.outputChannel(), nil
default:
proposalTask.close()
- return nil, errPseudonodeBacklogFull
+ return nil, fmt.Errorf("unable to make proposal for (%d, %d): %w", r, p, errPseudonodeBacklogFull)
}
}
@@ -191,7 +192,7 @@ func (n asyncPseudonode) MakeVotes(ctx context.Context, r round, p period, s ste
return proposalTask.outputChannel(), nil
default:
proposalTask.close()
- return nil, errPseudonodeBacklogFull
+ return nil, fmt.Errorf("unable to make vote for (%d, %d, %d): %w", r, p, s, errPseudonodeBacklogFull)
}
}
@@ -267,8 +268,7 @@ func (n asyncPseudonode) makePseudonodeVerifier(voteVerifier *AsyncVoteVerifier)
// makeProposals creates a slice of block proposals for the given round and period.
func (n asyncPseudonode) makeProposals(round basics.Round, period period, accounts []account.Participation) ([]proposal, []unauthenticatedVote) {
- deadline := time.Now().Add(config.ProposalAssemblyTime)
- ve, err := n.factory.AssembleBlock(round, deadline)
+ ve, err := n.factory.AssembleBlock(round)
if err != nil {
if err != ErrAssembleBlockRoundStale {
n.log.Errorf("pseudonode.makeProposals: could not generate a proposal for round %d: %v", round, err)
@@ -367,13 +367,20 @@ func (t pseudonodeVotesTask) execute(verifier *AsyncVoteVerifier, quit chan stru
unverifiedVotes := t.node.makeVotes(t.round, t.period, t.step, t.prop, t.participation)
t.node.log.Infof("pseudonode: made %v votes", len(unverifiedVotes))
results := make(chan asyncVerifyVoteResponse, len(unverifiedVotes))
+ orderedResults := make([]asyncVerifyVoteResponse, len(unverifiedVotes))
+ asyncVerifyingVotes := len(unverifiedVotes)
for i, uv := range unverifiedVotes {
msg := message{Tag: protocol.AgreementVoteTag, UnauthenticatedVote: uv}
- verifier.verifyVote(context.TODO(), t.node.ledger, uv, i, msg, results)
+ err := verifier.verifyVote(context.TODO(), t.node.ledger, uv, i, msg, results)
+ if err != nil {
+ orderedResults[i].err = err
+ t.node.log.Infof("pseudonode.makeVotes: failed to enqueue vote verification for (%d, %d): %v", t.round, t.period, err)
+ asyncVerifyingVotes--
+ continue
+ }
}
- orderedResults := make([]asyncVerifyVoteResponse, len(unverifiedVotes))
- for i := 0; i < len(unverifiedVotes); i++ {
+ for i := 0; i < asyncVerifyingVotes; i++ {
resp := <-results
orderedResults[resp.index] = resp
}
@@ -440,15 +447,26 @@ func (t pseudonodeVotesTask) execute(verifier *AsyncVoteVerifier, quit chan stru
}
t.node.monitor.dec(pseudonodeCoserviceType)
+ outputTimeout := time.After(maxPseudonodeOutputWaitDuration)
+
// push results into channel.
+verifiedVotesLoop:
for _, r := range verifiedResults {
- select {
- case t.out <- messageEvent{T: voteVerified, Input: r.message, Err: makeSerErr(r.err)}:
- case <-quit:
- return
- case <-t.context.Done():
- // we done care about the output anymore; just exit.
- return
+ for {
+ select {
+ case t.out <- messageEvent{T: voteVerified, Input: r.message, Err: makeSerErr(r.err)}:
+ t.node.keys.Record(r.v.R.Sender, r.v.R.Round, account.Vote)
+ continue verifiedVotesLoop
+ case <-quit:
+ return
+ case <-t.context.Done():
+ // we done care about the output anymore; just exit.
+ return
+ case <-outputTimeout:
+ // we've been waiting for too long for this vote to be written to the output.
+ t.node.log.Warnf("pseudonode.makeVotes: unable to write vote to output channel for round %d, period %d", t.round, t.period)
+ outputTimeout = nil
+ }
}
}
}
@@ -477,13 +495,20 @@ func (t pseudonodeProposalsTask) execute(verifier *AsyncVoteVerifier, quit chan
// For now, don't log at all, and revisit when the metric becomes more important.
results := make(chan asyncVerifyVoteResponse, len(votes))
+ cryptoOutputs := make([]asyncVerifyVoteResponse, len(votes))
+ asyncVerifyingVotes := len(votes)
for i, uv := range votes {
msg := message{Tag: protocol.AgreementVoteTag, UnauthenticatedVote: uv}
- verifier.verifyVote(context.TODO(), t.node.ledger, uv, i, msg, results)
+ err := verifier.verifyVote(context.TODO(), t.node.ledger, uv, i, msg, results)
+ if err != nil {
+ cryptoOutputs[i].err = err
+ t.node.log.Infof("pseudonode.makeProposals: failed to enqueue vote verification for (%d, %d): %v", t.round, t.period, err)
+ asyncVerifyingVotes--
+ continue
+ }
}
- cryptoOutputs := make([]asyncVerifyVoteResponse, len(votes))
- for i := 0; i < len(votes); i++ {
+ for i := 0; i < asyncVerifyingVotes; i++ {
resp := <-results
cryptoOutputs[resp.index] = resp
}
@@ -527,27 +552,45 @@ func (t pseudonodeProposalsTask) execute(verifier *AsyncVoteVerifier, quit chan
}
t.node.monitor.dec(pseudonodeCoserviceType)
+ outputTimeout := time.After(maxPseudonodeOutputWaitDuration)
// push results into channel.
+verifiedVotesLoop:
for _, r := range verifiedVotes {
- select {
- case t.out <- messageEvent{T: voteVerified, Input: r.message, Err: makeSerErr(r.err)}:
- case <-quit:
- return
- case <-t.context.Done():
- // we done care about the output anymore; just exit.
- return
+ for {
+ select {
+ case t.out <- messageEvent{T: voteVerified, Input: r.message, Err: makeSerErr(r.err)}:
+ t.node.keys.Record(r.v.R.Sender, r.v.R.Round, account.BlockProposal)
+ continue verifiedVotesLoop
+ case <-quit:
+ return
+ case <-t.context.Done():
+ // we done care about the output anymore; just exit.
+ return
+ case <-outputTimeout:
+ // we've been waiting for too long for this vote to be written to the output.
+ t.node.log.Warnf("pseudonode.makeProposals: unable to write proposal vote to output channel for round %d, period %d", t.round, t.period)
+ outputTimeout = nil
+ }
}
}
+verifiedPayloadsLoop:
for _, payload := range verifiedPayloads {
msg := message{Tag: protocol.ProposalPayloadTag, UnauthenticatedProposal: payload.u(), Proposal: payload}
- select {
- case t.out <- messageEvent{T: payloadVerified, Input: msg}:
- case <-quit:
- return
- case <-t.context.Done():
- // we done care about the output anymore; just exit.
- return
+ for {
+ select {
+ case t.out <- messageEvent{T: payloadVerified, Input: msg}:
+ continue verifiedPayloadsLoop
+ case <-quit:
+ return
+ case <-t.context.Done():
+ // we done care about the output anymore; just exit.
+ return
+ case <-outputTimeout:
+ // we've been waiting for too long for this vote to be written to the output.
+ t.node.log.Warnf("pseudonode.makeProposals: unable to write proposal payload to output channel for round %d, period %d", t.round, t.period)
+ outputTimeout = nil
+ }
}
}
}