test: add new test for streaming SCB updates
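
The new testChannelBackupUpdates integration test spins up a node, Carol,
with a dedicated --backupfilepath, opens two channels from Alice to her,
then closes them one by one, alternating cooperative and force closes.
After each open and close, the test asserts that a snapshot arrives on the
SubscribeChannelBackups stream, that the on-disk static channel backup
(SCB) file matches the latest snapshot, and that both backups pass
VerifyChanBackup.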

Olaoluwa Osuntokun 2019-03-11 17:38:38 -07:00
parent acc37f7449
commit 266ddbaceb

@@ -15,6 +15,7 @@ import (
"path/filepath" "path/filepath"
"reflect" "reflect"
"strings" "strings"
"sync"
"sync/atomic" "sync/atomic"
"testing" "testing"
"time" "time"
@@ -28,6 +29,7 @@ import (
"github.com/btcsuite/btcutil" "github.com/btcsuite/btcutil"
"github.com/davecgh/go-spew/spew" "github.com/davecgh/go-spew/spew"
"github.com/go-errors/errors" "github.com/go-errors/errors"
"github.com/lightningnetwork/lnd/chanbackup"
"github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lntest" "github.com/lightningnetwork/lnd/lntest"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
@@ -13101,6 +13103,194 @@ func testSweepAllCoins(net *lntest.NetworkHarness, t *harnessTest) {
}
}
// testChannelBackupUpdates tests that both the streaming channel backup
// update RPC, and the on-disk channel.backup file are updated each time a
// channel is opened/closed.
func testChannelBackupUpdates(net *lntest.NetworkHarness, t *harnessTest) {
ctxb := context.Background()
// First, we'll make a temp directory that we'll use to store our
// backup file, so we can easily check in on it during the test.
backupDir, err := ioutil.TempDir("", "")
if err != nil {
t.Fatalf("unable to create backup dir: %v", err)
}
defer os.RemoveAll(backupDir)
// Next, we'll create a new node, Carol. We'll also create a temporary
// file that Carol will use to store her channel backups.
backupFilePath := filepath.Join(
backupDir, chanbackup.DefaultBackupFileName,
)
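// The --backupfilepath flag points Carol's on-disk SCB file at the
// temp directory above, so the test can read it back directly.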
carolArgs := fmt.Sprintf("--backupfilepath=%v", backupFilePath)
carol, err := net.NewNode("carol", []string{carolArgs})
if err != nil {
t.Fatalf("unable to create new node: %v", err)
}
defer shutdownAndAssert(net, t, carol)
// Next, we'll register for streaming notifications for changes to the
// backup file.
backupStream, err := carol.SubscribeChannelBackups(
ctxb, &lnrpc.ChannelBackupSubscription{},
)
if err != nil {
t.Fatalf("unable to create backup stream: %v", err)
}
// We'll use this goroutine to proxy any updates into a channel that we
// can easily consume below.
var wg sync.WaitGroup
backupUpdates := make(chan *lnrpc.ChanBackupSnapshot)
streamErr := make(chan error)
streamQuit := make(chan struct{})
wg.Add(1)
go func() {
defer wg.Done()
for {
snapshot, err := backupStream.Recv()
if err != nil {
select {
case streamErr <- err:
case <-streamQuit:
}
// Once Recv fails the stream is dead, so we exit rather
// than forwarding a nil snapshot below.
return
}
select {
case backupUpdates <- snapshot:
case <-streamQuit:
return
}
}
}()
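// Closing streamQuit on exit (via the defer below) unblocks the
// goroutine above so it can shut down cleanly.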
defer close(streamQuit)
// With Carol up, we'll now connect her to Alice.
ctxt, _ := context.WithTimeout(ctxb, defaultTimeout)
if err := net.ConnectNodes(ctxt, carol, net.Alice); err != nil {
t.Fatalf("unable to connect carol to alice: %v", err)
}
// Next, we'll open two channels between Alice and Carol back to back.
var chanPoints []*lnrpc.ChannelPoint
numChans := 2
chanAmt := btcutil.Amount(1000000)
for i := 0; i < numChans; i++ {
ctxt, _ := context.WithTimeout(ctxb, defaultTimeout)
chanPoint := openChannelAndAssert(
ctxt, t, net, net.Alice, carol,
lntest.OpenChannelParams{
Amt: chanAmt,
},
)
chanPoints = append(chanPoints, chanPoint)
}
// Using this helper function, we'll maintain a pointer to the latest
// channel backup so we can compare it to the on-disk state.
var currentBackup *lnrpc.ChanBackupSnapshot
assertBackupNtfns := func(numNtfns int) {
for i := 0; i < numNtfns; i++ {
select {
case err := <-streamErr:
t.Fatalf("error with backup stream: %v", err)
case currentBackup = <-backupUpdates:
case <-time.After(time.Second * 5):
t.Fatalf("didn't receive channel backup "+
"notification %v", i+1)
}
}
}
// assertBackupFileState is a helper function that we'll use to compare
// the on-disk backup file to our currentBackup pointer above.
assertBackupFileState := func() {
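// The backup file is written out asynchronously, so we poll until
// the on-disk state converges or the timeout expires.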
err := lntest.WaitNoError(func() error {
packedBackup, err := ioutil.ReadFile(backupFilePath)
if err != nil {
return fmt.Errorf("unable to read backup "+
"file: %v", err)
}
// As each backup file will be encrypted with a fresh
// nonce, we can't compare them directly, so instead
// we'll compare the length, which is a proxy for the
// number of channels that the multi-backup contains.
rawBackup := currentBackup.MultiChanBackup.MultiChanBackup
if len(rawBackup) != len(packedBackup) {
return fmt.Errorf("backup files don't match: "+
"expected %x got %x", rawBackup, packedBackup)
}
// Additionally, we'll assert that both backups
// returned are valid.
for i, backup := range [][]byte{rawBackup, packedBackup} {
snapshot := &lnrpc.ChanBackupSnapshot{
MultiChanBackup: &lnrpc.MultiChanBackup{
MultiChanBackup: backup,
},
}
resp, err := carol.VerifyChanBackup(ctxb, snapshot)
if err != nil {
return fmt.Errorf("unable to verify "+
"back up: %v", err)
}
if !resp.SinglesValid || !resp.MultiValid {
return fmt.Errorf("backup #%v is "+
"invalid", i)
}
}
return nil
}, time.Second*15)
if err != nil {
t.Fatalf("backup state invalid: %v", err)
}
}
// As these two channels were just opened, we should've received two
// notifications for channel backups.
assertBackupNtfns(2)
// The on-disk file should also exactly match the latest backup that we
// have.
assertBackupFileState()
// Next, we'll close the channels one by one. After each channel
// closure, we should get a notification, and the on-disk state should
// match this state as well.
for i := 0; i < numChans; i++ {
// To ensure force closes also trigger an update, we'll force
// close half of the channels.
forceClose := i%2 == 0
chanPoint := chanPoints[i]
ctxt, _ = context.WithTimeout(ctxb, channelCloseTimeout)
closeChannelAndAssert(
ctxt, t, net, net.Alice, chanPoint, forceClose,
)
// We should get a single notification after closing, and the
// on-disk state should match this latest notification.
assertBackupNtfns(1)
assertBackupFileState()
// If we force closed the channel, then we'll mine enough
// blocks to ensure all outputs have been swept.
if forceClose {
cleanupForceClose(t, net, net.Alice, chanPoint)
}
}
}
type testCase struct {
name string
test func(net *lntest.NetworkHarness, t *harnessTest)
@@ -13328,6 +13518,10 @@ var testsCases = []*testCase{
name: "send update disable channel", name: "send update disable channel",
test: testSendUpdateDisableChannel, test: testSendUpdateDisableChannel,
}, },
{
name: "streaming channel backup update",
test: testChannelBackupUpdates,
},
}
// TestLightningNetworkDaemon performs a series of integration tests amongst a