summaryrefslogtreecommitdiff
path: root/util/execpool/stream.go
blob: 29ec4613f13c42f1eb9438b34cad2b80d2189234 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
// Copyright (C) 2019-2024 Algorand, Inc.
// This file is part of go-algorand
//
// go-algorand is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// go-algorand is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with go-algorand.  If not, see <https://www.gnu.org/licenses/>.

package execpool

import (
	"context"
	"errors"
	"sync"
	"time"

	"github.com/algorand/go-algorand/logging"
)

// ErrShuttingDownError is the error returned when a job is not processed because the service is shutting down
var ErrShuttingDownError = errors.New("not processed, execpool service is shutting down")

// waitForNextJobDuration is the time to wait before sending the batch to the exec pool
// If the incoming rate is low, an input job in the batch may wait no less than
// waitForNextJobDuration before it is sent for processing.
// This can introduce a latency to the propagation in the network (e.g. sigs in txn or vote),
// since every relay will go through this wait time before broadcasting the result.
// However, when the incoming rate is high, the batch will fill up quickly and will send
// for processing before waitForNextJobDuration.
const waitForNextJobDuration = 2 * time.Millisecond

const txnPerWorksetThreshold = 32

// batchSizeBlockLimit is the limit when the batch exceeds, will be added to the exec pool, even if the pool is saturated
// and the stream  will be blocked until the exec pool accepts the batch
const batchSizeBlockLimit = 1024

// InputJob is the interface the incoming jobs need to implement
type InputJob interface {
	GetNumberOfBatchableItems() (count uint64, err error)
}

// BatchProcessor is the interface of the functions needed to prepare a batch from the stream,
// process and return the results
type BatchProcessor interface {
	// ProcessBatch processes a batch packed from the stream in the execpool
	ProcessBatch(jobs []InputJob)
	// GetErredUnprocessed returns an unprocessed jobs because of an err
	GetErredUnprocessed(ue InputJob, err error)
	// Cleanup called on the unprocessed jobs when the service shuts down
	Cleanup(ue []InputJob, err error)
}

// StreamToBatch makes batches from incoming stream of jobs, and submits the batches to the exec pool
type StreamToBatch struct {
	inputChan      <-chan InputJob
	executionPool  BacklogPool
	ctx            context.Context
	activeLoopWg   sync.WaitGroup
	batchProcessor BatchProcessor
}

// MakeStreamToBatch creates a new stream to batch converter
func MakeStreamToBatch(inputChan <-chan InputJob, execPool BacklogPool,
	batchProcessor BatchProcessor) *StreamToBatch {

	return &StreamToBatch{
		inputChan:      inputChan,
		executionPool:  execPool,
		batchProcessor: batchProcessor,
	}
}

// Start is called when the StreamToBatch is created and whenever it needs to restart after
// the ctx is canceled
func (sv *StreamToBatch) Start(ctx context.Context) {
	sv.ctx = ctx
	sv.activeLoopWg.Add(1)
	go sv.batchingLoop()
}

// WaitForStop waits until the batching loop terminates afer the ctx is canceled
func (sv *StreamToBatch) WaitForStop() {
	sv.activeLoopWg.Wait()
}

func (sv *StreamToBatch) batchingLoop() {
	defer sv.activeLoopWg.Done()
	timer := time.NewTicker(waitForNextJobDuration)
	defer timer.Stop()
	var added bool
	var numberOfJobsInCurrent uint64
	var numberOfBatchAttempts uint64
	uJobs := make([]InputJob, 0, 8)
	defer func() { sv.batchProcessor.Cleanup(uJobs, ErrShuttingDownError) }()
	for {
		select {
		case job := <-sv.inputChan:
			numberOfBatchable, err := job.GetNumberOfBatchableItems()
			if err != nil {
				sv.batchProcessor.GetErredUnprocessed(job, err)
				continue
			}

			// if no batchable items here, send this as a task of its own
			if numberOfBatchable == 0 {
				addErr := sv.addBatchToThePoolNow([]InputJob{job})
				if addErr != nil {
					return
				}
				continue // job is handled, continue
			}

			// add this job to the list of batchable jobs
			numberOfJobsInCurrent = numberOfJobsInCurrent + numberOfBatchable
			uJobs = append(uJobs, job)
			if numberOfJobsInCurrent > txnPerWorksetThreshold {
				// enough jobs in the batch to efficiently process

				if numberOfJobsInCurrent > batchSizeBlockLimit {
					// do not consider adding more jobs to this batch.
					// bypass the exec pool situation and queue anyway
					// this is to prevent creation of very large batches
					addErr := sv.addBatchToThePoolNow(uJobs)
					if addErr != nil {
						return
					}
					added = true
				} else {
					added, err = sv.tryAddBatchToThePool(uJobs)
					if err != nil {
						return
					}
				}
				if added {
					numberOfJobsInCurrent = 0
					uJobs = make([]InputJob, 0, 8)
					numberOfBatchAttempts = 0
				} else {
					// was not added because of the exec pool buffer length
					numberOfBatchAttempts++
				}
			}
		case <-timer.C:
			// timer ticked. it is time to send the batch even if it is not full
			if numberOfJobsInCurrent == 0 {
				// nothing batched yet... wait some more
				continue
			}
			var err error
			if numberOfBatchAttempts > 1 {
				// bypass the exec pool situation and queue anyway
				// this is to prevent long delays in the propagation (sigs txn/vote)
				// at least one job has waited 3 x waitForNextJobDuration
				err = sv.addBatchToThePoolNow(uJobs)
				added = true
			} else {
				added, err = sv.tryAddBatchToThePool(uJobs)
			}
			if err != nil {
				return
			}
			if added {
				numberOfJobsInCurrent = 0
				uJobs = make([]InputJob, 0, 8)
				numberOfBatchAttempts = 0
			} else {
				// was not added because of the exec pool buffer length. wait for some more
				numberOfBatchAttempts++
			}
		case <-sv.ctx.Done():
			return
		}
	}
}

func (sv *StreamToBatch) tryAddBatchToThePool(uJobs []InputJob) (added bool, err error) {
	// if the exec pool buffer is full, can go back and collect
	// more jobs instead of waiting in the exec pool buffer
	// e.g. more signatures to the batch do not harm performance but introduce latency when delayed (see crypto.BenchmarkBatchVerifierBig)

	// if the buffer is full
	if l, c := sv.executionPool.BufferSize(); l == c {
		return false, nil
	}
	err = sv.addBatchToThePoolNow(uJobs)
	if err != nil {
		// An error is returned when the context of the pool expires
		return false, err
	}
	return true, nil
}

func (sv *StreamToBatch) addBatchToThePoolNow(unprocessed []InputJob) error {
	// if the context is canceled when the task is in the queue, it should be canceled
	// copy the ctx here so that when the StreamToBatch is started again, and a new context
	// is created, this task still gets canceled due to the ctx at the time of this task
	taskCtx := sv.ctx
	function := func(arg interface{}) interface{} {
		uJobs := arg.([]InputJob)
		if taskCtx.Err() != nil {
			// ctx is canceled. the results will be returned
			sv.batchProcessor.Cleanup(uJobs, ErrShuttingDownError)
			return nil
		}

		sv.batchProcessor.ProcessBatch(uJobs)
		return nil
	}

	// EnqueueBacklog returns an error when the context is canceled
	err := sv.executionPool.EnqueueBacklog(sv.ctx, function, unprocessed, nil)
	if err != nil {
		logging.Base().Infof("addBatchToThePoolNow: EnqueueBacklog returned an error and StreamToBatch will stop: %v", err)
	}
	return err
}