// Copyright (C) 2019-2021 Algorand, Inc.
// This file is part of go-algorand
//
// go-algorand is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// go-algorand is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with go-algorand.  If not, see <https://www.gnu.org/licenses/>.

package ledger

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"reflect"
	"sync"
	"time"

	"github.com/algorand/go-algorand/config"
	"github.com/algorand/go-algorand/crypto"
	"github.com/algorand/go-algorand/data/basics"
	"github.com/algorand/go-algorand/data/bookkeeping"
	"github.com/algorand/go-algorand/ledger/internal"
	"github.com/algorand/go-algorand/ledger/ledgercore"
	"github.com/algorand/go-algorand/logging"
	"github.com/algorand/go-algorand/logging/telemetryspec"
	"github.com/algorand/go-algorand/util/db"
	"github.com/algorand/go-deadlock"
)

// ledgerTracker defines part of the API for any state machine that
// tracks the ledger's blockchain.  In addition to the API below,
// each ledgerTracker provides a tracker-specific read-only API
// (e.g., querying the balance of an account).
//
// A tracker's read-only API must be indexed by rounds, and the
// tracker must be prepared to answer queries for rounds until a
// subsequent call to committedUpTo().
//
// For example, the rewards AVL tree must be prepared to answer
// queries for old rounds, even if the tree has moved on in response
// to newBlock() calls.  It should do so by remembering the precise
// answer for old rounds, until committedUpTo() allows it to GC
// those old answers.
//
// The ledger provides a RWMutex to ensure that the tracker is invoked
// with at most one modification API call (below), OR zero modification
// calls and any number of read-only calls.  If internally the tracker
// modifies state in response to read-only calls, it is the tracker's
// responsibility to ensure thread-safety.
type ledgerTracker interface {
	// loadFromDisk loads the state of a tracker from persistent
	// storage.  The ledger argument allows loadFromDisk to load
	// blocks from the database, or access its own state.  The
	// ledgerForTracker interface abstracts away the details of
	// ledger internals so that individual trackers can be tested
	// in isolation. The provided round number represents the
	// current accounts storage round number.
	loadFromDisk(ledgerForTracker, basics.Round) error

	// newBlock informs the tracker of a new block along with
	// a given ledgercore.StateDelta as produced by BlockEvaluator.
	newBlock(blk bookkeeping.Block, delta ledgercore.StateDelta)

	// committedUpTo informs the tracker that the block database has
	// committed all blocks up to and including rnd to persistent
	// storage.  This can allow the tracker
	// to garbage-collect state that will not be needed.
	//
	// committedUpTo() returns the round number of the earliest
	// block that this tracker needs to be stored in the block
	// database for subsequent calls to loadFromDisk().
	// All blocks with round numbers before that may be deleted to
	// save space, and the tracker is expected to still function
	// after a restart and a call to loadFromDisk().
	// For example, returning 0 means that no blocks can be deleted.
	// Separately, the method returns the lookback that is being
	// maintained by the tracker.
	committedUpTo(basics.Round) (minRound, lookback basics.Round)

	// produceCommittingTask prepares a deferredCommitRange; preparing a deferredCommitRange is a joint
	// effort, and all the trackers contribute to it. Each tracker is handed a
	// pointer to the deferredCommitRange, and can either modify it or return
	// nil. If nil is returned, the commit is skipped.
	produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange

	// prepareCommit, commitRound and postCommit are called when it is time to commit the tracker's data.
	// If an error is returned, the process is aborted.

	// prepareCommit aligns the data structures stored in the deferredCommitContext with the current
	// state of the tracker. It allows the tracker to decide what data is going to be persisted
	// on the coming commitRound.
	prepareCommit(*deferredCommitContext) error
	// commitRound is called for each of the trackers after a deferredCommitContext was agreed upon
	// by all the prepareCommit calls. The commitRound calls are executed within a single transactional
	// context, and so, if any of the trackers' commitRound calls fails, the transaction is rolled back.
	commitRound(context.Context, *sql.Tx, *deferredCommitContext) error
	// postCommit is called only on a successful commitRound. In that case, each of the trackers has
	// the chance to update its internal data structures, knowing that the given deferredCommitContext
	// has completed. An optional context is provided for long-running operations.
	postCommit(context.Context, *deferredCommitContext)

	// postCommitUnlocked is called only on a successful commitRound. In that case, each of the trackers has
	// the chance to make changes that aren't state-dependent.
	// An optional context is provided for long-running operations.
	postCommitUnlocked(context.Context, *deferredCommitContext)

	// handleUnorderedCommit is a special method for handling deferred commits that are out of order.
	// A tracker might update its own state in this case. For example, the account updates tracker cancels
	// the scheduled catchpoint writing associated with that deferred commit.
	handleUnorderedCommit(uint64, basics.Round, basics.Round)

	// close terminates the tracker, reclaiming any resources
	// like open database connections or goroutines.  close may
	// be called even if loadFromDisk() is not called or does
	// not succeed.
	close()
}
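
// What follows is an illustrative sketch only, not part of the package: a
// minimal tracker that satisfies the ledgerTracker interface above while
// retaining no state. It can serve as a starting point for a new tracker or
// as a test double. It allows all blocks before the committed round to be
// deleted ( committedUpTo returns the given round with a zero lookback ) and
// passes the proposed deferredCommitRange through unmodified.
//
//	type noopTracker struct{}
//
//	func (noopTracker) loadFromDisk(ledgerForTracker, basics.Round) error           { return nil }
//	func (noopTracker) newBlock(blk bookkeeping.Block, delta ledgercore.StateDelta) {}
//	func (noopTracker) committedUpTo(rnd basics.Round) (minRound, lookback basics.Round) {
//		return rnd, 0
//	}
//	func (noopTracker) produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange {
//		return dcr
//	}
//	func (noopTracker) prepareCommit(*deferredCommitContext) error                         { return nil }
//	func (noopTracker) commitRound(context.Context, *sql.Tx, *deferredCommitContext) error { return nil }
//	func (noopTracker) postCommit(context.Context, *deferredCommitContext)                 {}
//	func (noopTracker) postCommitUnlocked(context.Context, *deferredCommitContext)         {}
//	func (noopTracker) handleUnorderedCommit(uint64, basics.Round, basics.Round)           {}
//	func (noopTracker) close()                                                             {}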

// ledgerForTracker defines the part of the ledger that a tracker can
// access.  This is particularly useful for testing trackers in isolation.
type ledgerForTracker interface {
	trackerDB() db.Pair
	blockDB() db.Pair
	trackerLog() logging.Logger
	trackerEvalVerified(bookkeeping.Block, internal.LedgerForEvaluator) (ledgercore.StateDelta, error)

	Latest() basics.Round
	Block(basics.Round) (bookkeeping.Block, error)
	BlockHdr(basics.Round) (bookkeeping.BlockHeader, error)
	GenesisHash() crypto.Digest
	GenesisProto() config.ConsensusParams
	GenesisAccounts() map[basics.Address]basics.AccountData
}
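
// Because trackers receive the ledger through this narrow interface, a tracker
// can be exercised in a test with a stub. The zero-value stub below is a
// hypothetical sketch ( it is not a helper that exists in this package );
// a real test would typically back trackerDB() and blockDB() with in-memory
// database pairs instead of zero values.
//
//	type stubLedger struct{}
//
//	func (stubLedger) trackerDB() db.Pair         { return db.Pair{} }
//	func (stubLedger) blockDB() db.Pair           { return db.Pair{} }
//	func (stubLedger) trackerLog() logging.Logger { return logging.Base() }
//	func (stubLedger) trackerEvalVerified(bookkeeping.Block, internal.LedgerForEvaluator) (ledgercore.StateDelta, error) {
//		return ledgercore.StateDelta{}, nil
//	}
//	func (stubLedger) Latest() basics.Round                                   { return 0 }
//	func (stubLedger) Block(basics.Round) (bookkeeping.Block, error)          { return bookkeeping.Block{}, nil }
//	func (stubLedger) BlockHdr(basics.Round) (bookkeeping.BlockHeader, error) { return bookkeeping.BlockHeader{}, nil }
//	func (stubLedger) GenesisHash() crypto.Digest                             { return crypto.Digest{} }
//	func (stubLedger) GenesisProto() config.ConsensusParams                   { return config.ConsensusParams{} }
//	func (stubLedger) GenesisAccounts() map[basics.Address]basics.AccountData { return nil }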

type trackerRegistry struct {
	trackers []ledgerTracker
	// accts is the account updates tracker, which has some exceptional usages in the tracker registry.
	accts *accountUpdates

	// ctx is the context for the committing go-routine.
	ctx context.Context
	// ctxCancel is the cancel function for the committing go-routine ( i.e. signaling the committing go-routine that it's time to abort )
	ctxCancel context.CancelFunc

	// deferredCommits is the channel of pending deferred commits
	deferredCommits chan *deferredCommitContext

	// commitSyncerClosed is the blocking channel for synchronizing closing the commitSyncer goroutine. Once it's closed, the
	// commitSyncer can be assumed to have aborted.
	commitSyncerClosed chan struct{}

	// accountsWriting provides synchronization around the background writing of account balances.
	accountsWriting sync.WaitGroup

	// dbRound is always exactly accountsRound(),
	// cached to avoid SQL queries.
	dbRound basics.Round

	dbs db.Pair
	log logging.Logger

	// the synchronous mode that would be used for the account database.
	synchronousMode db.SynchronousMode

	// the synchronous mode that would be used while the accounts database is being rebuilt.
	accountsRebuildSynchronousMode db.SynchronousMode

	mu deadlock.RWMutex

	// lastFlushTime is the time we last flushed updates to
	// the accounts DB (bumping dbRound).
	lastFlushTime time.Time
}

// deferredCommitRange is used during the calls to produceCommittingTask, and serves as a data structure
// that synchronizes the various trackers and creates agreement around which rounds need to be persisted
// next.
type deferredCommitRange struct {
	offset   uint64
	oldBase  basics.Round
	lookback basics.Round

	// pendingDeltas is the number of accounts that were modified within this commit context.
	// note that this number might count the same account being modified several times.
	pendingDeltas int

	isCatchpointRound bool

	// catchpointWriting is a pointer to a variable with the same name in the catchpointTracker.
	// it's used in order to reset the catchpointWriting flag from the accountUpdates's
	// prepareCommit/commitRound ( which is called before the corresponding catchpoint tracker method )
	catchpointWriting *int32
}
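
// As a hypothetical example ( not code taken from any tracker in this
// package ), a tracker that needs to look back requiredLookback rounds could
// implement produceCommittingTask by widening the proposed range and passing
// it along the chain:
//
//	func (t *exampleTracker) produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange {
//		// widen the lookback if this tracker needs more than what was proposed so far.
//		if dcr.lookback < t.requiredLookback {
//			dcr.lookback = t.requiredLookback
//		}
//		return dcr
//	}
//
// Returning nil instead of dcr would veto the commit for this iteration.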

// deferredCommitContext is used in order to synchronize the persistence of a given deferredCommitRange.
// prepareCommit, commitRound and postCommit all use it to exchange data.
type deferredCommitContext struct {
	deferredCommitRange

	newBase   basics.Round
	flushTime time.Time

	genesisProto config.ConsensusParams

	deltas                 []ledgercore.AccountDeltas
	roundTotals            ledgercore.AccountTotals
	compactAccountDeltas   compactAccountDeltas
	compactCreatableDeltas map[basics.CreatableIndex]ledgercore.ModifiedCreatable

	updatedPersistedAccounts []persistedAccountData

	committedRoundDigest     crypto.Digest
	trieBalancesHash         crypto.Digest
	updatingBalancesDuration time.Duration
	catchpointLabel          string

	stats       telemetryspec.AccountsUpdateMetrics
	updateStats bool
}
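
// A single deferredCommitContext flows through the commit pipeline implemented
// by trackerRegistry.commitRound below. In sketch form:
//
//	for each tracker: prepareCommit(dcc)                 // in-memory, registry lock held
//	Wdb.Atomic(func(ctx, tx) {
//		for each tracker: commitRound(ctx, tx, dcc)  // one shared transaction
//		updateAccountsRound(tx, dcc.newBase)
//	})
//	for each tracker: postCommit(ctx, dcc)               // registry lock held
//	for each tracker: postCommitUnlocked(ctx, dcc)       // no lock held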

var errMissingAccountUpdateTracker = errors.New("initializeTrackerCaches : called without a valid accounts update tracker")

func (tr *trackerRegistry) initialize(l ledgerForTracker, trackers []ledgerTracker, cfg config.Local) (err error) {
	tr.dbs = l.trackerDB()
	tr.log = l.trackerLog()

	err = tr.dbs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) {
		tr.dbRound, err = accountsRound(tx)
		return err
	})

	if err != nil {
		return err
	}

	tr.ctx, tr.ctxCancel = context.WithCancel(context.Background())
	tr.deferredCommits = make(chan *deferredCommitContext, 1)
	tr.commitSyncerClosed = make(chan struct{})
	tr.synchronousMode = db.SynchronousMode(cfg.LedgerSynchronousMode)
	tr.accountsRebuildSynchronousMode = db.SynchronousMode(cfg.AccountsRebuildSynchronousMode)
	go tr.commitSyncer(tr.deferredCommits)

	tr.trackers = append([]ledgerTracker{}, trackers...)

	for _, tracker := range tr.trackers {
		if accts, ok := tracker.(*accountUpdates); ok {
			tr.accts = accts
			break
		}
	}
	return
}
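
// A sketch of the expected call sequence, assuming a ledger type l that
// implements ledgerForTracker and holds a trackerRegistry ( the tracker list
// below is illustrative ):
//
//	trackers := []ledgerTracker{l.accts, ...}
//	if err := l.trackers.initialize(l, trackers, cfg); err != nil {
//		return err
//	}
//	if err := l.trackers.loadFromDisk(l); err != nil {
//		return err
//	}
//	// ... and on shutdown:
//	l.trackers.close()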

func (tr *trackerRegistry) loadFromDisk(l ledgerForTracker) error {
	tr.mu.RLock()
	dbRound := tr.dbRound
	tr.mu.RUnlock()

	for _, lt := range tr.trackers {
		err := lt.loadFromDisk(l, dbRound)
		if err != nil {
			// find the tracker name.
			trackerName := reflect.TypeOf(lt).String()
			return fmt.Errorf("tracker %s failed to loadFromDisk : %v", trackerName, err)
		}
	}

	err := tr.initializeTrackerCaches(l)
	if err != nil {
		return err
	}
	// the votes have a special dependency on the account updates, so we need to initialize these separately.
	tr.accts.voters = &votersTracker{}
	err = tr.accts.voters.loadFromDisk(l, tr.accts)
	return err
}

func (tr *trackerRegistry) newBlock(blk bookkeeping.Block, delta ledgercore.StateDelta) {
	for _, lt := range tr.trackers {
		lt.newBlock(blk, delta)
	}
}

func (tr *trackerRegistry) committedUpTo(rnd basics.Round) basics.Round {
	minBlock := rnd
	maxLookback := basics.Round(0)
	for _, lt := range tr.trackers {
		retainRound, lookback := lt.committedUpTo(rnd)
		if retainRound < minBlock {
			minBlock = retainRound
		}
		if lookback > maxLookback {
			maxLookback = lookback
		}
	}

	tr.scheduleCommit(rnd, maxLookback)

	return minBlock
}

func (tr *trackerRegistry) scheduleCommit(blockqRound, maxLookback basics.Round) {
	tr.mu.RLock()
	dbRound := tr.dbRound
	tr.mu.RUnlock()

	dcc := &deferredCommitContext{
		deferredCommitRange: deferredCommitRange{
			lookback: maxLookback,
		},
	}
	cdr := &dcc.deferredCommitRange
	for _, lt := range tr.trackers {
		cdr = lt.produceCommittingTask(blockqRound, dbRound, cdr)
		if cdr == nil {
			break
		}
	}
	if cdr != nil {
		dcc.deferredCommitRange = *cdr
	} else {
		dcc = nil
	}

	tr.mu.RLock()
	// If we recently flushed, wait to aggregate some more blocks.
	// ( unless we're creating a catchpoint, in which case we want to flush it right away
	//   so that all the instances of the catchpoint would contain exactly the same data )
	flushTime := time.Now()
	if dcc != nil && !flushTime.After(tr.lastFlushTime.Add(balancesFlushInterval)) && !dcc.isCatchpointRound && dcc.pendingDeltas < pendingDeltasFlushThreshold {
		dcc = nil
	}
	tr.mu.RUnlock()

	if dcc != nil {
		tr.accountsWriting.Add(1)
		tr.deferredCommits <- dcc
	}
}

// waitAccountsWriting waits for all the pending ( or current ) account writing to be completed.
func (tr *trackerRegistry) waitAccountsWriting() {
	tr.accountsWriting.Wait()
}

func (tr *trackerRegistry) close() {
	if tr.ctxCancel != nil {
		tr.ctxCancel()
	}

	// close() may be called from reloadLedger() while the trackerRegistry is not yet initialized
	if tr.commitSyncerClosed != nil {
		tr.waitAccountsWriting()
		// this would block until the commitSyncerClosed channel gets closed.
		<-tr.commitSyncerClosed
	}

	for _, lt := range tr.trackers {
		lt.close()
	}
	tr.trackers = nil
	tr.accts = nil
}

// commitSyncer is the syncer go-routine function which performs the database updates. Internally, it dequeues deferredCommits and
// sends the tasks to commitRound for completing the operation.
func (tr *trackerRegistry) commitSyncer(deferredCommits chan *deferredCommitContext) {
	defer close(tr.commitSyncerClosed)
	for {
		select {
		case commit, ok := <-deferredCommits:
			if !ok {
				return
			}
			tr.commitRound(commit)
		case <-tr.ctx.Done():
			// drain the pending commits queue:
			drained := false
			for !drained {
				select {
				case <-deferredCommits:
					tr.accountsWriting.Done()
				default:
					drained = true
				}
			}
			return
		}
	}
}

// commitRound commits the given deferredCommitContext via the trackers.
func (tr *trackerRegistry) commitRound(dcc *deferredCommitContext) {
	defer tr.accountsWriting.Done()
	tr.mu.RLock()

	offset := dcc.offset
	dbRound := dcc.oldBase
	lookback := dcc.lookback

	// we can exit right away, as this is the result of a mis-ordered call to committedUpTo.
	if tr.dbRound < dbRound || offset < uint64(tr.dbRound-dbRound) {
		tr.log.Warnf("out of order deferred commit: offset %d, dbRound %d but current tracker DB round is %d", offset, dbRound, tr.dbRound)
		for _, lt := range tr.trackers {
			lt.handleUnorderedCommit(offset, dbRound, lookback)
		}
		tr.mu.RUnlock()
		return
	}

	// adjust the offset according to what happened meanwhile.
	offset -= uint64(tr.dbRound - dbRound)

	// if this iteration needs to flush out zero rounds, just return right away.
	// this use case can happen when two subsequent calls to committedUpTo conclude that the same range of rounds needs to be
	// flushed, without commitRound having had a chance to commit these rounds.
	if offset == 0 {
		tr.mu.RUnlock()
		return
	}

	dbRound = tr.dbRound
	newBase := basics.Round(offset) + dbRound

	dcc.offset = offset
	dcc.oldBase = dbRound
	dcc.newBase = newBase
	dcc.flushTime = time.Now()

	for _, lt := range tr.trackers {
		err := lt.prepareCommit(dcc)
		if err != nil {
			tr.log.Errorf(err.Error())
			tr.mu.RUnlock()
			return
		}
	}
	tr.mu.RUnlock()

	start := time.Now()
	ledgerCommitroundCount.Inc(nil)
	err := tr.dbs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) {
		for _, lt := range tr.trackers {
			err0 := lt.commitRound(ctx, tx, dcc)
			if err0 != nil {
				return err0
			}
		}

		err = updateAccountsRound(tx, dbRound+basics.Round(offset))
		if err != nil {
			return err
		}

		return nil
	})
	ledgerCommitroundMicros.AddMicrosecondsSince(start, nil)

	if err != nil {
		tr.log.Warnf("unable to advance tracker db snapshot (%d-%d): %v", dbRound, dbRound+basics.Round(offset), err)
		return
	}

	tr.mu.Lock()
	tr.dbRound = newBase
	for _, lt := range tr.trackers {
		lt.postCommit(tr.ctx, dcc)
	}
	tr.lastFlushTime = dcc.flushTime
	tr.mu.Unlock()

	for _, lt := range tr.trackers {
		lt.postCommitUnlocked(tr.ctx, dcc)
	}

}

// initializeTrackerCaches fills up the accountUpdates cache with the most recent ~320 blocks ( on normal execution ).
// the method also supports balances recovery in cases where the difference between the lastBalancesRound and the lastestBlockRound
// is far greater than 320; in these cases, it would flush to disk periodically in order to avoid high memory consumption.
func (tr *trackerRegistry) initializeTrackerCaches(l ledgerForTracker) (err error) {
	lastestBlockRound := l.Latest()
	lastBalancesRound := tr.dbRound

	var blk bookkeeping.Block
	var delta ledgercore.StateDelta

	if tr.accts == nil {
		return errMissingAccountUpdateTracker
	}

	accLedgerEval := accountUpdatesLedgerEvaluator{
		au: tr.accts,
	}

	if lastBalancesRound < lastestBlockRound {
		accLedgerEval.prevHeader, err = l.BlockHdr(lastBalancesRound)
		if err != nil {
			return err
		}
	}

	skipAccountCacheMessage := make(chan struct{})
	writeAccountCacheMessageCompleted := make(chan struct{})
	defer func() {
		close(skipAccountCacheMessage)
		select {
		case <-writeAccountCacheMessageCompleted:
			if err == nil {
				tr.log.Infof("initializeTrackerCaches completed initializing account data caches")
			}
		default:
		}
	}()

	catchpointInterval := uint64(0)
	for _, tracker := range tr.trackers {
		if catchpointTracker, ok := tracker.(*catchpointTracker); ok {
			catchpointInterval = catchpointTracker.catchpointInterval
			break
		}
	}

	// this goroutine logs a message once if the parent function has not completed within initializingAccountCachesMessageTimeout.
	// the message is important, since we're blocking on the ledger block database here, and we want to make sure that we log a message
	// within the above timeout.
	go func() {
		select {
		case <-time.After(initializingAccountCachesMessageTimeout):
			tr.log.Infof("initializeTrackerCaches is initializing account data caches")
			close(writeAccountCacheMessageCompleted)
		case <-skipAccountCacheMessage:
		}
	}()

	blocksStream := make(chan bookkeeping.Block, initializeCachesReadaheadBlocksStream)
	blockEvalFailed := make(chan struct{}, 1)
	var blockRetrievalError error
	go func() {
		defer close(blocksStream)
		for roundNumber := lastBalancesRound + 1; roundNumber <= lastestBlockRound; roundNumber++ {
			blk, blockRetrievalError = l.Block(roundNumber)
			if blockRetrievalError != nil {
				return
			}
			select {
			case blocksStream <- blk:
			case <-blockEvalFailed:
				return
			}
		}
	}()

	lastFlushedRound := lastBalancesRound
	const accountsCacheLoadingMessageInterval = 5 * time.Second
	lastProgressMessage := time.Now().Add(-accountsCacheLoadingMessageInterval / 2)

	// rollbackSynchronousMode ensures that we switch to "fast writing mode" when we start flushing out rounds to disk, and that
	// we exit this mode when we're done.
	rollbackSynchronousMode := false
	defer func() {
		if rollbackSynchronousMode {
			// restore default synchronous mode
			err0 := tr.dbs.Wdb.SetSynchronousMode(context.Background(), tr.synchronousMode, tr.synchronousMode >= db.SynchronousModeFull)
			// override the returned error only in case there was no previous error - since this
			// operation has a lower criticality.
			if err == nil {
				err = err0
			}
		}
	}()

	for blk := range blocksStream {
		delta, err = l.trackerEvalVerified(blk, &accLedgerEval)
		if err != nil {
			close(blockEvalFailed)
			return
		}
		tr.newBlock(blk, delta)

		// flush to disk if any of the following applies:
		// 1. if we have loaded up more than initializeCachesRoundFlushInterval rounds since the last time we flushed the data to disk
		// 2. if we completed the loading and we loaded up more than 320 rounds.
		flushIntervalExceed := blk.Round()-lastFlushedRound > initializeCachesRoundFlushInterval
		loadCompleted := (lastestBlockRound == blk.Round() && lastBalancesRound+basics.Round(blk.ConsensusProtocol().MaxBalLookback) < lastestBlockRound)
		if flushIntervalExceed || loadCompleted {
			// adjust the last flush time, so that we would not hold off the flushing due to "working too fast"
			tr.lastFlushTime = time.Now().Add(-balancesFlushInterval)

			if !rollbackSynchronousMode {
				// switch to rebuild synchronous mode to improve performance
				err0 := tr.dbs.Wdb.SetSynchronousMode(context.Background(), tr.accountsRebuildSynchronousMode, tr.accountsRebuildSynchronousMode >= db.SynchronousModeFull)
				if err0 != nil {
					tr.log.Warnf("initializeTrackerCaches was unable to switch to rbuild synchronous mode : %v", err0)
				} else {
					// flip the switch to rollback the synchronous mode once we're done.
					rollbackSynchronousMode = true
				}
			}

			var roundsBehind basics.Round

			// flush the account data
			tr.scheduleCommit(blk.Round(), basics.Round(config.Consensus[blk.BlockHeader.CurrentProtocol].MaxBalLookback))
			// wait for the writing to complete.
			tr.waitAccountsWriting()

			func() {
				tr.mu.RLock()
				defer tr.mu.RUnlock()

				// The tr.dbRound after writing should be ~320 behind the block round.
				roundsBehind = blk.Round() - tr.dbRound
			}()

			// are we too far behind ? ( taking into consideration the catchpoint writing, which can stall the writing for quite a bit )
			if roundsBehind > initializeCachesRoundFlushInterval+basics.Round(catchpointInterval) {
				// we're unable to persist changes. This is unexpected, but there is no point in continuing to batch additional changes, since any further
				// changes would just accumulate in memory.
				close(blockEvalFailed)
				tr.log.Errorf("initializeTrackerCaches was unable to fill up the account caches accounts round = %d, block round = %d. See above error for more details.", blk.Round()-roundsBehind, blk.Round())
				err = fmt.Errorf("initializeTrackerCaches failed to initialize the account data caches")
				return
			}

			// and once we flushed it to disk, update the lastFlushedRound
			lastFlushedRound = blk.Round()
		}

		// if enough time has passed since the last time we wrote a message to the log file, then give the user an update about the progress.
		if time.Since(lastProgressMessage) > accountsCacheLoadingMessageInterval {
			// drop the initial message if we got to this point, since a message saying "still initializing" that comes after "is initializing" doesn't seem right.
			select {
			case skipAccountCacheMessage <- struct{}{}:
				// if we got to this point, we should be able to close the writeAccountCacheMessageCompleted channel to have the "completed initializing" message written.
				close(writeAccountCacheMessageCompleted)
			default:
			}
			tr.log.Infof("initializeTrackerCaches is still initializing account data caches, %d rounds loaded out of %d rounds", blk.Round()-lastBalancesRound, lastestBlockRound-lastBalancesRound)
			lastProgressMessage = time.Now()
		}

		// prepare for the next iteration.
		accLedgerEval.prevHeader = *delta.Hdr
	}

	if blockRetrievalError != nil {
		err = blockRetrievalError
	}
	return

}