diff options
author | Tsachi Herman <tsachi.herman@algorand.com> | 2021-12-10 16:23:35 -0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-12-10 16:23:35 -0500 |
commit | 18cd6cda606c7d035bed4d2cfefaf4f25d009fa3 (patch) | |
tree | 93fc8f78b6b858718cc65e8b387453d04934bc4e | |
parent | b2ca02fe56e9fc350a3a1d8ff281c8833ca6a284 (diff) |
make sure the block service is not attempting to access the ledger after being stopped. (#3303)
## Summary
The block service was attempting to serve block via the http handler even after it has been stopped.
This lead to undesired downstream failures in the ledger, which was shutdown as well.
## Test Plan
unit test added.
-rw-r--r-- | rpcs/blockService.go | 32 | ||||
-rw-r--r-- | rpcs/blockService_test.go | 61 |
2 files changed, 87 insertions, 6 deletions
diff --git a/rpcs/blockService.go b/rpcs/blockService.go index 2e77aba8e..698751b25 100644 --- a/rpcs/blockService.go +++ b/rpcs/blockService.go @@ -19,6 +19,7 @@ package rpcs import ( "context" "encoding/binary" + "errors" "net/http" "path" "strconv" @@ -29,6 +30,8 @@ import ( "github.com/algorand/go-codec/codec" + "github.com/algorand/go-deadlock" + "github.com/algorand/go-algorand/agreement" "github.com/algorand/go-algorand/config" "github.com/algorand/go-algorand/crypto" @@ -61,6 +64,8 @@ const ( BlockAndCertValue = "blockAndCert" // block+cert request data (as the value of requestDataTypeKey) ) +var errBlockServiceClosed = errors.New("block service is shutting down") + // BlockService represents the Block RPC API type BlockService struct { ledger *data.Ledger @@ -74,6 +79,7 @@ type BlockService struct { enableArchiverFallback bool log logging.Logger closeWaitGroup sync.WaitGroup + mu deadlock.Mutex } // EncodedBlockCert defines how GetBlockBytes encodes a block and its certificate @@ -118,6 +124,8 @@ func MakeBlockService(log logging.Logger, config config.Local, ledger *data.Ledg // Start listening to catchup requests over ws func (bs *BlockService) Start() { + bs.mu.Lock() + defer bs.mu.Unlock() if bs.enableServiceOverGossip { handlers := []network.TaggedMessageHandler{ {Tag: protocol.UniCatchupReqTag, MessageHandler: network.HandlerFunc(bs.processIncomingMessage)}, @@ -133,12 +141,14 @@ func (bs *BlockService) Start() { // Stop servicing catchup requests over ws func (bs *BlockService) Stop() { + bs.mu.Lock() close(bs.stop) + bs.mu.Unlock() bs.closeWaitGroup.Wait() } // ServerHTTP returns blocks -// Either /v{version}/block/{round} or ?b={round}&v={version} +// Either /v{version}/{genesisID}/block/{round} or ?b={round}&v={version} // Uses gorilla/mux for path argument parsing. func (bs *BlockService) ServeHTTP(response http.ResponseWriter, request *http.Request) { pathVars := mux.Vars(request) @@ -200,7 +210,7 @@ func (bs *BlockService) ServeHTTP(response http.ResponseWriter, request *http.Re response.WriteHeader(http.StatusBadRequest) return } - encodedBlockCert, err := RawBlockBytes(bs.ledger, basics.Round(round)) + encodedBlockCert, err := bs.rawBlockBytes(basics.Round(round)) if err != nil { switch err.(type) { case ledgercore.ErrNoEntry: @@ -321,7 +331,7 @@ func (bs *BlockService) redirectRequest(round uint64, response http.ResponseWrit bs.log.Debugf("redirectRequest: %s", err.Error()) return false } - parsedURL.Path = FormatBlockQuery(round, parsedURL.Path, bs.net) + parsedURL.Path = strings.Replace(FormatBlockQuery(round, parsedURL.Path, bs.net), "{genesisID}", bs.genesisID, 1) http.Redirect(response, request, parsedURL.String(), http.StatusTemporaryRedirect) bs.log.Debugf("redirectRequest: redirected block request to %s", parsedURL.String()) return true @@ -356,6 +366,22 @@ func (bs *BlockService) getRandomArchiver() (endpointAddress string) { return } +// rawBlockBytes returns the block/cert for a given round, while taking the lock +// to ensure the block service is currently active. +func (bs *BlockService) rawBlockBytes(round basics.Round) ([]byte, error) { + bs.mu.Lock() + defer bs.mu.Unlock() + select { + case _, ok := <-bs.stop: + if !ok { + // service is closed. + return nil, errBlockServiceClosed + } + default: + } + return RawBlockBytes(bs.ledger, round) +} + func topicBlockBytes(log logging.Logger, dataLedger *data.Ledger, round basics.Round, requestType string) network.Topics { blk, cert, err := dataLedger.EncodedBlockCert(round) if err != nil { diff --git a/rpcs/blockService_test.go b/rpcs/blockService_test.go index 542e8783e..828f2265a 100644 --- a/rpcs/blockService_test.go +++ b/rpcs/blockService_test.go @@ -19,7 +19,9 @@ package rpcs import ( "context" "fmt" + "io/ioutil" "net/http" + "strings" "testing" "time" @@ -118,7 +120,7 @@ func TestHandleCatchupReqNegative(t *testing.T) { require.Equal(t, roundNumberParseErrMsg, string(val)) } -// TestRedirectBasic tests the case when the block service redirects the request to elsewhere +// TestRedirectFallbackArchiver tests the case when the block service fallback to another in the absense of a given block. func TestRedirectFallbackArchiver(t *testing.T) { partitiontest.PartitionTest(t) @@ -136,8 +138,8 @@ func TestRedirectFallbackArchiver(t *testing.T) { net2 := &httpTestPeerSource{} config := config.GetDefaultLocal() - bs1 := MakeBlockService(log, config, ledger1, net1, "{genesisID}") - bs2 := MakeBlockService(log, config, ledger2, net2, "{genesisID}") + bs1 := MakeBlockService(log, config, ledger1, net1, "test-genesis-ID") + bs2 := MakeBlockService(log, config, ledger2, net2, "test-genesis-ID") nodeA := &basicRPCNode{} nodeB := &basicRPCNode{} @@ -159,6 +161,7 @@ func TestRedirectFallbackArchiver(t *testing.T) { ctx := context.Background() parsedURL.Path = FormatBlockQuery(uint64(2), parsedURL.Path, net1) + parsedURL.Path = strings.Replace(parsedURL.Path, "{genesisID}", "test-genesis-ID", 1) blockURL := parsedURL.String() request, err := http.NewRequest("GET", blockURL, nil) require.NoError(t, err) @@ -170,6 +173,58 @@ func TestRedirectFallbackArchiver(t *testing.T) { require.NoError(t, err) require.Equal(t, http.StatusOK, response.StatusCode) + bodyData, err := ioutil.ReadAll(response.Body) + require.NoError(t, err) + require.NotEqual(t, 0, len(bodyData)) +} + +// TestBlockServiceShutdown tests that the block service is shutting down correctly. +func TestBlockServiceShutdown(t *testing.T) { + partitiontest.PartitionTest(t) + + log := logging.TestingLog(t) + + ledger1 := makeLedger(t, "l1") + addBlock(t, ledger1) + + net1 := &httpTestPeerSource{} + + config := config.GetDefaultLocal() + bs1 := MakeBlockService(log, config, ledger1, net1, "test-genesis-ID") + bs1.Start() + + nodeA := &basicRPCNode{} + + nodeA.RegisterHTTPHandler(BlockServiceBlockPath, bs1) + nodeA.start() + defer nodeA.stop() + + parsedURL, err := network.ParseHostOrURL(nodeA.rootURL()) + require.NoError(t, err) + + client := http.Client{} + + ctx := context.Background() + parsedURL.Path = FormatBlockQuery(uint64(1), parsedURL.Path, net1) + parsedURL.Path = strings.Replace(parsedURL.Path, "{genesisID}", "test-genesis-ID", 1) + blockURL := parsedURL.String() + request, err := http.NewRequest("GET", blockURL, nil) + require.NoError(t, err) + requestCtx, requestCancel := context.WithTimeout(ctx, time.Duration(config.CatchupHTTPBlockFetchTimeoutSec)*time.Second) + defer requestCancel() + request = request.WithContext(requestCtx) + network.SetUserAgentHeader(request.Header) + + requestDone := make(chan struct{}) + go func() { + defer close(requestDone) + client.Do(request) + }() + + bs1.Stop() + ledger1.Close() + + <-requestDone } // TestRedirectBasic tests the case when the block service redirects the request to elsewhere |