summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTsachi Herman <tsachi.herman@algorand.com>2021-12-10 16:23:35 -0500
committerGitHub <noreply@github.com>2021-12-10 16:23:35 -0500
commit18cd6cda606c7d035bed4d2cfefaf4f25d009fa3 (patch)
tree93fc8f78b6b858718cc65e8b387453d04934bc4e
parentb2ca02fe56e9fc350a3a1d8ff281c8833ca6a284 (diff)
make sure the block service is not attempting to access the ledger after being stopped. (#3303)
## Summary The block service was attempting to serve block via the http handler even after it has been stopped. This lead to undesired downstream failures in the ledger, which was shutdown as well. ## Test Plan unit test added.
-rw-r--r--rpcs/blockService.go32
-rw-r--r--rpcs/blockService_test.go61
2 files changed, 87 insertions, 6 deletions
diff --git a/rpcs/blockService.go b/rpcs/blockService.go
index 2e77aba8e..698751b25 100644
--- a/rpcs/blockService.go
+++ b/rpcs/blockService.go
@@ -19,6 +19,7 @@ package rpcs
import (
"context"
"encoding/binary"
+ "errors"
"net/http"
"path"
"strconv"
@@ -29,6 +30,8 @@ import (
"github.com/algorand/go-codec/codec"
+ "github.com/algorand/go-deadlock"
+
"github.com/algorand/go-algorand/agreement"
"github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/crypto"
@@ -61,6 +64,8 @@ const (
BlockAndCertValue = "blockAndCert" // block+cert request data (as the value of requestDataTypeKey)
)
+var errBlockServiceClosed = errors.New("block service is shutting down")
+
// BlockService represents the Block RPC API
type BlockService struct {
ledger *data.Ledger
@@ -74,6 +79,7 @@ type BlockService struct {
enableArchiverFallback bool
log logging.Logger
closeWaitGroup sync.WaitGroup
+ mu deadlock.Mutex
}
// EncodedBlockCert defines how GetBlockBytes encodes a block and its certificate
@@ -118,6 +124,8 @@ func MakeBlockService(log logging.Logger, config config.Local, ledger *data.Ledg
// Start listening to catchup requests over ws
func (bs *BlockService) Start() {
+ bs.mu.Lock()
+ defer bs.mu.Unlock()
if bs.enableServiceOverGossip {
handlers := []network.TaggedMessageHandler{
{Tag: protocol.UniCatchupReqTag, MessageHandler: network.HandlerFunc(bs.processIncomingMessage)},
@@ -133,12 +141,14 @@ func (bs *BlockService) Start() {
// Stop servicing catchup requests over ws
func (bs *BlockService) Stop() {
+ bs.mu.Lock()
close(bs.stop)
+ bs.mu.Unlock()
bs.closeWaitGroup.Wait()
}
// ServerHTTP returns blocks
-// Either /v{version}/block/{round} or ?b={round}&v={version}
+// Either /v{version}/{genesisID}/block/{round} or ?b={round}&v={version}
// Uses gorilla/mux for path argument parsing.
func (bs *BlockService) ServeHTTP(response http.ResponseWriter, request *http.Request) {
pathVars := mux.Vars(request)
@@ -200,7 +210,7 @@ func (bs *BlockService) ServeHTTP(response http.ResponseWriter, request *http.Re
response.WriteHeader(http.StatusBadRequest)
return
}
- encodedBlockCert, err := RawBlockBytes(bs.ledger, basics.Round(round))
+ encodedBlockCert, err := bs.rawBlockBytes(basics.Round(round))
if err != nil {
switch err.(type) {
case ledgercore.ErrNoEntry:
@@ -321,7 +331,7 @@ func (bs *BlockService) redirectRequest(round uint64, response http.ResponseWrit
bs.log.Debugf("redirectRequest: %s", err.Error())
return false
}
- parsedURL.Path = FormatBlockQuery(round, parsedURL.Path, bs.net)
+ parsedURL.Path = strings.Replace(FormatBlockQuery(round, parsedURL.Path, bs.net), "{genesisID}", bs.genesisID, 1)
http.Redirect(response, request, parsedURL.String(), http.StatusTemporaryRedirect)
bs.log.Debugf("redirectRequest: redirected block request to %s", parsedURL.String())
return true
@@ -356,6 +366,22 @@ func (bs *BlockService) getRandomArchiver() (endpointAddress string) {
return
}
+// rawBlockBytes returns the block/cert for a given round, while taking the lock
+// to ensure the block service is currently active.
+func (bs *BlockService) rawBlockBytes(round basics.Round) ([]byte, error) {
+ bs.mu.Lock()
+ defer bs.mu.Unlock()
+ select {
+ case _, ok := <-bs.stop:
+ if !ok {
+ // service is closed.
+ return nil, errBlockServiceClosed
+ }
+ default:
+ }
+ return RawBlockBytes(bs.ledger, round)
+}
+
func topicBlockBytes(log logging.Logger, dataLedger *data.Ledger, round basics.Round, requestType string) network.Topics {
blk, cert, err := dataLedger.EncodedBlockCert(round)
if err != nil {
diff --git a/rpcs/blockService_test.go b/rpcs/blockService_test.go
index 542e8783e..828f2265a 100644
--- a/rpcs/blockService_test.go
+++ b/rpcs/blockService_test.go
@@ -19,7 +19,9 @@ package rpcs
import (
"context"
"fmt"
+ "io/ioutil"
"net/http"
+ "strings"
"testing"
"time"
@@ -118,7 +120,7 @@ func TestHandleCatchupReqNegative(t *testing.T) {
require.Equal(t, roundNumberParseErrMsg, string(val))
}
-// TestRedirectBasic tests the case when the block service redirects the request to elsewhere
+// TestRedirectFallbackArchiver tests the case when the block service fallback to another in the absense of a given block.
func TestRedirectFallbackArchiver(t *testing.T) {
partitiontest.PartitionTest(t)
@@ -136,8 +138,8 @@ func TestRedirectFallbackArchiver(t *testing.T) {
net2 := &httpTestPeerSource{}
config := config.GetDefaultLocal()
- bs1 := MakeBlockService(log, config, ledger1, net1, "{genesisID}")
- bs2 := MakeBlockService(log, config, ledger2, net2, "{genesisID}")
+ bs1 := MakeBlockService(log, config, ledger1, net1, "test-genesis-ID")
+ bs2 := MakeBlockService(log, config, ledger2, net2, "test-genesis-ID")
nodeA := &basicRPCNode{}
nodeB := &basicRPCNode{}
@@ -159,6 +161,7 @@ func TestRedirectFallbackArchiver(t *testing.T) {
ctx := context.Background()
parsedURL.Path = FormatBlockQuery(uint64(2), parsedURL.Path, net1)
+ parsedURL.Path = strings.Replace(parsedURL.Path, "{genesisID}", "test-genesis-ID", 1)
blockURL := parsedURL.String()
request, err := http.NewRequest("GET", blockURL, nil)
require.NoError(t, err)
@@ -170,6 +173,58 @@ func TestRedirectFallbackArchiver(t *testing.T) {
require.NoError(t, err)
require.Equal(t, http.StatusOK, response.StatusCode)
+ bodyData, err := ioutil.ReadAll(response.Body)
+ require.NoError(t, err)
+ require.NotEqual(t, 0, len(bodyData))
+}
+
+// TestBlockServiceShutdown tests that the block service is shutting down correctly.
+func TestBlockServiceShutdown(t *testing.T) {
+ partitiontest.PartitionTest(t)
+
+ log := logging.TestingLog(t)
+
+ ledger1 := makeLedger(t, "l1")
+ addBlock(t, ledger1)
+
+ net1 := &httpTestPeerSource{}
+
+ config := config.GetDefaultLocal()
+ bs1 := MakeBlockService(log, config, ledger1, net1, "test-genesis-ID")
+ bs1.Start()
+
+ nodeA := &basicRPCNode{}
+
+ nodeA.RegisterHTTPHandler(BlockServiceBlockPath, bs1)
+ nodeA.start()
+ defer nodeA.stop()
+
+ parsedURL, err := network.ParseHostOrURL(nodeA.rootURL())
+ require.NoError(t, err)
+
+ client := http.Client{}
+
+ ctx := context.Background()
+ parsedURL.Path = FormatBlockQuery(uint64(1), parsedURL.Path, net1)
+ parsedURL.Path = strings.Replace(parsedURL.Path, "{genesisID}", "test-genesis-ID", 1)
+ blockURL := parsedURL.String()
+ request, err := http.NewRequest("GET", blockURL, nil)
+ require.NoError(t, err)
+ requestCtx, requestCancel := context.WithTimeout(ctx, time.Duration(config.CatchupHTTPBlockFetchTimeoutSec)*time.Second)
+ defer requestCancel()
+ request = request.WithContext(requestCtx)
+ network.SetUserAgentHeader(request.Header)
+
+ requestDone := make(chan struct{})
+ go func() {
+ defer close(requestDone)
+ client.Do(request)
+ }()
+
+ bs1.Stop()
+ ledger1.Close()
+
+ <-requestDone
}
// TestRedirectBasic tests the case when the block service redirects the request to elsewhere