diff options
author | Pavel Zbitskiy <65323360+algorandskiy@users.noreply.github.com> | 2023-12-21 13:19:27 -0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-12-21 13:19:27 -0500 |
commit | 55cbb7f6d93acbe5040e4530aaabb87aec729f13 (patch) | |
tree | f676b0716f00b13924fbc9d83f9c52a971623eb1 | |
parent | a4fcdfa0d216fe9b0a60d57acc76e92131cfbfbd (diff) |
txHandler: kick in ARL at 1/2 of a base backlog capacity (#5873)
-rw-r--r-- | data/txHandler.go | 19 | ||||
-rw-r--r-- | data/txHandler_test.go | 12 |
2 files changed, 17 insertions, 14 deletions
diff --git a/data/txHandler.go b/data/txHandler.go index 9e6e1094c..58febd1d7 100644 --- a/data/txHandler.go +++ b/data/txHandler.go @@ -131,6 +131,7 @@ type TxHandler struct { streamVerifierDropped chan *verify.UnverifiedTxnSigJob erl *util.ElasticRateLimiter appLimiter *appRateLimiter + appLimiterBacklogThreshold int } // TxHandlerOpts is TxHandler configuration options @@ -203,6 +204,8 @@ func MakeTxHandler(opts TxHandlerOpts) (*TxHandler, error) { uint64(opts.Config.TxBacklogAppTxPerSecondRate), time.Duration(opts.Config.TxBacklogServiceRateWindowSeconds)*time.Second, ) + // set appLimiter triggering threshold at 50% of the base backlog size + handler.appLimiterBacklogThreshold = int(float64(opts.Config.TxBacklogSize) * float64(opts.Config.TxBacklogRateLimitingCongestionPct) / 100) } } @@ -596,11 +599,8 @@ func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) net var err error var capguard *util.ErlCapacityGuard - var congested bool - if handler.erl != nil || handler.appLimiter != nil { - congested = float64(cap(handler.backlogQueue))*handler.backlogCongestionThreshold < float64(len(handler.backlogQueue)) - } if handler.erl != nil { + congestedERL := float64(cap(handler.backlogQueue))*handler.backlogCongestionThreshold < float64(len(handler.backlogQueue)) // consume a capacity unit // if the elastic rate limiter cannot vend a capacity, the error it returns // is sufficient to indicate that we should enable Congestion Control, because @@ -613,7 +613,7 @@ func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) net return network.OutgoingMessage{Action: network.Ignore} } // if the backlog Queue has 50% of its buffer back, turn congestion control off - if !congested { + if !congestedERL { handler.erl.DisableCongestionControl() } } @@ -663,9 +663,12 @@ func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) net } // rate limit per application in a group. Limiting any app in a group drops the entire message. - if handler.appLimiter != nil && congested && handler.appLimiter.shouldDrop(unverifiedTxGroup, rawmsg.Sender.(network.IPAddressable).RoutingAddr()) { - transactionMessagesAppLimiterDrop.Inc(nil) - return network.OutgoingMessage{Action: network.Ignore} + if handler.appLimiter != nil { + congestedARL := len(handler.backlogQueue) > handler.appLimiterBacklogThreshold + if congestedARL && handler.appLimiter.shouldDrop(unverifiedTxGroup, rawmsg.Sender.(network.IPAddressable).RoutingAddr()) { + transactionMessagesAppLimiterDrop.Inc(nil) + return network.OutgoingMessage{Action: network.Ignore} + } } select { diff --git a/data/txHandler_test.go b/data/txHandler_test.go index 894fef9d4..74282b5c2 100644 --- a/data/txHandler_test.go +++ b/data/txHandler_test.go @@ -2522,9 +2522,7 @@ func TestTxHandlerAppRateLimiterERLEnabled(t *testing.T) { cfg.TxBacklogAppTxRateLimiterMaxSize = 100 cfg.TxBacklogServiceRateWindowSeconds = 1 cfg.TxBacklogAppTxPerSecondRate = 3 - cfg.TxBacklogReservedCapacityPerPeer = 2 - cfg.TxBacklogSize = 1 - cfg.IncomingConnectionsLimit = 1 + cfg.TxBacklogSize = 3 ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, bookkeeping.GenesisBalances{}, genesisID, genesisHash, nil, cfg) require.NoError(t, err) defer ledger.Close() @@ -2585,7 +2583,9 @@ func TestTxHandlerAppRateLimiterERLEnabled(t *testing.T) { sender := mockSender{} // submit and ensure it is accepted - congested := float64(cap(handler.backlogQueue))*0.5 < float64(len(handler.backlogQueue)) + pct := float64(cfg.TxBacklogRateLimitingCongestionPct) / 100 + limit := int(float64(cfg.TxBacklogSize) * pct) + congested := len(handler.backlogQueue) > limit require.False(t, congested) action := handler.processIncomingTxn(network.IncomingMessage{Data: blob, Sender: sender}) @@ -2593,7 +2593,7 @@ func TestTxHandlerAppRateLimiterERLEnabled(t *testing.T) { require.Equal(t, 1, len(handler.backlogQueue)) // repeat the same txn, we are still not congested - congested = float64(cap(handler.backlogQueue))*0.5 < float64(len(handler.backlogQueue)) + congested = len(handler.backlogQueue) > limit require.False(t, congested) signedTx = tx.Sign(keypair()) @@ -2603,7 +2603,7 @@ func TestTxHandlerAppRateLimiterERLEnabled(t *testing.T) { require.Equal(t, 2, len(handler.backlogQueue)) require.Equal(t, 0, handler.appLimiter.len()) // no rate limiting yet - congested = float64(cap(handler.backlogQueue))*0.5 < float64(len(handler.backlogQueue)) + congested = len(handler.backlogQueue) > limit require.True(t, congested) // submit it again and the app rate limiter should kick in |