author     John Jannotti <jj@cs.brown.edu>  2022-01-17 22:07:58 -0500
committer  John Jannotti <jj@cs.brown.edu>  2022-01-17 22:07:58 -0500
commit     63750bcb73b1af7ed5e5f8f1a5cb127a1e457e88 (patch)
tree       3426f347e1702eb25cad77505436bc1b8b3ff934
parent     a106e8366ac37b49cd9ad48f7423ab0ecc867422 (diff)
parent     4006ce219ebcebbb65c621b9f04362b45b256065 (diff)

Merge branch 'master' into feature/contract-to-contract
-rw-r--r--  agreement/demux.go             2
-rw-r--r--  agreement/listener.go         12
-rw-r--r--  agreement/persistence.go      24
-rw-r--r--  agreement/persistence_test.go  7
-rw-r--r--  agreement/proposalManager.go   4
-rw-r--r--  agreement/proposalStore.go     6
-rw-r--r--  agreement/proposalTracker.go   3
-rw-r--r--  agreement/pseudonode.go        1
-rw-r--r--  agreement/service.go           4
-rw-r--r--  agreement/trace.go             5
-rw-r--r--  agreement/voteAggregator.go    7
-rw-r--r--  agreement/voteAuxiliary.go     8
-rw-r--r--  agreement/voteTracker.go      22
-rw-r--r--  ledger/acctupdates_test.go    22
-rw-r--r--  network/wsNetwork.go          21
-rw-r--r--  network/wsPeer.go             10

16 files changed, 82 insertions, 76 deletions
diff --git a/agreement/demux.go b/agreement/demux.go
index fe48f604b..65a5d06a0 100644
--- a/agreement/demux.go
+++ b/agreement/demux.go
@@ -122,7 +122,7 @@ func (d *demux) tokenizeMessages(ctx context.Context, net Network, tag protocol.
o, err := tokenize(raw.Data)
if err != nil {
- logging.Base().Warnf("disconnecting from peer: error decoding message tagged %v: %v", tag, err)
+ d.log.Warnf("disconnecting from peer: error decoding message tagged %v: %v", tag, err)
net.Disconnect(raw.MessageHandle)
d.UpdateEventsQueue(eventQueueTokenizing[tag], 0)
continue
diff --git a/agreement/listener.go b/agreement/listener.go
index 224e0c53e..b0dafaff7 100644
--- a/agreement/listener.go
+++ b/agreement/listener.go
@@ -16,10 +16,6 @@
package agreement
-import (
- "github.com/algorand/go-algorand/logging"
-)
-
// A listener is a state machine which can handle events, returning new events.
type listener interface {
// T returns the stateMachineTag describing the listener.
@@ -60,17 +56,17 @@ func (l checkedListener) handle(r routerHandle, p player, in event) event {
errs := l.pre(p, in)
if len(errs) != 0 {
for _, err := range errs {
- logging.Base().Errorf("%v: precondition violated: %v", l.T(), err)
+ r.t.log.Errorf("%v: precondition violated: %v", l.T(), err)
}
- logging.Base().Panicf("%v: precondition violated: %v", l.T(), errs[0])
+ r.t.log.Panicf("%v: precondition violated: %v", l.T(), errs[0])
}
out := l.listener.handle(r, p, in)
errs = l.post(p, in, out)
if len(errs) != 0 {
for _, err := range errs {
- logging.Base().Errorf("%v: postcondition violated: %v", l.T(), err)
+ r.t.log.Errorf("%v: postcondition violated: %v", l.T(), err)
}
- logging.Base().Panicf("%v: postcondition violated: %v", l.T(), errs[0])
+ r.t.log.Panicf("%v: postcondition violated: %v", l.T(), errs[0])
}
return out
}
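
The listener.go change routes precondition and postcondition failures through r.t.log, the logger carried by the router's tracer, instead of the process-wide logging.Base(). A minimal standalone sketch of that checked-wrapper pattern, using illustrative names rather than the repository's real types:

package main

import "log"

// Logger is a stand-in for the injected logger the wrapper uses instead of a
// process-wide default.
type Logger interface {
	Errorf(format string, args ...interface{})
	Panicf(format string, args ...interface{})
}

type stdLogger struct{}

func (stdLogger) Errorf(format string, args ...interface{}) { log.Printf("ERROR "+format, args...) }
func (stdLogger) Panicf(format string, args ...interface{}) { log.Panicf(format, args...) }

// checkedHandler mirrors the shape of checkedListener.handle: run the
// preconditions, log every violation, and panic on the first one.
type checkedHandler struct {
	pre   func(in int) []error
	inner func(in int) int
	log   Logger
}

func (c checkedHandler) handle(in int) int {
	if errs := c.pre(in); len(errs) != 0 {
		for _, err := range errs {
			c.log.Errorf("precondition violated: %v", err)
		}
		c.log.Panicf("precondition violated: %v", errs[0])
	}
	return c.inner(in)
}

func main() {
	h := checkedHandler{
		pre:   func(int) []error { return nil },
		inner: func(in int) int { return in * 2 },
		log:   stdLogger{},
	}
	_ = h.handle(21)
}
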
diff --git a/agreement/persistence.go b/agreement/persistence.go
index b11fd4f0a..d92abbd1e 100644
--- a/agreement/persistence.go
+++ b/agreement/persistence.go
@@ -82,7 +82,7 @@ func persist(log serviceLogger, crash db.Accessor, Round basics.Round, Period pe
return
}
- logging.Base().Errorf("persisting failure: %v", err)
+ log.Errorf("persisting failure: %v", err)
return
}
@@ -90,7 +90,7 @@ func persist(log serviceLogger, crash db.Accessor, Round basics.Round, Period pe
//
// In case it's unable to clear the Service table, an error would get logged.
func reset(log logging.Logger, crash db.Accessor) {
- logging.Base().Infof("reset (agreement): resetting crash state")
+ log.Infof("reset (agreement): resetting crash state")
err := crash.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) {
// we could not retrieve our state, so wipe it
@@ -99,7 +99,7 @@ func reset(log logging.Logger, crash db.Accessor) {
})
if err != nil {
- logging.Base().Warnf("reset (agreement): failed to clear Service table - %v", err)
+ log.Warnf("reset (agreement): failed to clear Service table - %v", err)
}
}
@@ -124,7 +124,7 @@ func restore(log logging.Logger, crash db.Accessor) (raw []byte, err error) {
if err == nil {
// the above call was completed successfully, which means that we've just created the table (which wasn't there!).
// in that case, the table is guaranteed to be empty, and therefore we can return right here.
- logging.Base().Infof("restore (agreement): crash state table initialized")
+ log.Infof("restore (agreement): crash state table initialized")
err = errNoCrashStateAvailable
return
}
@@ -135,7 +135,7 @@ func restore(log logging.Logger, crash db.Accessor) (raw []byte, err error) {
if !reset {
return
}
- logging.Base().Infof("restore (agreement): resetting crash state")
+ log.Infof("restore (agreement): resetting crash state")
// we could not retrieve our state, so wipe it
_, err = tx.Exec("delete from Service")
@@ -149,12 +149,12 @@ func restore(log logging.Logger, crash db.Accessor) (raw []byte, err error) {
row := tx.QueryRow("select count(*) from Service")
err := row.Scan(&nrows)
if err != nil {
- logging.Base().Errorf("restore (agreement): could not query raw state: %v", err)
+ log.Errorf("restore (agreement): could not query raw state: %v", err)
reset = true
return err
}
if nrows != 1 {
- logging.Base().Infof("restore (agreement): crash state not found (n = %d)", nrows)
+ log.Infof("restore (agreement): crash state not found (n = %d)", nrows)
reset = true
noCrashState = true // this is a normal case (we have leftover crash state from an old round)
return errNoCrashStateAvailable
@@ -163,7 +163,7 @@ func restore(log logging.Logger, crash db.Accessor) (raw []byte, err error) {
row = tx.QueryRow("select data from Service")
err = row.Scan(&raw)
if err != nil {
- logging.Base().Errorf("restore (agreement): could not read crash state raw data: %v", err)
+ log.Errorf("restore (agreement): could not read crash state raw data: %v", err)
reset = true
return err
}
@@ -176,7 +176,7 @@ func restore(log logging.Logger, crash db.Accessor) (raw []byte, err error) {
// decode processes the incoming raw byte array and attempts to reconstruct the agreement state objects.
//
// On any decoding error, it returns the error in err.
-func decode(raw []byte, t0 timers.Clock) (t timers.Clock, rr rootRouter, p player, a []action, err error) {
+func decode(raw []byte, t0 timers.Clock, log serviceLogger) (t timers.Clock, rr rootRouter, p player, a []action, err error) {
var t2 timers.Clock
var rr2 rootRouter
var p2 player
@@ -185,7 +185,7 @@ func decode(raw []byte, t0 timers.Clock) (t timers.Clock, rr rootRouter, p playe
err = protocol.DecodeReflect(raw, &s)
if err != nil {
- logging.Base().Errorf("decode (agreement): error decoding retrieved state (len = %v): %v", len(raw), err)
+ log.Errorf("decode (agreement): error decoding retrieved state (len = %v): %v", len(raw), err)
return
}
@@ -307,9 +307,9 @@ func (p *asyncPersistenceLoop) loop(ctx context.Context) {
// sanity check; we check it after the fact, since it's not expected to ever happen.
// performance-wise, it takes approximately 300000ns to execute, and we don't want it to
// block the persist operation.
- _, _, _, _, derr := decode(s.raw, s.clock)
+ _, _, _, _, derr := decode(s.raw, s.clock, p.log)
if derr != nil {
- logging.Base().Errorf("could not decode own encoded disk state: %v", derr)
+ p.log.Errorf("could not decode own encoded disk state: %v", derr)
}
}
}
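
Throughout persistence.go the logger is now a parameter, and decode gains one chiefly so the asyncPersistenceLoop's after-the-fact sanity check can report failures through the service's logger rather than a global. A minimal sketch of that encode-then-decode round trip, with JSON standing in for the real protocol.EncodeReflect/DecodeReflect and all names illustrative:

package main

import (
	"encoding/json"
	"log"
)

type crashState struct {
	Round  uint64
	Period uint64
}

func encode(s crashState) []byte {
	raw, _ := json.Marshal(s)
	return raw
}

func decode(raw []byte, logger *log.Logger) (crashState, error) {
	var s crashState
	if err := json.Unmarshal(raw, &s); err != nil {
		// the caller's logger, not a global, records the decode failure
		logger.Printf("decode (agreement): error decoding retrieved state (len = %v): %v", len(raw), err)
		return s, err
	}
	return s, nil
}

func main() {
	logger := log.Default()
	raw := encode(crashState{Round: 100, Period: 0})
	// sanity check after the fact: we should always be able to read back
	// what we just wrote
	if _, err := decode(raw, logger); err != nil {
		logger.Printf("could not decode own encoded disk state: %v", err)
	}
}
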
diff --git a/agreement/persistence_test.go b/agreement/persistence_test.go
index 79aabd675..94221f7be 100644
--- a/agreement/persistence_test.go
+++ b/agreement/persistence_test.go
@@ -43,7 +43,8 @@ func TestAgreementSerialization(t *testing.T) {
encodedBytes := encode(clock, router, status, a)
t0 := timers.MakeMonotonicClock(time.Date(2000, 0, 0, 0, 0, 0, 0, time.UTC))
- clock2, router2, status2, a2, err := decode(encodedBytes, t0)
+ log := makeServiceLogger(logging.Base())
+ clock2, router2, status2, a2, err := decode(encodedBytes, t0, log)
require.NoError(t, err)
require.Equalf(t, clock, clock2, "Clock wasn't serialized/deserialized correctly")
require.Equalf(t, router, router2, "Router wasn't serialized/deserialized correctly")
@@ -77,10 +78,10 @@ func BenchmarkAgreementDeserialization(b *testing.B) {
encodedBytes := encode(clock, router, status, a)
t0 := timers.MakeMonotonicClock(time.Date(2000, 0, 0, 0, 0, 0, 0, time.UTC))
-
+ log := makeServiceLogger(logging.Base())
b.ResetTimer()
for n := 0; n < b.N; n++ {
- decode(encodedBytes, t0)
+ decode(encodedBytes, t0, log)
}
}
diff --git a/agreement/proposalManager.go b/agreement/proposalManager.go
index 9a6e14166..8cf03b32f 100644
--- a/agreement/proposalManager.go
+++ b/agreement/proposalManager.go
@@ -18,8 +18,6 @@ package agreement
import (
"fmt"
-
- "github.com/algorand/go-algorand/logging"
)
// A proposalManager is a proposalMachine which applies relay rules to incoming
@@ -71,7 +69,7 @@ func (m *proposalManager) handle(r routerHandle, p player, e event) event {
r = m.handleNewPeriod(r, p, e.(thresholdEvent))
return emptyEvent{}
}
- logging.Base().Panicf("proposalManager: bad event type: observed an event of type %v", e.t())
+ r.t.log.Panicf("proposalManager: bad event type: observed an event of type %v", e.t())
panic("not reached")
}
diff --git a/agreement/proposalStore.go b/agreement/proposalStore.go
index 7cc9a7643..973f909d0 100644
--- a/agreement/proposalStore.go
+++ b/agreement/proposalStore.go
@@ -18,8 +18,6 @@ package agreement
import (
"fmt"
-
- "github.com/algorand/go-algorand/logging"
)
// A blockAssembler contains the proposal data associated with some
@@ -289,7 +287,7 @@ func (store *proposalStore) handle(r routerHandle, p player, e event) event {
case newRound:
if len(store.Assemblers) > 1 {
// TODO this check is really an implementation invariant; move it into a whitebox test
- logging.Base().Panic("too many assemblers")
+ r.t.log.Panic("too many assemblers")
}
for pv, ea := range store.Assemblers {
if ea.Filled {
@@ -347,7 +345,7 @@ func (store *proposalStore) handle(r routerHandle, p player, e event) event {
se.Payload = ea.Payload
return se
}
- logging.Base().Panicf("proposalStore: bad event type: observed an event of type %v", e.t())
+ r.t.log.Panicf("proposalStore: bad event type: observed an event of type %v", e.t())
panic("not reached")
}
diff --git a/agreement/proposalTracker.go b/agreement/proposalTracker.go
index 5b17bb10b..c76c5c9fd 100644
--- a/agreement/proposalTracker.go
+++ b/agreement/proposalTracker.go
@@ -20,7 +20,6 @@ import (
"fmt"
"github.com/algorand/go-algorand/data/basics"
- "github.com/algorand/go-algorand/logging"
)
// A proposalSeeker finds the vote with the lowest credential until freeze() is
@@ -180,7 +179,7 @@ func (t *proposalTracker) handle(r routerHandle, p player, e event) event {
return se
}
- logging.Base().Panicf("proposalTracker: bad event type: observed an event of type %v", e.t())
+ r.t.log.Panicf("proposalTracker: bad event type: observed an event of type %v", e.t())
panic("not reached")
}
diff --git a/agreement/pseudonode.go b/agreement/pseudonode.go
index 12765970c..f645ca362 100644
--- a/agreement/pseudonode.go
+++ b/agreement/pseudonode.go
@@ -481,7 +481,6 @@ func (t pseudonodeProposalsTask) execute(verifier *AsyncVoteVerifier, quit chan
payloads, votes := t.node.makeProposals(t.round, t.period, t.participation)
fields := logging.Fields{
- "Context": "Agreement",
"Type": logspec.ProposalAssembled.String(),
"ObjectRound": t.round,
"ObjectPeriod": t.period,
diff --git a/agreement/service.go b/agreement/service.go
index 5e1fac3da..346234950 100644
--- a/agreement/service.go
+++ b/agreement/service.go
@@ -93,7 +93,7 @@ func MakeService(p Parameters) *Service {
s.parameters = parameters(p)
- s.log = serviceLogger{Logger: p.Logger}
+ s.log = makeServiceLogger(p.Logger)
// GOAL2-541: tracer is not concurrency safe. It should only ever be
// accessed by main state machine loop.
@@ -191,7 +191,7 @@ func (s *Service) mainLoop(input <-chan externalEvent, output chan<- []action, r
var err error
raw, err := restore(s.log, s.Accessor)
if err == nil {
- clock, router, status, a, err = decode(raw, s.Clock)
+ clock, router, status, a, err = decode(raw, s.Clock, s.log)
if err != nil {
reset(s.log, s.Accessor)
} else {
diff --git a/agreement/trace.go b/agreement/trace.go
index 70dae3798..a3f8cea66 100644
--- a/agreement/trace.go
+++ b/agreement/trace.go
@@ -497,9 +497,12 @@ type serviceLogger struct {
logging.Logger
}
+func makeServiceLogger(log logging.Logger) serviceLogger {
+ return serviceLogger{log.With("Context", "Agreement")}
+}
+
func (log serviceLogger) with(e logspec.AgreementEvent) serviceLogger {
fields := logging.Fields{
- "Context": "Agreement",
"Type": e.Type.String(),
"Round": e.Round,
"Period": e.Period,
diff --git a/agreement/voteAggregator.go b/agreement/voteAggregator.go
index 95b378c0d..057f9d14f 100644
--- a/agreement/voteAggregator.go
+++ b/agreement/voteAggregator.go
@@ -19,7 +19,6 @@ package agreement
import (
"fmt"
- "github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/protocol"
)
@@ -118,7 +117,7 @@ func (agg *voteAggregator) handle(r routerHandle, pr player, em event) (res even
} else if tE.(thresholdEvent).Round == e.FreshnessData.PlayerRound+1 {
return emptyEvent{}
}
- logging.Base().Panicf("bad round (%v, %v)", tE.(thresholdEvent).Round, e.FreshnessData.PlayerRound) // TODO this should be a postcondition check; move it
+ r.t.log.Panicf("bad round (%v, %v)", tE.(thresholdEvent).Round, e.FreshnessData.PlayerRound) // TODO this should be a postcondition check; move it
case bundlePresent:
ub := e.Input.UnauthenticatedBundle
@@ -180,7 +179,7 @@ func (agg *voteAggregator) handle(r routerHandle, pr player, em event) (res even
smErr := makeSerErrf("bundle for (%v, %v, %v: %v) failed to cause a significant state change", b.U.Round, b.U.Period, b.U.Step, b.U.Proposal)
return filteredEvent{T: bundleFiltered, Err: smErr}
}
- logging.Base().Panicf("voteAggregator: bad event type: observed an event of type %v", e.t())
+ r.t.log.Panicf("voteAggregator: bad event type: observed an event of type %v", e.t())
panic("not reached")
}
@@ -200,7 +199,7 @@ func (agg *voteAggregator) filterVote(proto protocol.ConsensusVersion, p player,
case none:
return nil
}
- logging.Base().Panicf("voteAggregator: bad event type: while filtering, observed an event of type %v", filterRes.t())
+ r.t.log.Panicf("voteAggregator: bad event type: while filtering, observed an event of type %v", filterRes.t())
panic("not reached")
}
diff --git a/agreement/voteAuxiliary.go b/agreement/voteAuxiliary.go
index f419feebc..0c6e85c47 100644
--- a/agreement/voteAuxiliary.go
+++ b/agreement/voteAuxiliary.go
@@ -16,10 +16,6 @@
package agreement
-import (
- "github.com/algorand/go-algorand/logging"
-)
-
// A voteTrackerPeriod is a voteMachinePeriod which indicates whether a
// next-threshold of votes was observed for some value in a period.
type voteTrackerPeriod struct {
@@ -82,7 +78,7 @@ func (t *voteTrackerPeriod) handle(r routerHandle, p player, e event) event {
case nextThresholdStatusRequest:
return t.Cached
default:
- logging.Base().Panicf("voteTrackerPeriod: bad event type: observed an event of type %v", e.t())
+ r.t.log.Panicf("voteTrackerPeriod: bad event type: observed an event of type %v", e.t())
panic("not reached")
}
}
@@ -152,7 +148,7 @@ func (t *voteTrackerRound) handle(r routerHandle, p player, e event) event {
case freshestBundleRequest:
return freshestBundleEvent{Ok: t.Ok, Event: t.Freshest}
default:
- logging.Base().Panicf("voteTrackerRound: bad event type: observed an event of type %v", e.t())
+ r.t.log.Panicf("voteTrackerRound: bad event type: observed an event of type %v", e.t())
panic("not reached")
}
}
diff --git a/agreement/voteTracker.go b/agreement/voteTracker.go
index df95f4223..d0f717abd 100644
--- a/agreement/voteTracker.go
+++ b/agreement/voteTracker.go
@@ -23,7 +23,6 @@ import (
"github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/crypto"
"github.com/algorand/go-algorand/data/basics"
- "github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/logging/telemetryspec"
)
@@ -129,12 +128,12 @@ func (tracker *voteTracker) handle(r routerHandle, p player, e0 event) event {
PreviousProposalHash1: eqVote.Proposals[0].BlockDigest.String(),
PreviousProposalHash2: eqVote.Proposals[1].BlockDigest.String(),
}
- logging.Base().EventWithDetails(telemetryspec.ApplicationState, telemetryspec.EquivocatedVoteEvent, equivocationDetails)
+ r.t.log.EventWithDetails(telemetryspec.ApplicationState, telemetryspec.EquivocatedVoteEvent, equivocationDetails)
return thresholdEvent{}
}
- _, overBefore := tracker.overThreshold(proto, e.Vote.R.Step)
+ _, overBefore := tracker.overThreshold(proto, e.Vote.R.Step, r.t.log)
oldVote, voted := tracker.Voters[sender]
@@ -170,9 +169,10 @@ func (tracker *voteTracker) handle(r routerHandle, p player, e0 event) event {
Weight: e.Vote.Cred.Weight,
PreviousProposalHash1: oldVote.R.Proposal.BlockDigest.String(),
}
- logging.Base().EventWithDetails(telemetryspec.ApplicationState, telemetryspec.EquivocatedVoteEvent, equivocationDetails)
- logging.Base().Warnf("voteTracker: observed an equivocator: %v (vote was %v)", sender, e.Vote)
+ r.t.log.EventWithDetails(telemetryspec.ApplicationState, telemetryspec.EquivocatedVoteEvent, equivocationDetails)
+
+ r.t.log.Warnf("voteTracker: observed an equivocator: %v (vote was %v)", sender, e.Vote)
// sender was not already marked as an equivocator so track
// their weight
@@ -183,7 +183,7 @@ func (tracker *voteTracker) handle(r routerHandle, p player, e0 event) event {
// In order for this to be triggered, more than 75% of the vote for the given step need to vote for more than
// a single proposal. In that state, all the proposals become "above threshold". That's a serious issue, since
// it would compromise the honest node core assumption.
- logging.Base().Panicf("too many equivocators for step %d: %d", e.Vote.R.Step, tracker.EquivocatorsCount)
+ r.t.log.Panicf("too many equivocators for step %d: %d", e.Vote.R.Step, tracker.EquivocatorsCount)
}
// decrease their weight from any block proposal they already
@@ -227,7 +227,7 @@ func (tracker *voteTracker) handle(r routerHandle, p player, e0 event) event {
}
}
- prop, overAfter := tracker.overThreshold(proto, e.Vote.R.Step)
+ prop, overAfter := tracker.overThreshold(proto, e.Vote.R.Step, r.t.log)
if overBefore || !overAfter {
return res
@@ -265,7 +265,7 @@ func (tracker *voteTracker) handle(r routerHandle, p player, e0 event) event {
PreviousProposalHash1: eqVote.Proposals[0].BlockDigest.String(),
PreviousProposalHash2: eqVote.Proposals[1].BlockDigest.String(),
}
- logging.Base().EventWithDetails(telemetryspec.ApplicationState, telemetryspec.EquivocatedVoteEvent, equivocationDetails)
+ r.t.log.EventWithDetails(telemetryspec.ApplicationState, telemetryspec.EquivocatedVoteEvent, equivocationDetails)
return filteredStepEvent{T: voteFilteredStep}
}
@@ -289,18 +289,18 @@ func (tracker *voteTracker) handle(r routerHandle, p player, e0 event) event {
return dumpVotesEvent{Votes: votes}
default:
- logging.Base().Panicf("voteTracker: bad event type: observed an event of type %v", e0.t())
+ r.t.log.Panicf("voteTracker: bad event type: observed an event of type %v", e0.t())
panic("not reached")
}
}
// overThreshold returns an arbitrary proposal over the step threshold or
// (_, false) if none exists.
-func (tracker *voteTracker) overThreshold(proto config.ConsensusParams, step step) (res proposalValue, ok bool) {
+func (tracker *voteTracker) overThreshold(proto config.ConsensusParams, step step, log serviceLogger) (res proposalValue, ok bool) {
for proposal := range tracker.Counts {
if step.reachesQuorum(proto, tracker.count(proposal)) {
if ok {
- logging.Base().Panicf("voteTracker: more than value reached a threhsold in a given step: %v; %v", res, proposal)
+ log.Panicf("voteTracker: more than value reached a threhsold in a given step: %v; %v", res, proposal)
}
res = proposal
ok = true
diff --git a/ledger/acctupdates_test.go b/ledger/acctupdates_test.go
index 5f014985c..f1434b02f 100644
--- a/ledger/acctupdates_test.go
+++ b/ledger/acctupdates_test.go
@@ -1734,7 +1734,7 @@ func TestAcctUpdatesLookupRetry(t *testing.T) {
sinkdata.Status = basics.NotParticipating
accts[0][testSinkAddr] = sinkdata
- ml := makeMockLedgerForTracker(t, false, 10, testProtocolVersion, accts)
+ ml := makeMockLedgerForTracker(t, true, 10, testProtocolVersion, accts)
defer ml.Close()
conf := config.GetDefaultLocal()
@@ -1833,19 +1833,25 @@ func TestAcctUpdatesLookupRetry(t *testing.T) {
break
}
- // release the postCommit lock, once au.lookupWithoutRewards hits au.accountsReadCond.Wait()
+ defer func() { // allow the postCommitUnlocked() handler to go through, even if test fails
+ <-stallingTracker.postCommitUnlockedEntryLock
+ stallingTracker.postCommitUnlockedReleaseLock <- struct{}{}
+ }()
+
+ // issue a LookupWithoutRewards while persistedData.round != au.cachedDBRound
+ // when synchronized=false it will fail fast
+ d, validThrough, err := au.lookupWithoutRewards(rnd, addr, false)
+ require.Equal(t, err, &MismatchingDatabaseRoundError{databaseRound: 2, memoryRound: 1})
+
+ // release the postCommit lock, once au.lookupWithoutRewards() hits au.accountsReadCond.Wait()
go func() {
time.Sleep(200 * time.Millisecond)
stallingTracker.postCommitReleaseLock <- struct{}{}
}()
- // issue a LookupWithoutRewards while persistedData.round != au.cachedDBRound
- d, validThrough, err := au.LookupWithoutRewards(rnd, addr)
+ // when synchronized=true it will wait until above goroutine releases postCommitReleaseLock
+ d, validThrough, err = au.lookupWithoutRewards(rnd, addr, true)
require.NoError(t, err)
require.Equal(t, d, data)
require.GreaterOrEqualf(t, uint64(validThrough), uint64(rnd), "validThrough: %v rnd :%v", validThrough, rnd)
-
- // allow the postCommitUnlocked() handler to go through
- <-stallingTracker.postCommitUnlockedEntryLock
- stallingTracker.postCommitUnlockedReleaseLock <- struct{}{}
}
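
The reworked test exercises the new synchronized flag on lookupWithoutRewards: while the in-memory round trails the database round, synchronized=false fails fast with a round-mismatch error, whereas synchronized=true blocks until a commit catches the cache up. A simplified, self-contained sketch of that behaviour, using stand-in types rather than the ledger's real ones:

package main

import (
	"fmt"
	"sync"
)

type mismatchErr struct{ dbRound, memRound uint64 }

func (e *mismatchErr) Error() string {
	return fmt.Sprintf("db round %d != memory round %d", e.dbRound, e.memRound)
}

type cache struct {
	mu       sync.Mutex
	cond     *sync.Cond
	dbRound  uint64
	memRound uint64
}

func (c *cache) lookup(synchronized bool) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	for c.dbRound != c.memRound {
		if !synchronized {
			// fail fast, like lookupWithoutRewards(rnd, addr, false)
			return &mismatchErr{dbRound: c.dbRound, memRound: c.memRound}
		}
		// wait for a commit to advance the in-memory round
		c.cond.Wait()
	}
	return nil
}

func main() {
	c := &cache{dbRound: 2, memRound: 1}
	c.cond = sync.NewCond(&c.mu)
	fmt.Println(c.lookup(false)) // returns the mismatch error immediately

	go func() { // simulate the post-commit handler catching the cache up
		c.mu.Lock()
		c.memRound = c.dbRound
		c.mu.Unlock()
		c.cond.Broadcast()
	}()
	fmt.Println(c.lookup(true)) // blocks briefly, then returns nil
}
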
diff --git a/network/wsNetwork.go b/network/wsNetwork.go
index 7768addf2..8bde3040d 100644
--- a/network/wsNetwork.go
+++ b/network/wsNetwork.go
@@ -125,6 +125,12 @@ var peers = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_peers", De
var incomingPeers = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_incoming_peers", Description: "Number of active incoming peers."})
var outgoingPeers = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_outgoing_peers", Description: "Number of active outgoing peers."})
+// peerDisconnectionAckDuration defines the time we would wait for the peer disconnection to complete.
+const peerDisconnectionAckDuration time.Duration = 5 * time.Second
+
+// peerShutdownDisconnectionAckDuration defines the time we would wait for the peer disconnection to complete during shutdown.
+const peerShutdownDisconnectionAckDuration time.Duration = 50 * time.Millisecond
+
// Peer opaque interface for referring to a neighbor in the network
type Peer interface{}
@@ -542,13 +548,13 @@ func (wn *WebsocketNetwork) disconnect(badnode Peer, reason disconnectReason) {
return
}
peer := badnode.(*wsPeer)
- peer.CloseAndWait()
+ peer.CloseAndWait(time.Now().Add(peerDisconnectionAckDuration))
wn.removePeer(peer, reason)
}
-func closeWaiter(wg *sync.WaitGroup, peer *wsPeer) {
+func closeWaiter(wg *sync.WaitGroup, peer *wsPeer, deadline time.Time) {
defer wg.Done()
- peer.CloseAndWait()
+ peer.CloseAndWait(deadline)
}
// DisconnectPeers shuts down all connections
@@ -557,8 +563,9 @@ func (wn *WebsocketNetwork) DisconnectPeers() {
defer wn.peersLock.Unlock()
closeGroup := sync.WaitGroup{}
closeGroup.Add(len(wn.peers))
+ deadline := time.Now().Add(peerDisconnectionAckDuration)
for _, peer := range wn.peers {
- go closeWaiter(&closeGroup, peer)
+ go closeWaiter(&closeGroup, peer, deadline)
}
wn.peers = wn.peers[:0]
closeGroup.Wait()
@@ -812,8 +819,12 @@ func (wn *WebsocketNetwork) innerStop() {
wn.peersLock.Lock()
defer wn.peersLock.Unlock()
wn.wg.Add(len(wn.peers))
+ // this method is called only during node shutdown. In this case, we want to send the
+ // shutdown message, but we don't want to wait for long, since we might never get a
+ // response.
+ deadline := time.Now().Add(peerShutdownDisconnectionAckDuration)
for _, peer := range wn.peers {
- go closeWaiter(&wn.wg, peer)
+ go closeWaiter(&wn.wg, peer, deadline)
}
wn.peers = wn.peers[:0]
}
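
DisconnectPeers and innerStop now compute a single deadline per batch (peerDisconnectionAckDuration normally, the much shorter peerShutdownDisconnectionAckDuration during shutdown) and hand it to every closeWaiter goroutine, so one slow peer cannot stretch the overall wait. A hedged sketch of that pattern with stand-in types:

package main

import (
	"fmt"
	"sync"
	"time"
)

type peer struct{ name string }

// CloseAndWait pretends to send a close frame, giving up at the deadline.
func (p *peer) CloseAndWait(deadline time.Time) {
	fmt.Printf("closing %s, give up after %v\n", p.name, time.Until(deadline).Round(time.Millisecond))
}

func closeWaiter(wg *sync.WaitGroup, p *peer, deadline time.Time) {
	defer wg.Done()
	p.CloseAndWait(deadline)
}

func disconnectAll(peers []*peer, ackDuration time.Duration) {
	var wg sync.WaitGroup
	wg.Add(len(peers))
	deadline := time.Now().Add(ackDuration) // shared by the whole batch
	for _, p := range peers {
		go closeWaiter(&wg, p, deadline)
	}
	wg.Wait()
}

func main() {
	peers := []*peer{{name: "peer-a"}, {name: "peer-b"}}
	disconnectAll(peers, 5*time.Second)       // normal disconnect
	disconnectAll(peers, 50*time.Millisecond) // shutdown: don't linger
}
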
diff --git a/network/wsPeer.go b/network/wsPeer.go
index e0d209c41..337dae07b 100644
--- a/network/wsPeer.go
+++ b/network/wsPeer.go
@@ -754,15 +754,15 @@ func (wp *wsPeer) internalClose(reason disconnectReason) {
if atomic.CompareAndSwapInt32(&wp.didSignalClose, 0, 1) {
wp.net.peerRemoteClose(wp, reason)
}
- wp.Close()
+ wp.Close(time.Now().Add(peerDisconnectionAckDuration))
}
// called either here or from above enclosing node logic
-func (wp *wsPeer) Close() {
+func (wp *wsPeer) Close(deadline time.Time) {
atomic.StoreInt32(&wp.didSignalClose, 1)
if atomic.CompareAndSwapInt32(&wp.didInnerClose, 0, 1) {
close(wp.closing)
- err := wp.conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), time.Now().Add(5*time.Second))
+ err := wp.conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), deadline)
if err != nil {
wp.net.log.Infof("failed to write CloseMessage to connection for %s", wp.conn.RemoteAddr().String())
}
@@ -774,8 +774,8 @@ func (wp *wsPeer) Close() {
}
// CloseAndWait internally calls Close() then waits for all peer activity to stop
-func (wp *wsPeer) CloseAndWait() {
- wp.Close()
+func (wp *wsPeer) CloseAndWait(deadline time.Time) {
+ wp.Close(deadline)
wp.wg.Wait()
}