discovery: use error channels with test goroutines
This commit is contained in:
parent
20a5ee2f1e
commit
58c23074d1
@ -2,6 +2,7 @@ package discovery
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
"reflect"
|
"reflect"
|
||||||
"sync"
|
"sync"
|
||||||
@ -295,19 +296,20 @@ func TestGossipSyncerFilterGossipMsgsAllInMemory(t *testing.T) {
|
|||||||
// Before we send off the query, we'll ensure we send the missing
|
// Before we send off the query, we'll ensure we send the missing
|
||||||
// channel update for that final ann. It will be below the horizon, so
|
// channel update for that final ann. It will be below the horizon, so
|
||||||
// shouldn't be sent anyway.
|
// shouldn't be sent anyway.
|
||||||
|
errCh := make(chan error, 1)
|
||||||
go func() {
|
go func() {
|
||||||
select {
|
select {
|
||||||
case <-time.After(time.Second * 15):
|
case <-time.After(time.Second * 15):
|
||||||
t.Fatalf("no query recvd")
|
errCh <- errors.New("no query received")
|
||||||
|
return
|
||||||
case query := <-chanSeries.updateReq:
|
case query := <-chanSeries.updateReq:
|
||||||
|
|
||||||
// It should be asking for the chan updates of short
|
// It should be asking for the chan updates of short
|
||||||
// chan ID 25.
|
// chan ID 25.
|
||||||
expectedID := lnwire.NewShortChanIDFromInt(25)
|
expectedID := lnwire.NewShortChanIDFromInt(25)
|
||||||
if expectedID != query {
|
if expectedID != query {
|
||||||
t.Fatalf("wrong query id: expected %v, got %v",
|
errCh <- fmt.Errorf("wrong query id: expected %v, got %v",
|
||||||
expectedID, query)
|
expectedID, query)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// If so, then we'll send back the missing update.
|
// If so, then we'll send back the missing update.
|
||||||
@ -317,6 +319,7 @@ func TestGossipSyncerFilterGossipMsgsAllInMemory(t *testing.T) {
|
|||||||
Timestamp: unixStamp(5),
|
Timestamp: unixStamp(5),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
errCh <- nil
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
@ -335,6 +338,16 @@ func TestGossipSyncerFilterGossipMsgsAllInMemory(t *testing.T) {
|
|||||||
"messages: %v", len(msgs), spew.Sdump(msgs))
|
"messages: %v", len(msgs), spew.Sdump(msgs))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Wait for error from goroutine.
|
||||||
|
select {
|
||||||
|
case <-time.After(time.Second * 30):
|
||||||
|
t.Fatalf("goroutine did not return within 30 seconds")
|
||||||
|
case err := <-errCh:
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestGossipSyncerApplyNoHistoricalGossipFilter tests that once a gossip filter
|
// TestGossipSyncerApplyNoHistoricalGossipFilter tests that once a gossip filter
|
||||||
@ -418,23 +431,26 @@ func TestGossipSyncerApplyGossipFilter(t *testing.T) {
|
|||||||
|
|
||||||
// Before we apply the horizon, we'll dispatch a response to the query
|
// Before we apply the horizon, we'll dispatch a response to the query
|
||||||
// that the syncer will issue.
|
// that the syncer will issue.
|
||||||
|
errCh := make(chan error, 1)
|
||||||
go func() {
|
go func() {
|
||||||
select {
|
select {
|
||||||
case <-time.After(time.Second * 15):
|
case <-time.After(time.Second * 15):
|
||||||
t.Fatalf("no query recvd")
|
errCh <- errors.New("no query recvd")
|
||||||
|
return
|
||||||
case query := <-chanSeries.horizonReq:
|
case query := <-chanSeries.horizonReq:
|
||||||
// The syncer should have translated the time range
|
// The syncer should have translated the time range
|
||||||
// into the proper star time.
|
// into the proper star time.
|
||||||
if remoteHorizon.FirstTimestamp != uint32(query.start.Unix()) {
|
if remoteHorizon.FirstTimestamp != uint32(query.start.Unix()) {
|
||||||
t.Fatalf("wrong query stamp: expected %v, got %v",
|
errCh <- fmt.Errorf("wrong query stamp: expected %v, got %v",
|
||||||
remoteHorizon.FirstTimestamp, query.start)
|
remoteHorizon.FirstTimestamp, query.start)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// For this first response, we'll send back an empty
|
// For this first response, we'll send back an empty
|
||||||
// set of messages. As result, we shouldn't send any
|
// set of messages. As result, we shouldn't send any
|
||||||
// messages.
|
// messages.
|
||||||
chanSeries.horizonResp <- []lnwire.Message{}
|
chanSeries.horizonResp <- []lnwire.Message{}
|
||||||
|
errCh <- nil
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
@ -452,19 +468,30 @@ func TestGossipSyncerApplyGossipFilter(t *testing.T) {
|
|||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Wait for error result from goroutine.
|
||||||
|
select {
|
||||||
|
case <-time.After(time.Second * 30):
|
||||||
|
t.Fatalf("goroutine did not return within 30 seconds")
|
||||||
|
case err := <-errCh:
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// If we repeat the process, but give the syncer a set of valid
|
// If we repeat the process, but give the syncer a set of valid
|
||||||
// messages, then these should be sent to the remote peer.
|
// messages, then these should be sent to the remote peer.
|
||||||
go func() {
|
go func() {
|
||||||
select {
|
select {
|
||||||
case <-time.After(time.Second * 15):
|
case <-time.After(time.Second * 15):
|
||||||
t.Fatalf("no query recvd")
|
errCh <- errors.New("no query recvd")
|
||||||
|
return
|
||||||
case query := <-chanSeries.horizonReq:
|
case query := <-chanSeries.horizonReq:
|
||||||
// The syncer should have translated the time range
|
// The syncer should have translated the time range
|
||||||
// into the proper star time.
|
// into the proper star time.
|
||||||
if remoteHorizon.FirstTimestamp != uint32(query.start.Unix()) {
|
if remoteHorizon.FirstTimestamp != uint32(query.start.Unix()) {
|
||||||
t.Fatalf("wrong query stamp: expected %v, got %v",
|
errCh <- fmt.Errorf("wrong query stamp: expected %v, got %v",
|
||||||
remoteHorizon.FirstTimestamp, query.start)
|
remoteHorizon.FirstTimestamp, query.start)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// For this first response, we'll send back a proper
|
// For this first response, we'll send back a proper
|
||||||
@ -475,6 +502,7 @@ func TestGossipSyncerApplyGossipFilter(t *testing.T) {
|
|||||||
Timestamp: unixStamp(5),
|
Timestamp: unixStamp(5),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
errCh <- nil
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
err = syncer.ApplyGossipFilter(remoteHorizon)
|
err = syncer.ApplyGossipFilter(remoteHorizon)
|
||||||
@ -493,6 +521,16 @@ func TestGossipSyncerApplyGossipFilter(t *testing.T) {
|
|||||||
1, len(msgs))
|
1, len(msgs))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Wait for error result from goroutine.
|
||||||
|
select {
|
||||||
|
case <-time.After(time.Second * 30):
|
||||||
|
t.Fatalf("goroutine did not return within 30 seconds")
|
||||||
|
case err := <-errCh:
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestGossipSyncerReplyShortChanIDsWrongChainHash tests that if we get a chan
|
// TestGossipSyncerReplyShortChanIDsWrongChainHash tests that if we get a chan
|
||||||
@ -578,21 +616,24 @@ func TestGossipSyncerReplyShortChanIDs(t *testing.T) {
|
|||||||
|
|
||||||
// We'll then craft a reply to the upcoming query for all the matching
|
// We'll then craft a reply to the upcoming query for all the matching
|
||||||
// channel announcements for a particular set of short channel ID's.
|
// channel announcements for a particular set of short channel ID's.
|
||||||
|
errCh := make(chan error, 1)
|
||||||
go func() {
|
go func() {
|
||||||
select {
|
select {
|
||||||
case <-time.After(time.Second * 15):
|
case <-time.After(time.Second * 15):
|
||||||
t.Fatalf("no query recvd")
|
errCh <- errors.New("no query recvd")
|
||||||
|
return
|
||||||
case chanIDs := <-chanSeries.annReq:
|
case chanIDs := <-chanSeries.annReq:
|
||||||
// The set of chan ID's should match exactly.
|
// The set of chan ID's should match exactly.
|
||||||
if !reflect.DeepEqual(chanIDs, queryChanIDs) {
|
if !reflect.DeepEqual(chanIDs, queryChanIDs) {
|
||||||
t.Fatalf("wrong chan IDs: expected %v, got %v",
|
errCh <- fmt.Errorf("wrong chan IDs: expected %v, got %v",
|
||||||
queryChanIDs, chanIDs)
|
queryChanIDs, chanIDs)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// If they do, then we'll send back a response with
|
// If they do, then we'll send back a response with
|
||||||
// some canned messages.
|
// some canned messages.
|
||||||
chanSeries.annResp <- queryReply
|
chanSeries.annResp <- queryReply
|
||||||
|
errCh <- nil
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
@ -638,6 +679,16 @@ func TestGossipSyncerReplyShortChanIDs(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Wait for error from goroutine.
|
||||||
|
select {
|
||||||
|
case <-time.After(time.Second * 30):
|
||||||
|
t.Fatalf("goroutine did not return within 30 seconds")
|
||||||
|
case err := <-errCh:
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestGossipSyncerReplyChanRangeQuery tests that if we receive a
|
// TestGossipSyncerReplyChanRangeQuery tests that if we receive a
|
||||||
@ -673,20 +724,24 @@ func TestGossipSyncerReplyChanRangeQuery(t *testing.T) {
|
|||||||
lnwire.NewShortChanIDFromInt(4),
|
lnwire.NewShortChanIDFromInt(4),
|
||||||
lnwire.NewShortChanIDFromInt(5),
|
lnwire.NewShortChanIDFromInt(5),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
errCh := make(chan error, 1)
|
||||||
go func() {
|
go func() {
|
||||||
select {
|
select {
|
||||||
case <-time.After(time.Second * 15):
|
case <-time.After(time.Second * 15):
|
||||||
t.Fatalf("no query recvd")
|
errCh <- errors.New("no query recvd")
|
||||||
|
return
|
||||||
case filterReq := <-chanSeries.filterRangeReqs:
|
case filterReq := <-chanSeries.filterRangeReqs:
|
||||||
// We should be querying for block 100 to 150.
|
// We should be querying for block 100 to 150.
|
||||||
if filterReq.startHeight != 100 && filterReq.endHeight != 150 {
|
if filterReq.startHeight != 100 && filterReq.endHeight != 150 {
|
||||||
t.Fatalf("wrong height range: %v", spew.Sdump(filterReq))
|
errCh <- fmt.Errorf("wrong height range: %v", spew.Sdump(filterReq))
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// If the proper request was sent, then we'll respond
|
// If the proper request was sent, then we'll respond
|
||||||
// with our set of short channel ID's.
|
// with our set of short channel ID's.
|
||||||
chanSeries.filterRangeResp <- resp
|
chanSeries.filterRangeResp <- resp
|
||||||
|
errCh <- nil
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
@ -738,6 +793,16 @@ func TestGossipSyncerReplyChanRangeQuery(t *testing.T) {
|
|||||||
t.Fatalf("mismatched response: expected %v, got %v",
|
t.Fatalf("mismatched response: expected %v, got %v",
|
||||||
spew.Sdump(resp), spew.Sdump(respMsgs))
|
spew.Sdump(resp), spew.Sdump(respMsgs))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Wait for error from goroutine.
|
||||||
|
select {
|
||||||
|
case <-time.After(time.Second * 30):
|
||||||
|
t.Fatalf("goroutine did not return within 30 seconds")
|
||||||
|
case err := <-errCh:
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestGossipSyncerReplyChanRangeQueryNoNewChans tests that if we issue a reply
|
// TestGossipSyncerReplyChanRangeQueryNoNewChans tests that if we issue a reply
|
||||||
@ -762,21 +827,23 @@ func TestGossipSyncerReplyChanRangeQueryNoNewChans(t *testing.T) {
|
|||||||
|
|
||||||
// We'll then launch a goroutine to reply to the query no new channels.
|
// We'll then launch a goroutine to reply to the query no new channels.
|
||||||
resp := []lnwire.ShortChannelID{}
|
resp := []lnwire.ShortChannelID{}
|
||||||
|
errCh := make(chan error, 1)
|
||||||
go func() {
|
go func() {
|
||||||
select {
|
select {
|
||||||
case <-time.After(time.Second * 15):
|
case <-time.After(time.Second * 15):
|
||||||
t.Fatalf("no query recvd")
|
errCh <- errors.New("no query recvd")
|
||||||
|
return
|
||||||
case filterReq := <-chanSeries.filterRangeReqs:
|
case filterReq := <-chanSeries.filterRangeReqs:
|
||||||
// We should be querying for block 100 to 150.
|
// We should be querying for block 100 to 150.
|
||||||
if filterReq.startHeight != 100 && filterReq.endHeight != 150 {
|
if filterReq.startHeight != 100 && filterReq.endHeight != 150 {
|
||||||
t.Fatalf("wrong height range: %v",
|
errCh <- fmt.Errorf("wrong height range: %v",
|
||||||
spew.Sdump(filterReq))
|
spew.Sdump(filterReq))
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// If the proper request was sent, then we'll respond
|
// If the proper request was sent, then we'll respond
|
||||||
// with our blank set of short chan ID's.
|
// with our blank set of short chan ID's.
|
||||||
chanSeries.filterRangeResp <- resp
|
chanSeries.filterRangeResp <- resp
|
||||||
|
errCh <- nil
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
@ -806,6 +873,16 @@ func TestGossipSyncerReplyChanRangeQueryNoNewChans(t *testing.T) {
|
|||||||
t.Fatalf("complete wasn't set")
|
t.Fatalf("complete wasn't set")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Wait for error from goroutine.
|
||||||
|
select {
|
||||||
|
case <-time.After(time.Second * 30):
|
||||||
|
t.Fatalf("goroutine did not return within 30 seconds")
|
||||||
|
case err := <-errCh:
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestGossipSyncerGenChanRangeQuery tests that given the current best known
|
// TestGossipSyncerGenChanRangeQuery tests that given the current best known
|
||||||
@ -913,21 +990,25 @@ func TestGossipSyncerProcessChanRangeReply(t *testing.T) {
|
|||||||
|
|
||||||
// As we're about to send the final response, we'll launch a goroutine
|
// As we're about to send the final response, we'll launch a goroutine
|
||||||
// to respond back with a filtered set of chan ID's.
|
// to respond back with a filtered set of chan ID's.
|
||||||
|
errCh := make(chan error, 1)
|
||||||
go func() {
|
go func() {
|
||||||
select {
|
select {
|
||||||
case <-time.After(time.Second * 15):
|
case <-time.After(time.Second * 15):
|
||||||
t.Fatalf("no query recvd")
|
errCh <- errors.New("no query received")
|
||||||
|
return
|
||||||
|
|
||||||
case req := <-chanSeries.filterReq:
|
case req := <-chanSeries.filterReq:
|
||||||
// We should get a request for the entire range of short
|
// We should get a request for the entire range of short
|
||||||
// chan ID's.
|
// chan ID's.
|
||||||
if !reflect.DeepEqual(expectedReq, req) {
|
if !reflect.DeepEqual(expectedReq, req) {
|
||||||
t.Fatalf("wrong request: expected %v, got %v",
|
errCh <- fmt.Errorf("wrong request: expected %v, got %v",
|
||||||
expectedReq, req)
|
expectedReq, req)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// We'll send back only the last two to simulate filtering.
|
// We'll send back only the last two to simulate filtering.
|
||||||
chanSeries.filterResp <- expectedReq[1:]
|
chanSeries.filterResp <- expectedReq[1:]
|
||||||
|
errCh <- nil
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
@ -946,24 +1027,37 @@ func TestGossipSyncerProcessChanRangeReply(t *testing.T) {
|
|||||||
syncer.newChansToQuery, expectedReq[1:])
|
syncer.newChansToQuery, expectedReq[1:])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Wait for error from goroutine.
|
||||||
|
select {
|
||||||
|
case <-time.After(time.Second * 30):
|
||||||
|
t.Fatalf("goroutine did not return within 30 seconds")
|
||||||
|
case err := <-errCh:
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// We'll repeat our final reply again, but this time we won't send any
|
// We'll repeat our final reply again, but this time we won't send any
|
||||||
// new channels. As a result, we should transition over to the
|
// new channels. As a result, we should transition over to the
|
||||||
// chansSynced state.
|
// chansSynced state.
|
||||||
|
errCh = make(chan error, 1)
|
||||||
go func() {
|
go func() {
|
||||||
select {
|
select {
|
||||||
case <-time.After(time.Second * 15):
|
case <-time.After(time.Second * 15):
|
||||||
t.Fatalf("no query recvd")
|
errCh <- errors.New("no query received")
|
||||||
|
return
|
||||||
case req := <-chanSeries.filterReq:
|
case req := <-chanSeries.filterReq:
|
||||||
// We should get a request for the entire range of short
|
// We should get a request for the entire range of short
|
||||||
// chan ID's.
|
// chan ID's.
|
||||||
if !reflect.DeepEqual(expectedReq[2], req[0]) {
|
if !reflect.DeepEqual(expectedReq[2], req[0]) {
|
||||||
t.Fatalf("wrong request: expected %v, got %v",
|
errCh <- fmt.Errorf("wrong request: expected %v, got %v",
|
||||||
expectedReq[2], req[0])
|
expectedReq[2], req[0])
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// We'll send back only the last two to simulate filtering.
|
// We'll send back only the last two to simulate filtering.
|
||||||
chanSeries.filterResp <- []lnwire.ShortChannelID{}
|
chanSeries.filterResp <- []lnwire.ShortChannelID{}
|
||||||
|
errCh <- nil
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
if err := syncer.processChanRangeReply(replies[2]); err != nil {
|
if err := syncer.processChanRangeReply(replies[2]); err != nil {
|
||||||
@ -974,6 +1068,16 @@ func TestGossipSyncerProcessChanRangeReply(t *testing.T) {
|
|||||||
t.Fatalf("wrong state: expected %v instead got %v",
|
t.Fatalf("wrong state: expected %v instead got %v",
|
||||||
chansSynced, syncer.state)
|
chansSynced, syncer.state)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Wait for error from goroutine.
|
||||||
|
select {
|
||||||
|
case <-time.After(time.Second * 30):
|
||||||
|
t.Fatalf("goroutine did not return within 30 seconds")
|
||||||
|
case err := <-errCh:
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestGossipSyncerSynchronizeChanIDs tests that we properly request chunks of
|
// TestGossipSyncerSynchronizeChanIDs tests that we properly request chunks of
|
||||||
|
Loading…
Reference in New Issue
Block a user