summaryrefslogtreecommitdiff
path: root/network/multiplexer.go
blob: 0e97d63f28e78679383d5e6484b3b6bbb4ee200b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
// Copyright (C) 2019-2024 Algorand, Inc.
// This file is part of go-algorand
//
// go-algorand is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// go-algorand is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with go-algorand.  If not, see <https://www.gnu.org/licenses/>.

package network

import (
	"fmt"
	"sync/atomic"
)

// Multiplexer is a message handler that sorts incoming messages by Tag and passes
// them along to the relevant message handler for that type of message.
//
// The handlers are kept in a map held inside an atomic.Value. The methods
// visible here never modify a stored map in place: RegisterHandlers and
// ClearHandlers each build a new map and Store it, while lookups Load whatever
// map is current.
type Multiplexer struct {
	msgHandlers atomic.Value // stores map[Tag]MessageHandler, an immutable map.
}

// MakeMultiplexer returns a new Multiplexer with no handlers registered.
func MakeMultiplexer() *Multiplexer {
	mux := new(Multiplexer)
	// With an empty exclusion list ClearHandlers stores a fresh, empty map,
	// which is how the handlers map gets allocated.
	mux.ClearHandlers(nil)
	return mux
}

// getHandlersMap returns the currently stored handlers map, or nil when the
// stored value is not a map[Tag]MessageHandler (for example, nothing stored yet).
func (m *Multiplexer) getHandlersMap() map[Tag]MessageHandler {
	// The comma-ok assertion leaves handlers as a nil map when it fails.
	handlers, _ := m.msgHandlers.Load().(map[Tag]MessageHandler)
	return handlers
}

// getHandler looks up the handler registered for the given message Tag in the
// current handlers map. The boolean result reports whether a handler was found;
// it is false when no handlers map is available.
func (m *Multiplexer) getHandler(tag Tag) (MessageHandler, bool) {
	handlers := m.getHandlersMap()
	if handlers == nil {
		return nil, false
	}
	handler, found := handlers[tag]
	return handler, found
}

// Handle is the "input" side of the multiplexer. It forwards msg to the handler
// registered for msg.Tag and returns that handler's response. When no handler is
// registered for the tag, it returns a zero-value OutgoingMessage.
func (m *Multiplexer) Handle(msg IncomingMessage) OutgoingMessage {
	if handler, found := m.getHandler(msg.Tag); found {
		return handler.Handle(msg)
	}
	return OutgoingMessage{}
}

// RegisterHandlers adds the given tagged message handlers to the ones already
// registered. It builds a new map containing the existing entries plus the new
// ones and stores that map as the current handlers map.
//
// It panics if a tag in dispatch already has a handler, whether from an earlier
// registration or from an earlier entry in dispatch. In that case nothing is stored.
func (m *Multiplexer) RegisterHandlers(dispatch []TaggedMessageHandler) {
	current := m.getHandlersMap()

	// Copy rather than modify: the stored map is never changed in place.
	// Ranging over a nil map is a no-op, so no nil check is needed.
	updated := make(map[Tag]MessageHandler, len(current)+len(dispatch))
	for tag, handler := range current {
		updated[tag] = handler
	}

	for _, entry := range dispatch {
		if _, exists := updated[entry.Tag]; exists {
			panic(fmt.Sprintf("Already registered a handler for tag %v", entry.Tag))
		}
		updated[entry.Tag] = entry.MessageHandler
	}

	m.msgHandlers.Store(updated)
}

// ClearHandlers deregisters every existing message handler except those whose
// tags are listed in excludeTags. With an empty (or nil) excludeTags, all
// handlers are removed. In both cases a newly built map is stored.
func (m *Multiplexer) ClearHandlers(excludeTags []Tag) {
	if len(excludeTags) == 0 {
		m.msgHandlers.Store(make(map[Tag]MessageHandler))
		return
	}

	current := m.getHandlersMap()
	kept := make(map[Tag]MessageHandler, len(excludeTags))
	for _, tag := range excludeTags {
		// Only tags that currently have a handler are carried over. A tag
		// listed more than once just rewrites the same entry. Indexing a nil
		// map is safe and reports "not found".
		if handler, registered := current[tag]; registered {
			kept[tag] = handler
		}
	}

	m.msgHandlers.Store(kept)
}