summaryrefslogtreecommitdiff
path: root/util/execpool/pool.go
blob: d44657206fda38bf8850a6a45e1bf087fa22ba6d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
// Copyright (C) 2019-2023 Algorand, Inc.
// This file is part of go-algorand
//
// go-algorand is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// go-algorand is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with go-algorand.  If not, see <https://www.gnu.org/licenses/>.

package execpool

import (
	"context"
	"runtime"
	"sync"
)

// The list of all valid priority values. When adding new ones, add them before numPrios
// (i.e. there should be no gaps, and the first priority needs to be zero).
// The values are used as indices into pool.inputs, which is allocated with numPrios entries.
const (
	// LowPriority is the first priority and has the value zero.
	LowPriority Priority = iota
	// HighPriority tasks are checked for by a worker before it considers LowPriority tasks.
	HighPriority

	// numPrios counts the priorities declared above it; it must stay the last entry.
	numPrios
)

// ExecutionPool interface exposes the core functionality of the execution pool.
type ExecutionPool interface {
	// Enqueue submits t to be executed with arg at priority i, and returns a
	// non-nil error when the task could not be enqueued. In the pool
	// implementation in this file, the call blocks until a worker accepts the
	// task or enqueueCtx is done, and the task's result is sent on out when
	// out is non-nil.
	Enqueue(enqueueCtx context.Context, t ExecFunc, arg interface{}, i Priority, out chan interface{}) error
	// GetOwner returns the owner value associated with the pool.
	GetOwner() interface{}
	// Shutdown stops the pool. The pool implementation in this file closes
	// its input channels and waits for all of its workers to exit.
	Shutdown()
	// GetParallelism returns the pool's degree of parallelism; for the pool
	// implementation in this file this is the number of worker goroutines.
	GetParallelism() int
}

// A pool is a fixed set of worker goroutines which perform tasks in parallel.
// It is created by MakePool and is the ExecutionPool implementation in this file.
type pool struct {
	inputs  []chan enqueuedTask // one task channel per priority, indexed by Priority value
	wg      sync.WaitGroup      // tracks the worker goroutines so Shutdown can wait for them
	owner   interface{}         // opaque value given to MakePool and returned by GetOwner
	numCPUs int                 // number of worker goroutines; set from runtime.NumCPU() in MakePool
}

// An ExecFunc is a unit of work to be executed by a Pool goroutine. It is
// called with the arg that was given to Enqueue, and its return value is sent
// on the task's out channel when that channel is non-nil.
//
// Note that an ExecFunc will occupy a Pool goroutine, so do not schedule tasks
// that spend an excessive amount of time waiting.
type ExecFunc func(interface{}) interface{}

// A Priority specifies a hint to the Pool to execute a Task at some priority.
//
// Tasks with higher Priority values will tend to finish more quickly.
//
// If there are tasks with different priorities, a worker will pick the
// highest-priority task to execute next.
//
// Valid values are the constants declared before numPrios: Enqueue uses the
// value directly as an index into the pool's per-priority input channels.
type Priority uint8

// enqueuedTask is the message that Enqueue sends to a worker over one of the
// pool's input channels.
type enqueuedTask struct {
	execFunc ExecFunc         // function the worker calls
	arg      interface{}      // argument passed to execFunc
	out      chan interface{} // receives execFunc's result; when nil the result is dropped
}

// MakePool creates an execution pool and starts its workers.
//
// One unbuffered input channel is allocated per priority level, and
// runtime.NumCPU() worker goroutines are launched to serve them. The owner
// value is stored as-is and can be read back through GetOwner.
func MakePool(owner interface{}) ExecutionPool {
	workers := runtime.NumCPU()

	// One unbuffered channel per priority; the slice index is the Priority value.
	queues := make([]chan enqueuedTask, numPrios)
	for prio := range queues {
		queues[prio] = make(chan enqueuedTask)
	}

	p := &pool{
		inputs:  queues,
		owner:   owner,
		numCPUs: workers,
	}

	// Register each worker with the wait group before launching it, so that
	// Shutdown waits for every one of them.
	for started := 0; started < workers; started++ {
		p.wg.Add(1)
		go p.worker()
	}

	return p
}

// GetParallelism returns the pool's degree of parallelism: the number of
// worker goroutines started by MakePool, which is the value of
// runtime.NumCPU() at creation time.
func (p *pool) GetParallelism() int {
	return p.numCPUs
}

// GetOwner returns the owner value that was passed in during pool creation.
//
// The idea is that a pool can be either passed in or created locally. Before shutting down the
// pool, the caller should check whether it was passed in or not. Instead of having a separate flag
// for that purpose, the pool has an "owner" value that allows the caller to determine whether it
// needs to shut the pool down or not.
func (p *pool) GetOwner() interface{} {
	return p.owner
}

// Enqueue submits a task to the pool at the given priority.
//
// The call blocks until a worker accepts the task or until enqueueCtx is
// done, whichever happens first. Once accepted, a worker calls t(arg) and, if
// out is non-nil, sends the result on out.
//
// Enqueue returns nil if the task was handed to a worker, and
// enqueueCtx.Err() if the context was done first. The priority i is used as
// an index into the pool's input channels, so it must be one of the declared
// priority constants. Enqueue must not be called once Shutdown has closed the
// input channels, since sending on a closed channel panics.
func (p *pool) Enqueue(enqueueCtx context.Context, t ExecFunc, arg interface{}, i Priority, out chan interface{}) error {
	queue := p.inputs[i]
	task := enqueuedTask{execFunc: t, arg: arg, out: out}

	select {
	case <-enqueueCtx.Done():
		return enqueueCtx.Err()
	case queue <- task:
		return nil
	}
}

// Shutdown tells the pool's goroutines to terminate and returns once all of
// them have exited.
//
// Every input channel is closed, which makes each worker's next receive
// report a closed channel and return; Shutdown then waits on the pool's wait
// group until the last worker is done.
//
// It must be called at most once: a second call would close channels that are
// already closed, which panics.
func (p *pool) Shutdown() {
	for prio := 0; prio < len(p.inputs); prio++ {
		close(p.inputs[prio])
	}
	p.wg.Wait()
}

// worker is the body of each pool goroutine. It repeatedly takes one task
// from the input channels, runs it, and forwards the result; it returns as
// soon as a receive reports a closed channel (which is what Shutdown causes).
//
// High-priority tasks are preferred: a non-blocking receive on the
// high-priority channel is attempted first, and only when nothing is waiting
// there does the worker block on both channels at once.
func (p *pool) worker() {
	lowPrio, highPrio := p.inputs[LowPriority], p.inputs[HighPriority]
	defer p.wg.Done()

	for {
		var (
			task enqueuedTask
			open bool
		)

		select {
		case task, open = <-highPrio:
			// A high-priority task (or the close signal) was already waiting.
		default:
			// Nothing pending at high priority: wait for whichever channel
			// delivers first.
			select {
			case task, open = <-highPrio:
			case task, open = <-lowPrio:
			}
		}

		if !open {
			// The channel was closed; the pool is shutting down.
			return
		}

		// The task always runs; the result is only forwarded when the caller
		// supplied an output channel.
		result := task.execFunc(task.arg)
		if task.out == nil {
			continue
		}
		task.out <- result
	}
}