summaryrefslogtreecommitdiff
path: root/data/txHandler.go
blob: 0ecef433c189901b3aafeaedab5fcff5f49dc233 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
// Copyright (C) 2019-2021 Algorand, Inc.
// This file is part of go-algorand
//
// go-algorand is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// go-algorand is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with go-algorand.  If not, see <https://www.gnu.org/licenses/>.

package data

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"sync"

	"github.com/algorand/go-algorand/crypto"
	"github.com/algorand/go-algorand/data/bookkeeping"
	"github.com/algorand/go-algorand/data/pooldata"
	"github.com/algorand/go-algorand/data/pools"
	"github.com/algorand/go-algorand/data/transactions"
	"github.com/algorand/go-algorand/data/transactions/verify"
	"github.com/algorand/go-algorand/ledger/ledgercore"
	"github.com/algorand/go-algorand/logging"
	"github.com/algorand/go-algorand/network"
	"github.com/algorand/go-algorand/protocol"
	"github.com/algorand/go-algorand/util/execpool"
	"github.com/algorand/go-algorand/util/metrics"
)

// The size txBacklogSize is used to determine the size of the backlog that is used to store incoming transaction messages before we start dropping them.
// It should be configured to be higher than the number of CPU cores, so that the execution pool gets saturated, but not too high to avoid lockout of the
// execution pool for a long duration of time.
const txBacklogSize = 1000

// transactionMessagesHandled counts incoming transaction messages that were fully processed.
var transactionMessagesHandled = metrics.MakeCounter(metrics.TransactionMessagesHandled)

// transactionMessagesDroppedFromBacklog counts incoming transaction messages dropped because the backlog queue was full.
var transactionMessagesDroppedFromBacklog = metrics.MakeCounter(metrics.TransactionMessagesDroppedFromBacklog)

// transactionMessagesDroppedFromPool counts transaction messages dropped after (or during) verification,
// e.g. when the post-verification queue was full or the pool rejected them.
var transactionMessagesDroppedFromPool = metrics.MakeCounter(metrics.TransactionMessagesDroppedFromPool)

// The txBacklogMsg structure is used to track a single incoming transaction message from the gossip network
// as it travels from the backlog queue, through signature verification, to the post-verification queue.
type txBacklogMsg struct {
	rawmsg            *network.IncomingMessage // the raw message from the network
	unverifiedTxGroup []transactions.SignedTxn // the unverified ( and signed ) transaction group
	verificationErr   error                    // The verification error generated by the verification function, if any.
}

// TxHandler handles transaction messages arriving from the gossip network: it decodes them,
// verifies their signatures on the execution pool, remembers them in the transaction pool,
// and relays the canonical encoding onward.
type TxHandler struct {
	// txPool is where verified transaction groups are remembered.
	txPool *pools.TransactionPool
	// ledger provides block headers and the verified-transaction cache.
	ledger *Ledger
	// genesisID and genesisHash identify the network; set at construction and read-only afterwards.
	genesisID   string
	genesisHash crypto.Digest
	// txVerificationPool is the shared execution pool used for asynchronous signature verification.
	txVerificationPool execpool.BacklogPool
	// backlogQueue holds incoming, not-yet-verified transaction groups.
	backlogQueue chan *txBacklogMsg
	// postVerificationQueue holds groups whose signature verification has completed.
	postVerificationQueue chan *txBacklogMsg
	// backlogWg tracks the backlogWorker goroutine so Stop can wait for it.
	backlogWg sync.WaitGroup
	// net is the gossip node used for registering handlers, relaying and disconnecting peers.
	net network.GossipNode
	// ctx/ctxCancel control the lifetime of the handler's background work.
	ctx       context.Context
	ctxCancel context.CancelFunc
}

// MakeTxHandler makes a new handler for transaction messages
func MakeTxHandler(txPool *pools.TransactionPool, ledger *Ledger, net network.GossipNode, genesisID string, genesisHash crypto.Digest, executionPool execpool.BacklogPool) *TxHandler {
	// validate the mandatory dependencies; Fatal logs and terminates, the returns are for completeness.
	switch {
	case txPool == nil:
		logging.Base().Fatal("MakeTxHandler: txPool is nil on initialization")
		return nil
	case ledger == nil:
		logging.Base().Fatal("MakeTxHandler: ledger is nil on initialization")
		return nil
	}

	h := &TxHandler{
		txPool:                txPool,
		ledger:                ledger,
		genesisID:             genesisID,
		genesisHash:           genesisHash,
		txVerificationPool:    executionPool,
		backlogQueue:          make(chan *txBacklogMsg, txBacklogSize),
		postVerificationQueue: make(chan *txBacklogMsg, txBacklogSize),
		net:                   net,
	}
	h.ctx, h.ctxCancel = context.WithCancel(context.Background())
	return h
}

// Start enables the processing of incoming messages at the transaction handler
func (handler *TxHandler) Start() {
	// route incoming TxnTag gossip messages to processIncomingTxn.
	txnHandlers := []network.TaggedMessageHandler{
		{Tag: protocol.TxnTag, MessageHandler: network.HandlerFunc(handler.processIncomingTxn)},
	}
	handler.net.RegisterHandlers(txnHandlers)

	// account for the backlog worker so that Stop can wait for its termination.
	handler.backlogWg.Add(1)
	go handler.backlogWorker()
}

// Stop suspends the processing of incoming messages at the transaction handler.
// It cancels the handler's context (which unblocks the backlog worker) and waits
// for the worker goroutine to exit before returning.
func (handler *TxHandler) Stop() {
	handler.ctxCancel()
	handler.backlogWg.Wait()
}

// reencode serializes the given signed transactions back into their canonical
// wire encoding and concatenates the results into a single byte slice.
func reencode(stxns []transactions.SignedTxn) []byte {
	encoded := make([][]byte, len(stxns))
	for i := range stxns {
		encoded[i] = protocol.Encode(&stxns[i])
	}
	return bytes.Join(encoded, nil)
}

// backlogWorker is the worker go routine that process the incoming messages from the postVerificationQueue and backlogQueue channels
// and dispatches them further. Post-verification items are always drained first so that verified
// transactions are not starved by a steady stream of new incoming messages.
func (handler *TxHandler) backlogWorker() {
	defer handler.backlogWg.Done()
	for {
		// prioritize the postVerificationQueue: the non-blocking receive below (via the
		// default case) drains any already-verified item before touching the backlog.
		select {
		case wi, ok := <-handler.postVerificationQueue:
			if !ok {
				return
			}
			handler.postprocessCheckedTxn(wi)

			// restart the loop so that we could empty out the post verification queue.
			continue
		default:
		}

		// we have no more post verification items. wait for either backlog queue item or post verification item.
		select {
		case wi, ok := <-handler.backlogQueue:
			if !ok {
				return
			}
			// skip groups the pool would reject anyway (e.g. duplicates) before paying for verification.
			if handler.checkAlreadyCommitted(wi) {
				continue
			}

			// enqueue the task to the verification pool; the result comes back on postVerificationQueue.
			handler.txVerificationPool.EnqueueBacklog(handler.ctx, handler.asyncVerifySignature, wi, nil)

		case wi, ok := <-handler.postVerificationQueue:
			if !ok {
				return
			}
			handler.postprocessCheckedTxn(wi)

		case <-handler.ctx.Done():
			// the handler was stopped; terminate the worker.
			return
		}
	}
}

// postprocessCheckedTxn handles a transaction group after its signature verification completed:
// on verification failure it disconnects the sending peer; otherwise it remembers the group in the
// transaction pool, pins it in the verified-transaction cache, and relays it onward.
func (handler *TxHandler) postprocessCheckedTxn(wi *txBacklogMsg) {
	if wi.verificationErr != nil {
		// disconnect from peer.
		logging.Base().Warnf("Received a malformed tx group %v: %v", wi.unverifiedTxGroup, wi.verificationErr)
		handler.net.Disconnect(wi.rawmsg.Sender)
		return
	}

	// we've processed this message, so increase the counter.
	transactionMessagesHandled.Inc(nil)

	// at this point, we've verified the transaction, so we can safely treat the transaction as a verified transaction.
	verifiedTxGroup := wi.unverifiedTxGroup

	// save the transaction, if it has high enough fee and not already in the cache
	err := handler.txPool.Remember(pooldata.SignedTxGroup{Transactions: verifiedTxGroup})
	if err != nil {
		// pool rejection is a normal occurrence (e.g. pool full), so only log at debug and do not relay.
		logging.Base().Debugf("could not remember tx: %v", err)
		return
	}

	// if we remembered without any error ( i.e. txpool wasn't full ), then we should pin these transactions.
	err = handler.ledger.VerifiedTransactionCache().Pin(verifiedTxGroup)
	if err != nil {
		logging.Base().Infof("unable to pin transaction: %v", err)
	}

	// We reencode here instead of using rawmsg.Data to avoid broadcasting non-canonical encodings
	handler.net.Relay(handler.ctx, protocol.TxnTag, reencode(verifiedTxGroup), false, wi.rawmsg.Sender)
}

// asyncVerifySignature verifies that the given transaction group is valid, and updates the txBacklogMsg
// data structure accordingly. It runs on the execution pool (enqueued by backlogWorker) and reports its
// result by pushing the same txBacklogMsg onto postVerificationQueue; the arg is always a *txBacklogMsg.
func (handler *TxHandler) asyncVerifySignature(arg interface{}) interface{} {
	tx := arg.(*txBacklogMsg)

	// build the transaction verification context
	latest := handler.ledger.Latest()
	latestHdr, err := handler.ledger.BlockHdr(latest)
	if err != nil {
		// note: the error is recorded on the message (not just logged) so that postprocessCheckedTxn
		// treats the group as unverified.
		tx.verificationErr = fmt.Errorf("Could not get header for previous block %d: %w", latest, err)
		logging.Base().Warnf("Could not get header for previous block %d: %v", latest, err)
	} else {
		// we can't use PaysetGroups here since it's using a execpool like this go-routine and we don't want to deadlock.
		_, tx.verificationErr = verify.TxnGroup(tx.unverifiedTxGroup, latestHdr, handler.ledger.VerifiedTransactionCache())
	}

	select {
	case handler.postVerificationQueue <- tx:
	default:
		// we failed to write to the output queue, since the queue was full.
		// adding the metric here allows us to monitor how frequently it happens.
		transactionMessagesDroppedFromPool.Inc(nil)
	}
	return nil
}

// processIncomingTxn is the gossip-network handler for TxnTag messages. It decodes the message
// payload as a sequence of consecutively-encoded SignedTxn forming one transaction group, and
// enqueues the group on the backlog queue for asynchronous verification.
// An undecodable or empty payload results in disconnecting from the sender; otherwise the
// message is ignored (i.e. not auto-relayed - relaying happens after verification).
func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) network.OutgoingMessage {
	dec := protocol.NewDecoderBytes(rawmsg.Data)
	var unverifiedTxGroup []transactions.SignedTxn
	for {
		// decode into a fresh zero value each iteration so that no state leaks between transactions.
		var stxn transactions.SignedTxn
		err := dec.Decode(&stxn)
		if err == io.EOF {
			// end of the payload; all transactions were decoded.
			break
		}
		if err != nil {
			logging.Base().Warnf("Received a non-decodable txn: %v", err)
			return network.OutgoingMessage{Action: network.Disconnect}
		}
		unverifiedTxGroup = append(unverifiedTxGroup, stxn)
	}
	if len(unverifiedTxGroup) == 0 {
		logging.Base().Warnf("Received empty tx group")
		return network.OutgoingMessage{Action: network.Disconnect}
	}

	// non-blocking enqueue: when the backlog is full we drop the message instead of stalling the network reader.
	select {
	case handler.backlogQueue <- &txBacklogMsg{
		rawmsg:            &rawmsg,
		unverifiedTxGroup: unverifiedTxGroup,
	}:
	default:
		// if we failed here we want to increase the corresponding metric. It might suggest that we
		// want to increase the queue size.
		transactionMessagesDroppedFromBacklog.Inc(nil)
	}

	return network.OutgoingMessage{Action: network.Ignore}
}

// checkAlreadyCommitted tests whether the given transaction group ( in the txBacklogMsg ) should be
// dropped without further processing: it asks the transaction pool whether the group could still
// qualify as a pool candidate, which rejects duplicate pending or committed transactions.
//
// Note that this also checks the consistency of the transaction's group hash,
// which is required for safe transaction signature caching behavior.
func (handler *TxHandler) checkAlreadyCommitted(tx *txBacklogMsg) (processingDone bool) {
	// a quick pool test; a rejection here means there is no point in spending verification effort on the group.
	if err := handler.txPool.Test(tx.unverifiedTxGroup); err != nil {
		logging.Base().Debugf("txPool rejected transaction: %v", err)
		return true
	}
	return false
}

// processDecoded synchronously verifies and pools an already-decoded transaction group
// (used for solicited, non-gossip submissions). It returns disconnect=true only when the
// group fails signature verification, signaling the caller to drop the source connection.
func (handler *TxHandler) processDecoded(unverifiedTxGroup []transactions.SignedTxn) (disconnect bool) {
	tx := &txBacklogMsg{
		unverifiedTxGroup: unverifiedTxGroup,
	}
	// a group the pool already rejects (e.g. a duplicate) is dropped silently; not a disconnect offense.
	if handler.checkAlreadyCommitted(tx) {
		return false
	}

	// build the transaction verification context
	latest := handler.ledger.Latest()
	latestHdr, err := handler.ledger.BlockHdr(latest)
	if err != nil {
		logging.Base().Warnf("Could not get header for previous block %v: %v", latest, err)
		return false
	}

	// unlike asyncVerifySignature, this path is not running on the execution pool itself,
	// so it can safely use PaysetGroups for parallel verification.
	unverifiedTxnGroups := bookkeeping.SignedTxnsToGroups(unverifiedTxGroup)
	err = verify.PaysetGroups(context.Background(), unverifiedTxnGroups, latestHdr, handler.txVerificationPool, handler.ledger.VerifiedTransactionCache())
	if err != nil {
		// transaction is invalid
		logging.Base().Warnf("One or more transactions were malformed: %v", err)
		return true
	}

	// at this point, we've verified the transaction group,
	// so we can safely treat the transaction as a verified transaction.
	verifiedTxGroup := unverifiedTxGroup

	// save the transaction, if it has high enough fee and not already in the cache
	err = handler.txPool.Remember(pooldata.SignedTxGroup{Transactions: verifiedTxGroup})
	if err != nil {
		logging.Base().Debugf("could not remember tx: %v", err)
		return false
	}

	// if we remembered without any error ( i.e. txpool wasn't full ), then we should pin these transactions.
	err = handler.ledger.VerifiedTransactionCache().Pin(verifiedTxGroup)
	if err != nil {
		logging.Base().Warnf("unable to pin transaction: %v", err)
	}

	return false
}

// filterAlreadyCommitted scans the list of signed transaction groups and drops the ones that have
// already been included, or that should not be added to the transaction pool.
// The resulting slice reuses the *same* underlying array as the input slice, and the caller must
// ensure that this aliasing would not cause an issue on the caller side. hasError reports whether
// any of the removed transaction groups was removed for a reason *other* than being a duplicate
// ( for instance, a malformed transaction ).
func (handler *TxHandler) filterAlreadyCommitted(unverifiedTxGroups []pooldata.SignedTxGroup) (filteredGroups []pooldata.SignedTxGroup, hasError bool) {
	kept := 0
	for idx, group := range unverifiedTxGroups {
		err := handler.txPool.Test(group.Transactions)
		if err == nil {
			// the pool accepted the group; compact it toward the front of the slice.
			if kept != idx {
				unverifiedTxGroups[kept] = group
			}
			kept++
			continue
		}
		if _, isDuplicate := err.(*ledgercore.TransactionInLedgerError); !isDuplicate {
			// rejected for a reason other than being a duplicate transaction group.
			hasError = true
		}
	}
	return unverifiedTxGroups[:kept], hasError
}

// processDecodedArray receives a slice of transaction groups and attempts to add them to the transaction pool.
// The processDecodedArray returns whether the node should be disconnecting from the source of these transactions ( in case a malicious transaction is found )
// as well as whether all the provided transactions were included in the transaction pool or committed.
func (handler *TxHandler) processDecodedArray(unverifiedTxGroups []pooldata.SignedTxGroup) (disconnect, allTransactionIncluded bool) {
	var hasError bool
	unverifiedTxGroups, hasError = handler.filterAlreadyCommitted(unverifiedTxGroups)

	// nothing survived filtering; report success only if every removal was a benign duplicate.
	if len(unverifiedTxGroups) == 0 {
		return false, !hasError
	}

	// build the transaction verification context
	latest := handler.ledger.Latest()
	latestHdr, err := handler.ledger.BlockHdr(latest)
	if err != nil {
		// being unable to retrieve the latest block header is not something a working node is expected to experience ( ever ).
		logging.Base().Errorf("Could not get header for previous block %d: %v", latest, err)
		// returning a disconnect=true, would not fix the problem for the local node, but would force the remote node to pick a different
		// relay, which ( hopefully ! ) would not have the same issue as this one.
		return true, false
	}

	// flatten the groups into the [][]SignedTxn shape that verify.PaysetGroups expects.
	unverifiedTxnGroups := make([][]transactions.SignedTxn, len(unverifiedTxGroups))
	for i, unverifiedGroup := range unverifiedTxGroups {
		unverifiedTxnGroups[i] = unverifiedGroup.Transactions
	}

	err = verify.PaysetGroups(context.Background(), unverifiedTxnGroups, latestHdr, handler.txVerificationPool, handler.ledger.VerifiedTransactionCache())
	if err != nil {
		// transaction is invalid
		logging.Base().Warnf("One or more transactions were malformed: %v", err)
		return true, false
	}

	// at this point, we've verified the transaction group,
	// so we can safely treat the transaction as a verified transaction.
	verifiedTxGroups := unverifiedTxGroups

	// before calling RememberArray we should reallocate the individual remaining
	// signed transactions - these transactions were allocated in bulk by the
	// transaction sync. By re-allocating the backing storage, we would allow the
	// original backing storage ( which includes transactions that won't go into the
	// transaction pool ) to be garbage collected.
	for i, group := range verifiedTxGroups {
		copiedTransactions := make(pooldata.SignedTxnSlice, len(group.Transactions))
		copy(copiedTransactions, group.Transactions)
		verifiedTxGroups[i].Transactions = copiedTransactions
	}

	// save the transaction, if it has high enough fee and not already in the cache
	err = handler.txPool.RememberArray(verifiedTxGroups)
	if err != nil {
		logging.Base().Debugf("could not remember tx: %v", err)
		return false, false
	}

	// if we remembered without any error ( i.e. txpool wasn't full ), then we should pin these transactions.
	err = handler.ledger.VerifiedTransactionCache().PinGroups(verifiedTxGroups)
	if err != nil {
		logging.Base().Warnf("unable to pin transaction: %v", err)
	}

	return false, !hasError
}

// SolicitedTxHandler handles messages received through channels other than the gossip network.
// It therefore circumvents the notion of incoming/outgoing messages.
type SolicitedTxHandler interface {
	// Handle verifies and pools the given transaction group, returning a non-nil error
	// when the group is invalid.
	Handle(txgroup []transactions.SignedTxn) error
}

// solicitedTxHandler is the concrete SolicitedTxHandler implementation; it delegates
// all work to the wrapped TxHandler.
type solicitedTxHandler struct {
	txHandler *TxHandler
}

// SolicitedTxHandler converts a transaction handler to a SolicitedTxHandler.
func (handler *TxHandler) SolicitedTxHandler() SolicitedTxHandler {
	return &solicitedTxHandler{txHandler: handler}
}

// Handle verifies and pools the given transaction group synchronously,
// returning an error when the group fails verification.
func (handler *solicitedTxHandler) Handle(txgroup []transactions.SignedTxn) error {
	// processDecoded reports true only when the group is malformed.
	if handler.txHandler.processDecoded(txgroup) {
		return fmt.Errorf("invalid transaction")
	}
	return nil
}

// SolicitedAsyncTxHandler handles slices of transaction groups received from the transaction sync.
// It provides a non-blocking queueing for the processing of these transaction groups, which allows
// the single-threaded transaction sync to keep processing other messages.
type SolicitedAsyncTxHandler interface {
	// HandleTransactionGroups enqueues the given slice of transaction groups that came from the given network peer with
	// the given message sequence number. The provided acknowledgement channel provides a feedback for the transaction sync
	// that the entire transaction group slice was added ( or already included ) within the transaction pool. The method
	// returns true if it's able to enqueue the processing task, or false if it's unable to enqueue the processing task.
	HandleTransactionGroups(networkPeer interface{}, ackCh chan uint64, messageSeq uint64, groups []pooldata.SignedTxGroup) bool
	// Start begins processing enqueued transaction groups; Stop terminates that processing.
	Start()
	Stop()
}

// solicitedAsyncTxHandler is the concrete SolicitedAsyncTxHandler implementation.
type solicitedAsyncTxHandler struct {
	txHandler     *TxHandler
	backlogGroups chan *txGroups
	stopped       sync.WaitGroup
	stopCtxFunc   context.CancelFunc
	// skipNextBacklogWarning is used to avoid repeated backlog full warning messages.
	// NOTE(review): this field is read and written by HandleTransactionGroups without
	// synchronization — presumably safe because the transaction sync calls it from a
	// single goroutine (see SolicitedAsyncTxHandler doc); confirm against callers.
	skipNextBacklogWarning bool
}

// txGroups is the unit of work queued on solicitedAsyncTxHandler.backlogGroups.
type txGroups struct {
	// the network package opaque network peer
	networkPeer interface{}
	// the feedback channel, in case we've successfully added the transaction groups to the transaction pool.
	ackCh chan uint64
	// the message sequence number, which would be written back to the feedback channel
	messageSeq uint64
	// the transactions groups slice
	txGroups []pooldata.SignedTxGroup
}

// SolicitedAsyncTxHandler converts a transaction handler to a SolicitedAsyncTxHandler.
func (handler *TxHandler) SolicitedAsyncTxHandler() SolicitedAsyncTxHandler {
	return &solicitedAsyncTxHandler{
		txHandler:              handler,
		backlogGroups:          make(chan *txGroups, txBacklogSize),
		skipNextBacklogWarning: false,
	}
}

// HandleTransactionGroups implements the SolicitedAsyncTxHandler.HandleTransactionGroups interface.
// It enqueues the given slice of transaction groups that came from the given network peer with
// the given message sequence number. The provided acknowledgement channel provides a feedback for the transaction sync
// that the entire transaction group slice was added ( or already included ) within the transaction pool. The method
// returns true if it's able to enqueue the processing task, or false if it's unable to enqueue the processing task.
func (handler *solicitedAsyncTxHandler) HandleTransactionGroups(networkPeer interface{}, ackCh chan uint64, messageSeq uint64, groups []pooldata.SignedTxGroup) (enqueued bool) {
	select {
	case handler.backlogGroups <- &txGroups{networkPeer: networkPeer, txGroups: groups, ackCh: ackCh, messageSeq: messageSeq}:
		// reset the skipNextBacklogWarning once the number of pending items on the backlogGroups channels goes to
		// less than half of it's capacity.
		if handler.skipNextBacklogWarning && (len(handler.backlogGroups)*2 < cap(handler.backlogGroups)) {
			handler.skipNextBacklogWarning = false
		}
		enqueued = true
	default:
		if !handler.skipNextBacklogWarning {
			// warn once per backlog-full episode; skipNextBacklogWarning suppresses the repeats.
			logging.Base().Warnf("solicitedAsyncTxHandler exhausted groups backlog")
			handler.skipNextBacklogWarning = true
		}
		// if we failed here we want to increase the corresponding metric. It might suggest that we
		// want to increase the queue size.
		transactionMessagesDroppedFromBacklog.Inc(nil)
	}
	return
}

// Start launches the handler's processing loop (and the underlying TxHandler).
// A nil stopCtxFunc marks the stopped state, making repeated Start calls no-ops
// until Stop is called.
func (handler *solicitedAsyncTxHandler) Start() {
	if handler.stopCtxFunc == nil {
		handler.txHandler.Start()
		var ctx context.Context
		ctx, handler.stopCtxFunc = context.WithCancel(context.Background())
		handler.stopped.Add(1)
		go handler.loop(ctx)
	}
}

// Stop cancels the processing loop, waits for it to exit, and stops the underlying
// TxHandler. Safe to call when the handler is not running (stopCtxFunc is nil).
func (handler *solicitedAsyncTxHandler) Stop() {
	if handler.stopCtxFunc != nil {
		handler.stopCtxFunc()
		handler.stopped.Wait()
		// clearing stopCtxFunc allows a subsequent Start to run again.
		handler.stopCtxFunc = nil
		handler.txHandler.Stop()
	}
}

// loop is the processing goroutine for solicitedAsyncTxHandler: it pulls queued transaction
// group slices off backlogGroups, verifies and pools them, relays the ones that were fully
// included, and acknowledges processed message sequence numbers on the per-message ack channel.
// It exits when ctx is cancelled (by Stop).
func (handler *solicitedAsyncTxHandler) loop(ctx context.Context) {
	defer handler.stopped.Done()
	var groups *txGroups
	for {
		select {
		case <-ctx.Done():
			return
		case groups = <-handler.backlogGroups:
		}
		disconnect, allTransactionsIncluded := handler.txHandler.processDecodedArray(groups.txGroups)
		if disconnect {
			// a malformed group was found: drop the peer and ask the network layer for a
			// replacement outgoing connection.
			handler.txHandler.net.Disconnect(groups.networkPeer)
			handler.txHandler.net.RequestConnectOutgoing(false, make(chan struct{}))
			transactionMessagesDroppedFromPool.Inc(nil)
		} else if allTransactionsIncluded {
			for _, txnGroup := range groups.txGroups {
				// We reencode here instead of using rawmsg.Data to avoid broadcasting non-canonical encodings
				err := handler.txHandler.net.Relay(ctx, protocol.TxnTag, reencode(txnGroup.Transactions), false, groups.networkPeer)
				if err != nil {
					logging.Base().Infof("solicitedAsyncTxHandler was unable to relay transaction message : %v", err)
					break
				}
			}
			// non-blocking ack: the transaction sync drains this channel, but we must not stall
			// the loop if it is slow.
			select {
			case groups.ackCh <- groups.messageSeq:
				// all good, write was successful.
			default:
				// unable to write since channel was full - log this:
				logging.Base().Warnf("solicitedAsyncTxHandler was unable to ack transaction groups inclusion since the acknowledgement channel was full")
			}
			// we've processed this message, so increase the counter.
			transactionMessagesHandled.Inc(nil)
		} else {
			transactionMessagesDroppedFromPool.Inc(nil)
		}
		// clear out the groups; that would allow the GC to collect the group's memory allocations while we wait for the next task.
		*groups = txGroups{}
	}
}