diff options
Diffstat (limited to 'data/txHandler.go')
-rw-r--r-- | data/txHandler.go | 19 |
1 files changed, 12 insertions, 7 deletions
diff --git a/data/txHandler.go b/data/txHandler.go index 4689a497b..3d20e95ac 100644 --- a/data/txHandler.go +++ b/data/txHandler.go @@ -1,4 +1,4 @@ -// Copyright (C) 2019-2023 Algorand, Inc. +// Copyright (C) 2019-2024 Algorand, Inc. // This file is part of go-algorand // // go-algorand is free software: you can redistribute it and/or modify @@ -131,6 +131,7 @@ type TxHandler struct { streamVerifierDropped chan *verify.UnverifiedTxnSigJob erl *util.ElasticRateLimiter appLimiter *appRateLimiter + appLimiterBacklogThreshold int } // TxHandlerOpts is TxHandler configuration options @@ -203,6 +204,8 @@ func MakeTxHandler(opts TxHandlerOpts) (*TxHandler, error) { uint64(opts.Config.TxBacklogAppTxPerSecondRate), time.Duration(opts.Config.TxBacklogServiceRateWindowSeconds)*time.Second, ) + // set appLimiter triggering threshold at 50% of the base backlog size + handler.appLimiterBacklogThreshold = int(float64(opts.Config.TxBacklogSize) * float64(opts.Config.TxBacklogRateLimitingCongestionPct) / 100) } } @@ -596,9 +599,8 @@ func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) net var err error var capguard *util.ErlCapacityGuard - var congested bool if handler.erl != nil { - congested = float64(cap(handler.backlogQueue))*handler.backlogCongestionThreshold < float64(len(handler.backlogQueue)) + congestedERL := float64(cap(handler.backlogQueue))*handler.backlogCongestionThreshold < float64(len(handler.backlogQueue)) // consume a capacity unit // if the elastic rate limiter cannot vend a capacity, the error it returns // is sufficient to indicate that we should enable Congestion Control, because @@ -611,7 +613,7 @@ func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) net return network.OutgoingMessage{Action: network.Ignore} } // if the backlog Queue has 50% of its buffer back, turn congestion control off - if !congested { + if !congestedERL { handler.erl.DisableCongestionControl() } } @@ -661,9 +663,12 @@ func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) net } // rate limit per application in a group. Limiting any app in a group drops the entire message. - if handler.appLimiter != nil && congested && handler.appLimiter.shouldDrop(unverifiedTxGroup, rawmsg.Sender.(network.IPAddressable).RoutingAddr()) { - transactionMessagesAppLimiterDrop.Inc(nil) - return network.OutgoingMessage{Action: network.Ignore} + if handler.appLimiter != nil { + congestedARL := len(handler.backlogQueue) > handler.appLimiterBacklogThreshold + if congestedARL && handler.appLimiter.shouldDrop(unverifiedTxGroup, rawmsg.Sender.(network.IPAddressable).RoutingAddr()) { + transactionMessagesAppLimiterDrop.Inc(nil) + return network.OutgoingMessage{Action: network.Ignore} + } } select { |