summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPavel Zbitskiy <65323360+algorandskiy@users.noreply.github.com>2023-12-21 13:19:27 -0500
committerGitHub <noreply@github.com>2023-12-21 13:19:27 -0500
commit55cbb7f6d93acbe5040e4530aaabb87aec729f13 (patch)
treef676b0716f00b13924fbc9d83f9c52a971623eb1
parenta4fcdfa0d216fe9b0a60d57acc76e92131cfbfbd (diff)
txHandler: kick in ARL at 1/2 of a base backlog capacity (#5873)
-rw-r--r--data/txHandler.go19
-rw-r--r--data/txHandler_test.go12
2 files changed, 17 insertions, 14 deletions
diff --git a/data/txHandler.go b/data/txHandler.go
index 9e6e1094c..58febd1d7 100644
--- a/data/txHandler.go
+++ b/data/txHandler.go
@@ -131,6 +131,7 @@ type TxHandler struct {
streamVerifierDropped chan *verify.UnverifiedTxnSigJob
erl *util.ElasticRateLimiter
appLimiter *appRateLimiter
+ appLimiterBacklogThreshold int
}
// TxHandlerOpts is TxHandler configuration options
@@ -203,6 +204,8 @@ func MakeTxHandler(opts TxHandlerOpts) (*TxHandler, error) {
uint64(opts.Config.TxBacklogAppTxPerSecondRate),
time.Duration(opts.Config.TxBacklogServiceRateWindowSeconds)*time.Second,
)
+ // set appLimiter triggering threshold at 50% of the base backlog size
+ handler.appLimiterBacklogThreshold = int(float64(opts.Config.TxBacklogSize) * float64(opts.Config.TxBacklogRateLimitingCongestionPct) / 100)
}
}
@@ -596,11 +599,8 @@ func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) net
var err error
var capguard *util.ErlCapacityGuard
- var congested bool
- if handler.erl != nil || handler.appLimiter != nil {
- congested = float64(cap(handler.backlogQueue))*handler.backlogCongestionThreshold < float64(len(handler.backlogQueue))
- }
if handler.erl != nil {
+ congestedERL := float64(cap(handler.backlogQueue))*handler.backlogCongestionThreshold < float64(len(handler.backlogQueue))
// consume a capacity unit
// if the elastic rate limiter cannot vend a capacity, the error it returns
// is sufficient to indicate that we should enable Congestion Control, because
@@ -613,7 +613,7 @@ func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) net
return network.OutgoingMessage{Action: network.Ignore}
}
// if the backlog Queue has 50% of its buffer back, turn congestion control off
- if !congested {
+ if !congestedERL {
handler.erl.DisableCongestionControl()
}
}
@@ -663,9 +663,12 @@ func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) net
}
// rate limit per application in a group. Limiting any app in a group drops the entire message.
- if handler.appLimiter != nil && congested && handler.appLimiter.shouldDrop(unverifiedTxGroup, rawmsg.Sender.(network.IPAddressable).RoutingAddr()) {
- transactionMessagesAppLimiterDrop.Inc(nil)
- return network.OutgoingMessage{Action: network.Ignore}
+ if handler.appLimiter != nil {
+ congestedARL := len(handler.backlogQueue) > handler.appLimiterBacklogThreshold
+ if congestedARL && handler.appLimiter.shouldDrop(unverifiedTxGroup, rawmsg.Sender.(network.IPAddressable).RoutingAddr()) {
+ transactionMessagesAppLimiterDrop.Inc(nil)
+ return network.OutgoingMessage{Action: network.Ignore}
+ }
}
select {
diff --git a/data/txHandler_test.go b/data/txHandler_test.go
index 894fef9d4..74282b5c2 100644
--- a/data/txHandler_test.go
+++ b/data/txHandler_test.go
@@ -2522,9 +2522,7 @@ func TestTxHandlerAppRateLimiterERLEnabled(t *testing.T) {
cfg.TxBacklogAppTxRateLimiterMaxSize = 100
cfg.TxBacklogServiceRateWindowSeconds = 1
cfg.TxBacklogAppTxPerSecondRate = 3
- cfg.TxBacklogReservedCapacityPerPeer = 2
- cfg.TxBacklogSize = 1
- cfg.IncomingConnectionsLimit = 1
+ cfg.TxBacklogSize = 3
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, bookkeeping.GenesisBalances{}, genesisID, genesisHash, nil, cfg)
require.NoError(t, err)
defer ledger.Close()
@@ -2585,7 +2583,9 @@ func TestTxHandlerAppRateLimiterERLEnabled(t *testing.T) {
sender := mockSender{}
// submit and ensure it is accepted
- congested := float64(cap(handler.backlogQueue))*0.5 < float64(len(handler.backlogQueue))
+ pct := float64(cfg.TxBacklogRateLimitingCongestionPct) / 100
+ limit := int(float64(cfg.TxBacklogSize) * pct)
+ congested := len(handler.backlogQueue) > limit
require.False(t, congested)
action := handler.processIncomingTxn(network.IncomingMessage{Data: blob, Sender: sender})
@@ -2593,7 +2593,7 @@ func TestTxHandlerAppRateLimiterERLEnabled(t *testing.T) {
require.Equal(t, 1, len(handler.backlogQueue))
// repeat the same txn, we are still not congested
- congested = float64(cap(handler.backlogQueue))*0.5 < float64(len(handler.backlogQueue))
+ congested = len(handler.backlogQueue) > limit
require.False(t, congested)
signedTx = tx.Sign(keypair())
@@ -2603,7 +2603,7 @@ func TestTxHandlerAppRateLimiterERLEnabled(t *testing.T) {
require.Equal(t, 2, len(handler.backlogQueue))
require.Equal(t, 0, handler.appLimiter.len()) // no rate limiting yet
- congested = float64(cap(handler.backlogQueue))*0.5 < float64(len(handler.backlogQueue))
+ congested = len(handler.backlogQueue) > limit
require.True(t, congested)
// submit it again and the app rate limiter should kick in