diff options
author | John Jannotti <jj@cs.brown.edu> | 2022-01-17 22:07:58 -0500 |
---|---|---|
committer | John Jannotti <jj@cs.brown.edu> | 2022-01-17 22:07:58 -0500 |
commit | 63750bcb73b1af7ed5e5f8f1a5cb127a1e457e88 (patch) | |
tree | 3426f347e1702eb25cad77505436bc1b8b3ff934 | |
parent | a106e8366ac37b49cd9ad48f7423ab0ecc867422 (diff) | |
parent | 4006ce219ebcebbb65c621b9f04362b45b256065 (diff) |
Merge branch 'master' into feature/contract-to-contract
-rw-r--r-- | agreement/demux.go | 2 | ||||
-rw-r--r-- | agreement/listener.go | 12 | ||||
-rw-r--r-- | agreement/persistence.go | 24 | ||||
-rw-r--r-- | agreement/persistence_test.go | 7 | ||||
-rw-r--r-- | agreement/proposalManager.go | 4 | ||||
-rw-r--r-- | agreement/proposalStore.go | 6 | ||||
-rw-r--r-- | agreement/proposalTracker.go | 3 | ||||
-rw-r--r-- | agreement/pseudonode.go | 1 | ||||
-rw-r--r-- | agreement/service.go | 4 | ||||
-rw-r--r-- | agreement/trace.go | 5 | ||||
-rw-r--r-- | agreement/voteAggregator.go | 7 | ||||
-rw-r--r-- | agreement/voteAuxiliary.go | 8 | ||||
-rw-r--r-- | agreement/voteTracker.go | 22 | ||||
-rw-r--r-- | ledger/acctupdates_test.go | 22 | ||||
-rw-r--r-- | network/wsNetwork.go | 21 | ||||
-rw-r--r-- | network/wsPeer.go | 10 |
16 files changed, 82 insertions, 76 deletions
diff --git a/agreement/demux.go b/agreement/demux.go index fe48f604b..65a5d06a0 100644 --- a/agreement/demux.go +++ b/agreement/demux.go @@ -122,7 +122,7 @@ func (d *demux) tokenizeMessages(ctx context.Context, net Network, tag protocol. o, err := tokenize(raw.Data) if err != nil { - logging.Base().Warnf("disconnecting from peer: error decoding message tagged %v: %v", tag, err) + d.log.Warnf("disconnecting from peer: error decoding message tagged %v: %v", tag, err) net.Disconnect(raw.MessageHandle) d.UpdateEventsQueue(eventQueueTokenizing[tag], 0) continue diff --git a/agreement/listener.go b/agreement/listener.go index 224e0c53e..b0dafaff7 100644 --- a/agreement/listener.go +++ b/agreement/listener.go @@ -16,10 +16,6 @@ package agreement -import ( - "github.com/algorand/go-algorand/logging" -) - // A listener is a state machine which can handle events, returning new events. type listener interface { // T returns the stateMachineTag describing the listener. @@ -60,17 +56,17 @@ func (l checkedListener) handle(r routerHandle, p player, in event) event { errs := l.pre(p, in) if len(errs) != 0 { for _, err := range errs { - logging.Base().Errorf("%v: precondition violated: %v", l.T(), err) + r.t.log.Errorf("%v: precondition violated: %v", l.T(), err) } - logging.Base().Panicf("%v: precondition violated: %v", l.T(), errs[0]) + r.t.log.Panicf("%v: precondition violated: %v", l.T(), errs[0]) } out := l.listener.handle(r, p, in) errs = l.post(p, in, out) if len(errs) != 0 { for _, err := range errs { - logging.Base().Errorf("%v: postcondition violated: %v", l.T(), err) + r.t.log.Errorf("%v: postcondition violated: %v", l.T(), err) } - logging.Base().Panicf("%v: postcondition violated: %v", l.T(), errs[0]) + r.t.log.Panicf("%v: postcondition violated: %v", l.T(), errs[0]) } return out } diff --git a/agreement/persistence.go b/agreement/persistence.go index b11fd4f0a..d92abbd1e 100644 --- a/agreement/persistence.go +++ b/agreement/persistence.go @@ -82,7 +82,7 @@ func persist(log serviceLogger, crash db.Accessor, Round basics.Round, Period pe return } - logging.Base().Errorf("persisting failure: %v", err) + log.Errorf("persisting failure: %v", err) return } @@ -90,7 +90,7 @@ func persist(log serviceLogger, crash db.Accessor, Round basics.Round, Period pe // // In case it's unable to clear the Service table, an error would get logged. func reset(log logging.Logger, crash db.Accessor) { - logging.Base().Infof("reset (agreement): resetting crash state") + log.Infof("reset (agreement): resetting crash state") err := crash.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) { // we could not retrieve our state, so wipe it @@ -99,7 +99,7 @@ func reset(log logging.Logger, crash db.Accessor) { }) if err != nil { - logging.Base().Warnf("reset (agreement): failed to clear Service table - %v", err) + log.Warnf("reset (agreement): failed to clear Service table - %v", err) } } @@ -124,7 +124,7 @@ func restore(log logging.Logger, crash db.Accessor) (raw []byte, err error) { if err == nil { // the above call was completed sucecssfully, which means that we've just created the table ( which wasn't there ! ). // in that case, the table is guaranteed to be empty, and therefore we can return right here. - logging.Base().Infof("restore (agreement): crash state table initialized") + log.Infof("restore (agreement): crash state table initialized") err = errNoCrashStateAvailable return } @@ -135,7 +135,7 @@ func restore(log logging.Logger, crash db.Accessor) (raw []byte, err error) { if !reset { return } - logging.Base().Infof("restore (agreement): resetting crash state") + log.Infof("restore (agreement): resetting crash state") // we could not retrieve our state, so wipe it _, err = tx.Exec("delete from Service") @@ -149,12 +149,12 @@ func restore(log logging.Logger, crash db.Accessor) (raw []byte, err error) { row := tx.QueryRow("select count(*) from Service") err := row.Scan(&nrows) if err != nil { - logging.Base().Errorf("restore (agreement): could not query raw state: %v", err) + log.Errorf("restore (agreement): could not query raw state: %v", err) reset = true return err } if nrows != 1 { - logging.Base().Infof("restore (agreement): crash state not found (n = %d)", nrows) + log.Infof("restore (agreement): crash state not found (n = %d)", nrows) reset = true noCrashState = true // this is a normal case (we have leftover crash state from an old round) return errNoCrashStateAvailable @@ -163,7 +163,7 @@ func restore(log logging.Logger, crash db.Accessor) (raw []byte, err error) { row = tx.QueryRow("select data from Service") err = row.Scan(&raw) if err != nil { - logging.Base().Errorf("restore (agreement): could not read crash state raw data: %v", err) + log.Errorf("restore (agreement): could not read crash state raw data: %v", err) reset = true return err } @@ -176,7 +176,7 @@ func restore(log logging.Logger, crash db.Accessor) (raw []byte, err error) { // decode process the incoming raw bytes array and attempt to reconstruct the agreement state objects. // // In all decoding errors, it returns the error code in err -func decode(raw []byte, t0 timers.Clock) (t timers.Clock, rr rootRouter, p player, a []action, err error) { +func decode(raw []byte, t0 timers.Clock, log serviceLogger) (t timers.Clock, rr rootRouter, p player, a []action, err error) { var t2 timers.Clock var rr2 rootRouter var p2 player @@ -185,7 +185,7 @@ func decode(raw []byte, t0 timers.Clock) (t timers.Clock, rr rootRouter, p playe err = protocol.DecodeReflect(raw, &s) if err != nil { - logging.Base().Errorf("decode (agreement): error decoding retrieved state (len = %v): %v", len(raw), err) + log.Errorf("decode (agreement): error decoding retrieved state (len = %v): %v", len(raw), err) return } @@ -307,9 +307,9 @@ func (p *asyncPersistenceLoop) loop(ctx context.Context) { // sanity check; we check it after the fact, since it's not expected to ever happen. // performance-wise, it takes approximitly 300000ns to execute, and we don't want it to // block the persist operation. - _, _, _, _, derr := decode(s.raw, s.clock) + _, _, _, _, derr := decode(s.raw, s.clock, p.log) if derr != nil { - logging.Base().Errorf("could not decode own encoded disk state: %v", derr) + p.log.Errorf("could not decode own encoded disk state: %v", derr) } } } diff --git a/agreement/persistence_test.go b/agreement/persistence_test.go index 79aabd675..94221f7be 100644 --- a/agreement/persistence_test.go +++ b/agreement/persistence_test.go @@ -43,7 +43,8 @@ func TestAgreementSerialization(t *testing.T) { encodedBytes := encode(clock, router, status, a) t0 := timers.MakeMonotonicClock(time.Date(2000, 0, 0, 0, 0, 0, 0, time.UTC)) - clock2, router2, status2, a2, err := decode(encodedBytes, t0) + log := makeServiceLogger(logging.Base()) + clock2, router2, status2, a2, err := decode(encodedBytes, t0, log) require.NoError(t, err) require.Equalf(t, clock, clock2, "Clock wasn't serialized/deserialized correctly") require.Equalf(t, router, router2, "Router wasn't serialized/deserialized correctly") @@ -77,10 +78,10 @@ func BenchmarkAgreementDeserialization(b *testing.B) { encodedBytes := encode(clock, router, status, a) t0 := timers.MakeMonotonicClock(time.Date(2000, 0, 0, 0, 0, 0, 0, time.UTC)) - + log := makeServiceLogger(logging.Base()) b.ResetTimer() for n := 0; n < b.N; n++ { - decode(encodedBytes, t0) + decode(encodedBytes, t0, log) } } diff --git a/agreement/proposalManager.go b/agreement/proposalManager.go index 9a6e14166..8cf03b32f 100644 --- a/agreement/proposalManager.go +++ b/agreement/proposalManager.go @@ -18,8 +18,6 @@ package agreement import ( "fmt" - - "github.com/algorand/go-algorand/logging" ) // A proposalManager is a proposalMachine which applies relay rules to incoming @@ -71,7 +69,7 @@ func (m *proposalManager) handle(r routerHandle, p player, e event) event { r = m.handleNewPeriod(r, p, e.(thresholdEvent)) return emptyEvent{} } - logging.Base().Panicf("proposalManager: bad event type: observed an event of type %v", e.t()) + r.t.log.Panicf("proposalManager: bad event type: observed an event of type %v", e.t()) panic("not reached") } diff --git a/agreement/proposalStore.go b/agreement/proposalStore.go index 7cc9a7643..973f909d0 100644 --- a/agreement/proposalStore.go +++ b/agreement/proposalStore.go @@ -18,8 +18,6 @@ package agreement import ( "fmt" - - "github.com/algorand/go-algorand/logging" ) // An blockAssembler contains the proposal data associated with some @@ -289,7 +287,7 @@ func (store *proposalStore) handle(r routerHandle, p player, e event) event { case newRound: if len(store.Assemblers) > 1 { // TODO this check is really an implementation invariant; move it into a whitebox test - logging.Base().Panic("too many assemblers") + r.t.log.Panic("too many assemblers") } for pv, ea := range store.Assemblers { if ea.Filled { @@ -347,7 +345,7 @@ func (store *proposalStore) handle(r routerHandle, p player, e event) event { se.Payload = ea.Payload return se } - logging.Base().Panicf("proposalStore: bad event type: observed an event of type %v", e.t()) + r.t.log.Panicf("proposalStore: bad event type: observed an event of type %v", e.t()) panic("not reached") } diff --git a/agreement/proposalTracker.go b/agreement/proposalTracker.go index 5b17bb10b..c76c5c9fd 100644 --- a/agreement/proposalTracker.go +++ b/agreement/proposalTracker.go @@ -20,7 +20,6 @@ import ( "fmt" "github.com/algorand/go-algorand/data/basics" - "github.com/algorand/go-algorand/logging" ) // A proposalSeeker finds the vote with the lowest credential until freeze() is @@ -180,7 +179,7 @@ func (t *proposalTracker) handle(r routerHandle, p player, e event) event { return se } - logging.Base().Panicf("proposalTracker: bad event type: observed an event of type %v", e.t()) + r.t.log.Panicf("proposalTracker: bad event type: observed an event of type %v", e.t()) panic("not reached") } diff --git a/agreement/pseudonode.go b/agreement/pseudonode.go index 12765970c..f645ca362 100644 --- a/agreement/pseudonode.go +++ b/agreement/pseudonode.go @@ -481,7 +481,6 @@ func (t pseudonodeProposalsTask) execute(verifier *AsyncVoteVerifier, quit chan payloads, votes := t.node.makeProposals(t.round, t.period, t.participation) fields := logging.Fields{ - "Context": "Agreement", "Type": logspec.ProposalAssembled.String(), "ObjectRound": t.round, "ObjectPeriod": t.period, diff --git a/agreement/service.go b/agreement/service.go index 5e1fac3da..346234950 100644 --- a/agreement/service.go +++ b/agreement/service.go @@ -93,7 +93,7 @@ func MakeService(p Parameters) *Service { s.parameters = parameters(p) - s.log = serviceLogger{Logger: p.Logger} + s.log = makeServiceLogger(p.Logger) // GOAL2-541: tracer is not concurrency safe. It should only ever be // accessed by main state machine loop. @@ -191,7 +191,7 @@ func (s *Service) mainLoop(input <-chan externalEvent, output chan<- []action, r var err error raw, err := restore(s.log, s.Accessor) if err == nil { - clock, router, status, a, err = decode(raw, s.Clock) + clock, router, status, a, err = decode(raw, s.Clock, s.log) if err != nil { reset(s.log, s.Accessor) } else { diff --git a/agreement/trace.go b/agreement/trace.go index 70dae3798..a3f8cea66 100644 --- a/agreement/trace.go +++ b/agreement/trace.go @@ -497,9 +497,12 @@ type serviceLogger struct { logging.Logger } +func makeServiceLogger(log logging.Logger) serviceLogger { + return serviceLogger{log.With("Context", "Agreement")} +} + func (log serviceLogger) with(e logspec.AgreementEvent) serviceLogger { fields := logging.Fields{ - "Context": "Agreement", "Type": e.Type.String(), "Round": e.Round, "Period": e.Period, diff --git a/agreement/voteAggregator.go b/agreement/voteAggregator.go index 95b378c0d..057f9d14f 100644 --- a/agreement/voteAggregator.go +++ b/agreement/voteAggregator.go @@ -19,7 +19,6 @@ package agreement import ( "fmt" - "github.com/algorand/go-algorand/logging" "github.com/algorand/go-algorand/protocol" ) @@ -118,7 +117,7 @@ func (agg *voteAggregator) handle(r routerHandle, pr player, em event) (res even } else if tE.(thresholdEvent).Round == e.FreshnessData.PlayerRound+1 { return emptyEvent{} } - logging.Base().Panicf("bad round (%v, %v)", tE.(thresholdEvent).Round, e.FreshnessData.PlayerRound) // TODO this should be a postcondition check; move it + r.t.log.Panicf("bad round (%v, %v)", tE.(thresholdEvent).Round, e.FreshnessData.PlayerRound) // TODO this should be a postcondition check; move it case bundlePresent: ub := e.Input.UnauthenticatedBundle @@ -180,7 +179,7 @@ func (agg *voteAggregator) handle(r routerHandle, pr player, em event) (res even smErr := makeSerErrf("bundle for (%v, %v, %v: %v) failed to cause a significant state change", b.U.Round, b.U.Period, b.U.Step, b.U.Proposal) return filteredEvent{T: bundleFiltered, Err: smErr} } - logging.Base().Panicf("voteAggregator: bad event type: observed an event of type %v", e.t()) + r.t.log.Panicf("voteAggregator: bad event type: observed an event of type %v", e.t()) panic("not reached") } @@ -200,7 +199,7 @@ func (agg *voteAggregator) filterVote(proto protocol.ConsensusVersion, p player, case none: return nil } - logging.Base().Panicf("voteAggregator: bad event type: while filtering, observed an event of type %v", filterRes.t()) + r.t.log.Panicf("voteAggregator: bad event type: while filtering, observed an event of type %v", filterRes.t()) panic("not reached") } diff --git a/agreement/voteAuxiliary.go b/agreement/voteAuxiliary.go index f419feebc..0c6e85c47 100644 --- a/agreement/voteAuxiliary.go +++ b/agreement/voteAuxiliary.go @@ -16,10 +16,6 @@ package agreement -import ( - "github.com/algorand/go-algorand/logging" -) - // A voteTrackerPeriod is a voteMachinePeriod which indicates whether a // next-threshold of votes was observed for a some value in a period. type voteTrackerPeriod struct { @@ -82,7 +78,7 @@ func (t *voteTrackerPeriod) handle(r routerHandle, p player, e event) event { case nextThresholdStatusRequest: return t.Cached default: - logging.Base().Panicf("voteTrackerPeriod: bad event type: observed an event of type %v", e.t()) + r.t.log.Panicf("voteTrackerPeriod: bad event type: observed an event of type %v", e.t()) panic("not reached") } } @@ -152,7 +148,7 @@ func (t *voteTrackerRound) handle(r routerHandle, p player, e event) event { case freshestBundleRequest: return freshestBundleEvent{Ok: t.Ok, Event: t.Freshest} default: - logging.Base().Panicf("voteTrackerRound: bad event type: observed an event of type %v", e.t()) + r.t.log.Panicf("voteTrackerRound: bad event type: observed an event of type %v", e.t()) panic("not reached") } } diff --git a/agreement/voteTracker.go b/agreement/voteTracker.go index df95f4223..d0f717abd 100644 --- a/agreement/voteTracker.go +++ b/agreement/voteTracker.go @@ -23,7 +23,6 @@ import ( "github.com/algorand/go-algorand/config" "github.com/algorand/go-algorand/crypto" "github.com/algorand/go-algorand/data/basics" - "github.com/algorand/go-algorand/logging" "github.com/algorand/go-algorand/logging/telemetryspec" ) @@ -129,12 +128,12 @@ func (tracker *voteTracker) handle(r routerHandle, p player, e0 event) event { PreviousProposalHash1: eqVote.Proposals[0].BlockDigest.String(), PreviousProposalHash2: eqVote.Proposals[1].BlockDigest.String(), } - logging.Base().EventWithDetails(telemetryspec.ApplicationState, telemetryspec.EquivocatedVoteEvent, equivocationDetails) + r.t.log.EventWithDetails(telemetryspec.ApplicationState, telemetryspec.EquivocatedVoteEvent, equivocationDetails) return thresholdEvent{} } - _, overBefore := tracker.overThreshold(proto, e.Vote.R.Step) + _, overBefore := tracker.overThreshold(proto, e.Vote.R.Step, r.t.log) oldVote, voted := tracker.Voters[sender] @@ -170,9 +169,10 @@ func (tracker *voteTracker) handle(r routerHandle, p player, e0 event) event { Weight: e.Vote.Cred.Weight, PreviousProposalHash1: oldVote.R.Proposal.BlockDigest.String(), } - logging.Base().EventWithDetails(telemetryspec.ApplicationState, telemetryspec.EquivocatedVoteEvent, equivocationDetails) - logging.Base().Warnf("voteTracker: observed an equivocator: %v (vote was %v)", sender, e.Vote) + r.t.log.EventWithDetails(telemetryspec.ApplicationState, telemetryspec.EquivocatedVoteEvent, equivocationDetails) + + r.t.log.Warnf("voteTracker: observed an equivocator: %v (vote was %v)", sender, e.Vote) // sender was not already marked as an equivocator so track // their weight @@ -183,7 +183,7 @@ func (tracker *voteTracker) handle(r routerHandle, p player, e0 event) event { // In order for this to be triggered, more than 75% of the vote for the given step need to vote for more than // a single proposal. In that state, all the proposals become "above threshold". That's a serious issue, since // it would compromise the honest node core assumption. - logging.Base().Panicf("too many equivocators for step %d: %d", e.Vote.R.Step, tracker.EquivocatorsCount) + r.t.log.Panicf("too many equivocators for step %d: %d", e.Vote.R.Step, tracker.EquivocatorsCount) } // decrease their weight from any block proposal they already @@ -227,7 +227,7 @@ func (tracker *voteTracker) handle(r routerHandle, p player, e0 event) event { } } - prop, overAfter := tracker.overThreshold(proto, e.Vote.R.Step) + prop, overAfter := tracker.overThreshold(proto, e.Vote.R.Step, r.t.log) if overBefore || !overAfter { return res @@ -265,7 +265,7 @@ func (tracker *voteTracker) handle(r routerHandle, p player, e0 event) event { PreviousProposalHash1: eqVote.Proposals[0].BlockDigest.String(), PreviousProposalHash2: eqVote.Proposals[1].BlockDigest.String(), } - logging.Base().EventWithDetails(telemetryspec.ApplicationState, telemetryspec.EquivocatedVoteEvent, equivocationDetails) + r.t.log.EventWithDetails(telemetryspec.ApplicationState, telemetryspec.EquivocatedVoteEvent, equivocationDetails) return filteredStepEvent{T: voteFilteredStep} } @@ -289,18 +289,18 @@ func (tracker *voteTracker) handle(r routerHandle, p player, e0 event) event { return dumpVotesEvent{Votes: votes} default: - logging.Base().Panicf("voteTracker: bad event type: observed an event of type %v", e0.t()) + r.t.log.Panicf("voteTracker: bad event type: observed an event of type %v", e0.t()) panic("not reached") } } // overThreshold returns an arbitrary proposal over the step threshold or // (_, false) if none exists. -func (tracker *voteTracker) overThreshold(proto config.ConsensusParams, step step) (res proposalValue, ok bool) { +func (tracker *voteTracker) overThreshold(proto config.ConsensusParams, step step, log serviceLogger) (res proposalValue, ok bool) { for proposal := range tracker.Counts { if step.reachesQuorum(proto, tracker.count(proposal)) { if ok { - logging.Base().Panicf("voteTracker: more than value reached a threhsold in a given step: %v; %v", res, proposal) + log.Panicf("voteTracker: more than value reached a threhsold in a given step: %v; %v", res, proposal) } res = proposal ok = true diff --git a/ledger/acctupdates_test.go b/ledger/acctupdates_test.go index 5f014985c..f1434b02f 100644 --- a/ledger/acctupdates_test.go +++ b/ledger/acctupdates_test.go @@ -1734,7 +1734,7 @@ func TestAcctUpdatesLookupRetry(t *testing.T) { sinkdata.Status = basics.NotParticipating accts[0][testSinkAddr] = sinkdata - ml := makeMockLedgerForTracker(t, false, 10, testProtocolVersion, accts) + ml := makeMockLedgerForTracker(t, true, 10, testProtocolVersion, accts) defer ml.Close() conf := config.GetDefaultLocal() @@ -1833,19 +1833,25 @@ func TestAcctUpdatesLookupRetry(t *testing.T) { break } - // release the postCommit lock, once au.lookupWithoutRewards hits au.accountsReadCond.Wait() + defer func() { // allow the postCommitUnlocked() handler to go through, even if test fails + <-stallingTracker.postCommitUnlockedEntryLock + stallingTracker.postCommitUnlockedReleaseLock <- struct{}{} + }() + + // issue a LookupWithoutRewards while persistedData.round != au.cachedDBRound + // when synchronized=false it will fail fast + d, validThrough, err := au.lookupWithoutRewards(rnd, addr, false) + require.Equal(t, err, &MismatchingDatabaseRoundError{databaseRound: 2, memoryRound: 1}) + + // release the postCommit lock, once au.lookupWithoutRewards() hits au.accountsReadCond.Wait() go func() { time.Sleep(200 * time.Millisecond) stallingTracker.postCommitReleaseLock <- struct{}{} }() - // issue a LookupWithoutRewards while persistedData.round != au.cachedDBRound - d, validThrough, err := au.LookupWithoutRewards(rnd, addr) + // when synchronized=true it will wait until above goroutine releases postCommitReleaseLock + d, validThrough, err = au.lookupWithoutRewards(rnd, addr, true) require.NoError(t, err) require.Equal(t, d, data) require.GreaterOrEqualf(t, uint64(validThrough), uint64(rnd), "validThrough: %v rnd :%v", validThrough, rnd) - - // allow the postCommitUnlocked() handler to go through - <-stallingTracker.postCommitUnlockedEntryLock - stallingTracker.postCommitUnlockedReleaseLock <- struct{}{} } diff --git a/network/wsNetwork.go b/network/wsNetwork.go index 7768addf2..8bde3040d 100644 --- a/network/wsNetwork.go +++ b/network/wsNetwork.go @@ -125,6 +125,12 @@ var peers = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_peers", De var incomingPeers = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_incoming_peers", Description: "Number of active incoming peers."}) var outgoingPeers = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_outgoing_peers", Description: "Number of active outgoing peers."}) +// peerDisconnectionAckDuration defines the time we would wait for the peer disconnection to compelete. +const peerDisconnectionAckDuration time.Duration = 5 * time.Second + +// peerShutdownDisconnectionAckDuration defines the time we would wait for the peer disconnection to compelete during shutdown. +const peerShutdownDisconnectionAckDuration time.Duration = 50 * time.Millisecond + // Peer opaque interface for referring to a neighbor in the network type Peer interface{} @@ -542,13 +548,13 @@ func (wn *WebsocketNetwork) disconnect(badnode Peer, reason disconnectReason) { return } peer := badnode.(*wsPeer) - peer.CloseAndWait() + peer.CloseAndWait(time.Now().Add(peerDisconnectionAckDuration)) wn.removePeer(peer, reason) } -func closeWaiter(wg *sync.WaitGroup, peer *wsPeer) { +func closeWaiter(wg *sync.WaitGroup, peer *wsPeer, deadline time.Time) { defer wg.Done() - peer.CloseAndWait() + peer.CloseAndWait(deadline) } // DisconnectPeers shuts down all connections @@ -557,8 +563,9 @@ func (wn *WebsocketNetwork) DisconnectPeers() { defer wn.peersLock.Unlock() closeGroup := sync.WaitGroup{} closeGroup.Add(len(wn.peers)) + deadline := time.Now().Add(peerDisconnectionAckDuration) for _, peer := range wn.peers { - go closeWaiter(&closeGroup, peer) + go closeWaiter(&closeGroup, peer, deadline) } wn.peers = wn.peers[:0] closeGroup.Wait() @@ -812,8 +819,12 @@ func (wn *WebsocketNetwork) innerStop() { wn.peersLock.Lock() defer wn.peersLock.Unlock() wn.wg.Add(len(wn.peers)) + // this method is called only during node shutdown. In this case, we want to send the + // shutdown message, but we don't want to wait for a long time - since we might not be lucky + // to get a response. + deadline := time.Now().Add(peerShutdownDisconnectionAckDuration) for _, peer := range wn.peers { - go closeWaiter(&wn.wg, peer) + go closeWaiter(&wn.wg, peer, deadline) } wn.peers = wn.peers[:0] } diff --git a/network/wsPeer.go b/network/wsPeer.go index e0d209c41..337dae07b 100644 --- a/network/wsPeer.go +++ b/network/wsPeer.go @@ -754,15 +754,15 @@ func (wp *wsPeer) internalClose(reason disconnectReason) { if atomic.CompareAndSwapInt32(&wp.didSignalClose, 0, 1) { wp.net.peerRemoteClose(wp, reason) } - wp.Close() + wp.Close(time.Now().Add(peerDisconnectionAckDuration)) } // called either here or from above enclosing node logic -func (wp *wsPeer) Close() { +func (wp *wsPeer) Close(deadline time.Time) { atomic.StoreInt32(&wp.didSignalClose, 1) if atomic.CompareAndSwapInt32(&wp.didInnerClose, 0, 1) { close(wp.closing) - err := wp.conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), time.Now().Add(5*time.Second)) + err := wp.conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), deadline) if err != nil { wp.net.log.Infof("failed to write CloseMessage to connection for %s", wp.conn.RemoteAddr().String()) } @@ -774,8 +774,8 @@ func (wp *wsPeer) Close() { } // CloseAndWait internally calls Close() then waits for all peer activity to stop -func (wp *wsPeer) CloseAndWait() { - wp.Close() +func (wp *wsPeer) CloseAndWait(deadline time.Time) { + wp.Close(deadline) wp.wg.Wait() } |