diff options
Diffstat (limited to 'agreement/pseudonode.go')
-rw-r--r-- | agreement/pseudonode.go | 115 |
1 files changed, 79 insertions, 36 deletions
diff --git a/agreement/pseudonode.go b/agreement/pseudonode.go index e2809fb13..2589028cb 100644 --- a/agreement/pseudonode.go +++ b/agreement/pseudonode.go @@ -18,11 +18,11 @@ package agreement import ( "context" + "errors" "fmt" "sync" "time" - "github.com/algorand/go-algorand/config" "github.com/algorand/go-algorand/data/account" "github.com/algorand/go-algorand/data/basics" "github.com/algorand/go-algorand/logging" @@ -33,13 +33,14 @@ import ( // TODO put these in config const ( - pseudonodeVerificationBacklog = 32 + pseudonodeVerificationBacklog = 32 + maxPseudonodeOutputWaitDuration = 2 * time.Second ) var errPseudonodeBacklogFull = fmt.Errorf("pseudonode input channel is full") -var errPseudonodeVerifierClosedChannel = fmt.Errorf("crypto verifier closed the output channel prematurely") -var errPseudonodeNoVotes = fmt.Errorf("no valid participation keys to generate votes for given round") -var errPseudonodeNoProposals = fmt.Errorf("no valid participation keys to generate proposals for given round") +var errPseudonodeVerifierClosedChannel = errors.New("crypto verifier closed the output channel prematurely") +var errPseudonodeNoVotes = errors.New("no valid participation keys to generate votes for given round") +var errPseudonodeNoProposals = errors.New("no valid participation keys to generate proposals for given round") // A pseudonode creates proposals and votes with a KeyManager which holds participation keys. // @@ -174,7 +175,7 @@ func (n asyncPseudonode) MakeProposals(ctx context.Context, r round, p period) ( return proposalTask.outputChannel(), nil default: proposalTask.close() - return nil, errPseudonodeBacklogFull + return nil, fmt.Errorf("unable to make proposal for (%d, %d): %w", r, p, errPseudonodeBacklogFull) } } @@ -191,7 +192,7 @@ func (n asyncPseudonode) MakeVotes(ctx context.Context, r round, p period, s ste return proposalTask.outputChannel(), nil default: proposalTask.close() - return nil, errPseudonodeBacklogFull + return nil, fmt.Errorf("unable to make vote for (%d, %d, %d): %w", r, p, s, errPseudonodeBacklogFull) } } @@ -267,8 +268,7 @@ func (n asyncPseudonode) makePseudonodeVerifier(voteVerifier *AsyncVoteVerifier) // makeProposals creates a slice of block proposals for the given round and period. func (n asyncPseudonode) makeProposals(round basics.Round, period period, accounts []account.Participation) ([]proposal, []unauthenticatedVote) { - deadline := time.Now().Add(config.ProposalAssemblyTime) - ve, err := n.factory.AssembleBlock(round, deadline) + ve, err := n.factory.AssembleBlock(round) if err != nil { if err != ErrAssembleBlockRoundStale { n.log.Errorf("pseudonode.makeProposals: could not generate a proposal for round %d: %v", round, err) @@ -367,13 +367,20 @@ func (t pseudonodeVotesTask) execute(verifier *AsyncVoteVerifier, quit chan stru unverifiedVotes := t.node.makeVotes(t.round, t.period, t.step, t.prop, t.participation) t.node.log.Infof("pseudonode: made %v votes", len(unverifiedVotes)) results := make(chan asyncVerifyVoteResponse, len(unverifiedVotes)) + orderedResults := make([]asyncVerifyVoteResponse, len(unverifiedVotes)) + asyncVerifyingVotes := len(unverifiedVotes) for i, uv := range unverifiedVotes { msg := message{Tag: protocol.AgreementVoteTag, UnauthenticatedVote: uv} - verifier.verifyVote(context.TODO(), t.node.ledger, uv, i, msg, results) + err := verifier.verifyVote(context.TODO(), t.node.ledger, uv, i, msg, results) + if err != nil { + orderedResults[i].err = err + t.node.log.Infof("pseudonode.makeVotes: failed to enqueue vote verification for (%d, %d): %v", t.round, t.period, err) + asyncVerifyingVotes-- + continue + } } - orderedResults := make([]asyncVerifyVoteResponse, len(unverifiedVotes)) - for i := 0; i < len(unverifiedVotes); i++ { + for i := 0; i < asyncVerifyingVotes; i++ { resp := <-results orderedResults[resp.index] = resp } @@ -440,15 +447,26 @@ func (t pseudonodeVotesTask) execute(verifier *AsyncVoteVerifier, quit chan stru } t.node.monitor.dec(pseudonodeCoserviceType) + outputTimeout := time.After(maxPseudonodeOutputWaitDuration) + // push results into channel. +verifiedVotesLoop: for _, r := range verifiedResults { - select { - case t.out <- messageEvent{T: voteVerified, Input: r.message, Err: makeSerErr(r.err)}: - case <-quit: - return - case <-t.context.Done(): - // we done care about the output anymore; just exit. - return + for { + select { + case t.out <- messageEvent{T: voteVerified, Input: r.message, Err: makeSerErr(r.err)}: + t.node.keys.Record(r.v.R.Sender, r.v.R.Round, account.Vote) + continue verifiedVotesLoop + case <-quit: + return + case <-t.context.Done(): + // we done care about the output anymore; just exit. + return + case <-outputTimeout: + // we've been waiting for too long for this vote to be written to the output. + t.node.log.Warnf("pseudonode.makeVotes: unable to write vote to output channel for round %d, period %d", t.round, t.period) + outputTimeout = nil + } } } } @@ -477,13 +495,20 @@ func (t pseudonodeProposalsTask) execute(verifier *AsyncVoteVerifier, quit chan // For now, don't log at all, and revisit when the metric becomes more important. results := make(chan asyncVerifyVoteResponse, len(votes)) + cryptoOutputs := make([]asyncVerifyVoteResponse, len(votes)) + asyncVerifyingVotes := len(votes) for i, uv := range votes { msg := message{Tag: protocol.AgreementVoteTag, UnauthenticatedVote: uv} - verifier.verifyVote(context.TODO(), t.node.ledger, uv, i, msg, results) + err := verifier.verifyVote(context.TODO(), t.node.ledger, uv, i, msg, results) + if err != nil { + cryptoOutputs[i].err = err + t.node.log.Infof("pseudonode.makeProposals: failed to enqueue vote verification for (%d, %d): %v", t.round, t.period, err) + asyncVerifyingVotes-- + continue + } } - cryptoOutputs := make([]asyncVerifyVoteResponse, len(votes)) - for i := 0; i < len(votes); i++ { + for i := 0; i < asyncVerifyingVotes; i++ { resp := <-results cryptoOutputs[resp.index] = resp } @@ -527,27 +552,45 @@ func (t pseudonodeProposalsTask) execute(verifier *AsyncVoteVerifier, quit chan } t.node.monitor.dec(pseudonodeCoserviceType) + outputTimeout := time.After(maxPseudonodeOutputWaitDuration) // push results into channel. +verifiedVotesLoop: for _, r := range verifiedVotes { - select { - case t.out <- messageEvent{T: voteVerified, Input: r.message, Err: makeSerErr(r.err)}: - case <-quit: - return - case <-t.context.Done(): - // we done care about the output anymore; just exit. - return + for { + select { + case t.out <- messageEvent{T: voteVerified, Input: r.message, Err: makeSerErr(r.err)}: + t.node.keys.Record(r.v.R.Sender, r.v.R.Round, account.BlockProposal) + continue verifiedVotesLoop + case <-quit: + return + case <-t.context.Done(): + // we done care about the output anymore; just exit. + return + case <-outputTimeout: + // we've been waiting for too long for this vote to be written to the output. + t.node.log.Warnf("pseudonode.makeProposals: unable to write proposal vote to output channel for round %d, period %d", t.round, t.period) + outputTimeout = nil + } } } +verifiedPayloadsLoop: for _, payload := range verifiedPayloads { msg := message{Tag: protocol.ProposalPayloadTag, UnauthenticatedProposal: payload.u(), Proposal: payload} - select { - case t.out <- messageEvent{T: payloadVerified, Input: msg}: - case <-quit: - return - case <-t.context.Done(): - // we done care about the output anymore; just exit. - return + for { + select { + case t.out <- messageEvent{T: payloadVerified, Input: msg}: + continue verifiedPayloadsLoop + case <-quit: + return + case <-t.context.Done(): + // we done care about the output anymore; just exit. + return + case <-outputTimeout: + // we've been waiting for too long for this vote to be written to the output. + t.node.log.Warnf("pseudonode.makeProposals: unable to write proposal payload to output channel for round %d, period %d", t.round, t.period) + outputTimeout = nil + } } } } |