5a678d5beb
This commit adds a callback to the RestartNode method on the network harness in order to allow test authors to execute arbitrary logic in-between the restart process for the node.
931 lines
26 KiB
Go
931 lines
26 KiB
Go
package main
|
|
|
|
import (
|
|
"encoding/hex"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"log"
|
|
"net"
|
|
"os"
|
|
"os/exec"
|
|
"path/filepath"
|
|
"runtime"
|
|
"strconv"
|
|
"sync"
|
|
"time"
|
|
|
|
"golang.org/x/net/context"
|
|
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/grpclog"
|
|
|
|
"bytes"
|
|
|
|
"github.com/go-errors/errors"
|
|
"github.com/lightningnetwork/lnd/lnrpc"
|
|
"github.com/roasbeef/btcd/chaincfg"
|
|
"github.com/roasbeef/btcd/rpctest"
|
|
"github.com/roasbeef/btcd/txscript"
|
|
"github.com/roasbeef/btcd/wire"
|
|
"github.com/roasbeef/btcrpcclient"
|
|
"github.com/roasbeef/btcutil"
|
|
)
|
|
|
|
var (
|
|
// numActiveNodes is the number of active nodes within the test network.
|
|
numActiveNodes = 0
|
|
|
|
// defaultNodePort is the initial p2p port which will be used by the
|
|
// first created lightning node to listen on for incoming p2p
|
|
// connections. Subsequent allocated ports for future lighting nodes
|
|
// instances will be monotonically increasing odd numbers calculated as
|
|
// such: defaultP2pPort + (2 * harness.nodeNum).
|
|
defaultNodePort = 19555
|
|
|
|
// defaultClientPort is the initial rpc port which will be used by the
|
|
// first created lightning node to listen on for incoming rpc
|
|
// connections. Subsequent allocated ports for future rpc harness
|
|
// instances will be monotonically increasing even numbers calculated
|
|
// as such: defaultP2pPort + (2 * harness.nodeNum).
|
|
defaultClientPort = 19556
|
|
|
|
harnessNetParams = &chaincfg.SimNetParams
|
|
)
|
|
|
|
// generateListeningPorts returns two strings representing ports to listen on
|
|
// designated for the current lightning network test. If there haven't been any
|
|
// test instances created, the default ports are used. Otherwise, in order to
|
|
// support multiple test nodes running at once, the p2p and rpc port are
|
|
// incremented after each initialization.
|
|
func generateListeningPorts() (int, int) {
|
|
var p2p, rpc int
|
|
if numActiveNodes == 0 {
|
|
p2p = defaultNodePort
|
|
rpc = defaultClientPort
|
|
} else {
|
|
p2p = defaultNodePort + (2 * numActiveNodes)
|
|
rpc = defaultClientPort + (2 * numActiveNodes)
|
|
}
|
|
|
|
return p2p, rpc
|
|
}
|
|
|
|
// lightningNode represents an instance of lnd running within our test network
|
|
// harness. Each lightningNode instance also fully embedds an RPC client in
|
|
// order to pragmatically drive the node.
|
|
type lightningNode struct {
|
|
cfg *config
|
|
|
|
rpcAddr string
|
|
p2pAddr string
|
|
rpcCert []byte
|
|
|
|
nodeId int
|
|
|
|
// PubKey is the serialized compressed identity public key of the node.
|
|
// This field will only be populated once the node itself has been
|
|
// started via the start() method.
|
|
PubKey [33]byte
|
|
PubKeyStr string
|
|
|
|
cmd *exec.Cmd
|
|
pidFile string
|
|
|
|
// processExit is a channel that's closed once it's detected that the
|
|
// process this instance of lightningNode is bound to has exited.
|
|
processExit chan struct{}
|
|
|
|
extraArgs []string
|
|
|
|
lnrpc.LightningClient
|
|
}
|
|
|
|
// newLightningNode creates a new test lightning node instance from the passed
|
|
// rpc config and slice of extra arguments.
|
|
func newLightningNode(rpcConfig *btcrpcclient.ConnConfig, lndArgs []string) (*lightningNode, error) {
|
|
var err error
|
|
|
|
cfg := &config{
|
|
RPCHost: "127.0.0.1",
|
|
RPCUser: rpcConfig.User,
|
|
RPCPass: rpcConfig.Pass,
|
|
}
|
|
|
|
nodeNum := numActiveNodes
|
|
cfg.DataDir, err = ioutil.TempDir("", "lndtest-data")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
cfg.LogDir, err = ioutil.TempDir("", "lndtest-log")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
cfg.PeerPort, cfg.RPCPort = generateListeningPorts()
|
|
|
|
numActiveNodes++
|
|
|
|
return &lightningNode{
|
|
cfg: cfg,
|
|
p2pAddr: net.JoinHostPort("127.0.0.1", strconv.Itoa(cfg.PeerPort)),
|
|
rpcAddr: net.JoinHostPort("127.0.0.1", strconv.Itoa(cfg.RPCPort)),
|
|
rpcCert: rpcConfig.Certificates,
|
|
nodeId: nodeNum,
|
|
processExit: make(chan struct{}),
|
|
extraArgs: lndArgs,
|
|
}, nil
|
|
}
|
|
|
|
// genArgs generates a slice of command line arguments from the lightningNode's
|
|
// current config struct.
|
|
func (l *lightningNode) genArgs() []string {
|
|
var args []string
|
|
|
|
encodedCert := hex.EncodeToString(l.rpcCert)
|
|
args = append(args, fmt.Sprintf("--btcdhost=%v", l.cfg.RPCHost))
|
|
args = append(args, fmt.Sprintf("--rpcuser=%v", l.cfg.RPCUser))
|
|
args = append(args, fmt.Sprintf("--rpcpass=%v", l.cfg.RPCPass))
|
|
args = append(args, fmt.Sprintf("--rawrpccert=%v", encodedCert))
|
|
args = append(args, fmt.Sprintf("--rpcport=%v", l.cfg.RPCPort))
|
|
args = append(args, fmt.Sprintf("--peerport=%v", l.cfg.PeerPort))
|
|
args = append(args, fmt.Sprintf("--logdir=%v", l.cfg.LogDir))
|
|
args = append(args, fmt.Sprintf("--datadir=%v", l.cfg.DataDir))
|
|
args = append(args, fmt.Sprintf("--simnet"))
|
|
|
|
if l.extraArgs != nil {
|
|
args = append(args, l.extraArgs...)
|
|
}
|
|
|
|
return args
|
|
}
|
|
|
|
// start launches a new process running lnd. Additionally, the PID of the
|
|
// launched process is saved in order to possibly kill the process forcibly
|
|
// later.
|
|
func (l *lightningNode) start(lndError chan error) error {
|
|
args := l.genArgs()
|
|
|
|
l.cmd = exec.Command("lnd", args...)
|
|
|
|
// Redirect stderr output to buffer
|
|
var errb bytes.Buffer
|
|
l.cmd.Stderr = &errb
|
|
|
|
if err := l.cmd.Start(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Launch a new goroutine which that bubbles up any potential fatal
|
|
// process errors to the goroutine running the tests.
|
|
go func() {
|
|
if err := l.cmd.Wait(); err != nil {
|
|
lndError <- errors.New(errb.String())
|
|
}
|
|
|
|
// Signal any onlookers that this process has exited.
|
|
close(l.processExit)
|
|
}()
|
|
|
|
pid, err := os.Create(filepath.Join(l.cfg.DataDir,
|
|
fmt.Sprintf("%v.pid", l.nodeId)))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
l.pidFile = pid.Name()
|
|
if _, err = fmt.Fprintf(pid, "%v\n", l.cmd.Process.Pid); err != nil {
|
|
return err
|
|
}
|
|
if err := pid.Close(); err != nil {
|
|
return err
|
|
}
|
|
|
|
opts := []grpc.DialOption{
|
|
grpc.WithInsecure(),
|
|
grpc.WithBlock(),
|
|
grpc.WithTimeout(time.Second * 20),
|
|
}
|
|
conn, err := grpc.Dial(l.rpcAddr, opts...)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
l.LightningClient = lnrpc.NewLightningClient(conn)
|
|
|
|
// Obtain the lnid of this node for quick identification purposes.
|
|
ctxb := context.Background()
|
|
info, err := l.GetInfo(ctxb, &lnrpc.GetInfoRequest{})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
l.PubKeyStr = info.IdentityPubkey
|
|
|
|
pubkey, err := hex.DecodeString(info.IdentityPubkey)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
copy(l.PubKey[:], pubkey)
|
|
|
|
return nil
|
|
}
|
|
|
|
// cleanup cleans up all the temporary files created by the node's process.
|
|
func (l *lightningNode) cleanup() error {
|
|
dirs := []string{
|
|
l.cfg.LogDir,
|
|
l.cfg.DataDir,
|
|
}
|
|
|
|
var err error
|
|
for _, dir := range dirs {
|
|
if err = os.RemoveAll(dir); err != nil {
|
|
log.Printf("Cannot remove dir %s: %v", dir, err)
|
|
}
|
|
}
|
|
return err
|
|
}
|
|
|
|
// stop attempts to stop the active lnd process.
|
|
func (l *lightningNode) stop() error {
|
|
// We should skip node stop in case:
|
|
// - start of the node wasn't initiated
|
|
// - process wasn't spawned
|
|
// - process already finished
|
|
processFinished := l.cmd.ProcessState != nil &&
|
|
l.cmd.ProcessState.Exited()
|
|
if l.cmd == nil || l.cmd.Process == nil || processFinished {
|
|
return nil
|
|
}
|
|
|
|
if runtime.GOOS == "windows" {
|
|
return l.cmd.Process.Signal(os.Kill)
|
|
}
|
|
return l.cmd.Process.Signal(os.Interrupt)
|
|
}
|
|
|
|
// restart attempts to restart a lightning node by shutting it down cleanly,
|
|
// then restarting the process. This function is fully blocking. Upon restart,
|
|
// the RPC connection to the node will be re-attempted, continuing iff the
|
|
// connection attempt is successful. Additionally, if a callback is passed, the
|
|
// closure will be executed after the node has been shutdown, but before the
|
|
// process has been started up again.
|
|
func (l *lightningNode) restart(errChan chan error, callback func() error) error {
|
|
if err := l.stop(); err != nil {
|
|
return nil
|
|
}
|
|
|
|
<-l.processExit
|
|
|
|
l.processExit = make(chan struct{})
|
|
|
|
if callback != nil {
|
|
if err := callback(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return l.start(errChan)
|
|
}
|
|
|
|
// shutdown stops the active lnd process and clean up any temporary directories
|
|
// created along the way.
|
|
func (l *lightningNode) shutdown() error {
|
|
if err := l.stop(); err != nil {
|
|
return err
|
|
}
|
|
if err := l.cleanup(); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// networkHarness is an integration testing harness for the lightning network.
|
|
// The harness by default is created with two active nodes on the network:
|
|
// Alice and Bob.
|
|
type networkHarness struct {
|
|
rpcConfig btcrpcclient.ConnConfig
|
|
netParams *chaincfg.Params
|
|
Miner *rpctest.Harness
|
|
|
|
activeNodes map[int]*lightningNode
|
|
|
|
// Alice and Bob are the initial seeder nodes that are automatically
|
|
// created to be the initial participants of the test network.
|
|
Alice *lightningNode
|
|
Bob *lightningNode
|
|
|
|
seenTxns chan wire.ShaHash
|
|
watchRequests chan *watchRequest
|
|
|
|
// Channel for transmitting stderr output from failed lightning node
|
|
// to main process.
|
|
lndErrorChan chan error
|
|
|
|
sync.Mutex
|
|
}
|
|
|
|
// newNetworkHarness creates a new network test harness.
|
|
// TODO(roasbeef): add option to use golang's build library to a binary of the
|
|
// current repo. This'll save developers from having to manually `go install`
|
|
// within the repo each time before changes
|
|
func newNetworkHarness() (*networkHarness, error) {
|
|
return &networkHarness{
|
|
activeNodes: make(map[int]*lightningNode),
|
|
seenTxns: make(chan wire.ShaHash),
|
|
watchRequests: make(chan *watchRequest),
|
|
lndErrorChan: make(chan error),
|
|
}, nil
|
|
}
|
|
|
|
// InitializeSeedNodes initialized alice and bob nodes given an already
|
|
// running instance of btcd's rpctest harness and extra command line flags,
|
|
// which should be formatted properly - "--arg=value".
|
|
func (n *networkHarness) InitializeSeedNodes(r *rpctest.Harness, lndArgs []string) error {
|
|
nodeConfig := r.RPCConfig()
|
|
|
|
n.netParams = r.ActiveNet
|
|
n.Miner = r
|
|
n.rpcConfig = nodeConfig
|
|
|
|
var err error
|
|
n.Alice, err = newLightningNode(&nodeConfig, lndArgs)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
n.Bob, err = newLightningNode(&nodeConfig, lndArgs)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
n.activeNodes[n.Alice.nodeId] = n.Alice
|
|
n.activeNodes[n.Bob.nodeId] = n.Bob
|
|
|
|
return err
|
|
}
|
|
|
|
// ProcessErrors returns a channel used for reporting any fatal process errors.
|
|
// If any of the active nodes within the harness' test network incur a fatal
|
|
// error, that error is sent over this channel.
|
|
func (n *networkHarness) ProcessErrors() chan error {
|
|
return n.lndErrorChan
|
|
}
|
|
|
|
// fakeLogger is a no-op stand-in for grpc's logger. Every method discards its
// arguments, which stops grpc's logger from printing directly to stdout.
type fakeLogger struct{}

// The Fatal family deliberately does not terminate the process.
func (*fakeLogger) Fatal(...interface{})           {}
func (*fakeLogger) Fatalf(string, ...interface{})  {}
func (*fakeLogger) Fatalln(...interface{})         {}

// The Print family silently drops whatever it is given.
func (*fakeLogger) Print(...interface{})           {}
func (*fakeLogger) Printf(string, ...interface{})  {}
func (*fakeLogger) Println(...interface{})         {}
|
|
|
|
// SetUp starts the initial seeder nodes within the test harness. The initial
|
|
// node's wallets will be funded wallets with ten 1 BTC outputs each. Finally
|
|
// rpc clients capable of communicating with the initial seeder nodes are
|
|
// created.
|
|
func (n *networkHarness) SetUp() error {
|
|
// Swap out grpc's default logger with out fake logger which drops the
|
|
// statements on the floor.
|
|
grpclog.SetLogger(&fakeLogger{})
|
|
|
|
// Start the initial seeder nodes within the test network, then connect
|
|
// their respective RPC clients.
|
|
var wg sync.WaitGroup
|
|
errChan := make(chan error, 2)
|
|
wg.Add(2)
|
|
go func() {
|
|
var err error
|
|
defer wg.Done()
|
|
if err = n.Alice.start(n.lndErrorChan); err != nil {
|
|
errChan <- err
|
|
return
|
|
}
|
|
}()
|
|
go func() {
|
|
var err error
|
|
defer wg.Done()
|
|
if err = n.Bob.start(n.lndErrorChan); err != nil {
|
|
errChan <- err
|
|
return
|
|
}
|
|
}()
|
|
wg.Wait()
|
|
select {
|
|
case err := <-errChan:
|
|
return err
|
|
default:
|
|
}
|
|
|
|
// Load up the wallets of the seeder nodes with 10 outputs of 1 BTC
|
|
// each.
|
|
ctxb := context.Background()
|
|
addrReq := &lnrpc.NewAddressRequest{lnrpc.NewAddressRequest_WITNESS_PUBKEY_HASH}
|
|
clients := []lnrpc.LightningClient{n.Alice, n.Bob}
|
|
for _, client := range clients {
|
|
for i := 0; i < 10; i++ {
|
|
resp, err := client.NewAddress(ctxb, addrReq)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
addr, err := btcutil.DecodeAddress(resp.Address, n.netParams)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
addrScript, err := txscript.PayToAddrScript(addr)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
output := &wire.TxOut{
|
|
PkScript: addrScript,
|
|
Value: btcutil.SatoshiPerBitcoin,
|
|
}
|
|
if _, err := n.Miner.CoinbaseSpend([]*wire.TxOut{output}); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
// We generate several blocks in order to give the outputs created
|
|
// above a good number of confirmations.
|
|
if _, err := n.Miner.Node.Generate(10); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Finally, make a connection between both of the nodes.
|
|
if err := n.ConnectNodes(ctxb, n.Alice, n.Bob); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Now block until both wallets have fully synced up.
|
|
expectedBalance := btcutil.Amount(btcutil.SatoshiPerBitcoin * 10).ToBTC()
|
|
balReq := &lnrpc.WalletBalanceRequest{}
|
|
balanceTicker := time.Tick(time.Millisecond * 50)
|
|
out:
|
|
for {
|
|
select {
|
|
case <-balanceTicker:
|
|
aliceResp, err := n.Alice.WalletBalance(ctxb, balReq)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
bobResp, err := n.Bob.WalletBalance(ctxb, balReq)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if aliceResp.Balance == expectedBalance &&
|
|
bobResp.Balance == expectedBalance {
|
|
break out
|
|
}
|
|
case <-time.After(time.Second * 30):
|
|
return fmt.Errorf("balances not synced after deadline")
|
|
}
|
|
}
|
|
|
|
// Now that the initial test network has been initialized, launch the
|
|
// network wather.
|
|
go n.networkWatcher()
|
|
|
|
return nil
|
|
}
|
|
|
|
// TearDownAll tears down all active nodes within the test lightning network.
|
|
func (n *networkHarness) TearDownAll() error {
|
|
for _, node := range n.activeNodes {
|
|
if err := node.shutdown(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// NewNode fully initializes a returns a new lightningNode binded to the
|
|
// current instance of the network harness. The created node is running, but
|
|
// not yet connected to other nodes within the network.
|
|
func (n *networkHarness) NewNode(extraArgs []string) (*lightningNode, error) {
|
|
n.Lock()
|
|
defer n.Unlock()
|
|
|
|
node, err := newLightningNode(&n.rpcConfig, extraArgs)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := node.start(n.lndErrorChan); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
n.activeNodes[node.nodeId] = node
|
|
|
|
return node, nil
|
|
}
|
|
|
|
// ConnectNodes establishes an encrypted+authenticated p2p connection from node
|
|
// a towards node b.
|
|
func (n *networkHarness) ConnectNodes(ctx context.Context, a, b *lightningNode) error {
|
|
bobInfo, err := b.GetInfo(ctx, &lnrpc.GetInfoRequest{})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
req := &lnrpc.ConnectPeerRequest{
|
|
Addr: &lnrpc.LightningAddress{
|
|
Pubkey: bobInfo.IdentityPubkey,
|
|
Host: b.p2pAddr,
|
|
},
|
|
}
|
|
if _, err := a.ConnectPeer(ctx, req); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// RestartNode attempts to restart a lightning node by shutting it down
|
|
// cleanly, then restarting the process. This function is fully blocking. Upon
|
|
// restart, the RPC connection to the node will be re-attempted, continuing iff
|
|
// the connection attempt is successful. If the callback parameter is non-nil,
|
|
// then the function will be executed after the node shuts down, but *before*
|
|
// the process has been started up again.
|
|
//
|
|
// This method can be useful when testing edge cases such as a node broadcast
|
|
// and invalidated prior state, or persistent state recovery, simulating node
|
|
// crashes, etc.
|
|
func (n *networkHarness) RestartNode(node *lightningNode, callback func() error) error {
|
|
return node.restart(n.lndErrorChan, callback)
|
|
}
|
|
|
|
// TODO(roasbeef): add a WithChannel higher-order function?
|
|
// * python-like context manager w.r.t using a channel within a test
|
|
// * possibly adds more funds to the target wallet if the funds are not
|
|
// enough
|
|
|
|
// watchRequest encapsulates a request to the harness' network watcher to
|
|
// dispatch a notification once a transaction with the target txid is seen
|
|
// within the test network.
|
|
type watchRequest struct {
|
|
txid wire.ShaHash
|
|
eventChan chan struct{}
|
|
}
|
|
|
|
// networkWatcher is a goroutine which accepts async notification requests for
|
|
// the broadcast of a target transaction, and then dispatches the transaction
|
|
// once its seen on the network.
|
|
func (n *networkHarness) networkWatcher() {
|
|
seenTxns := make(map[wire.ShaHash]struct{})
|
|
clients := make(map[wire.ShaHash][]chan struct{})
|
|
|
|
for {
|
|
|
|
select {
|
|
case req := <-n.watchRequests:
|
|
// If we've already seen this transaction, then
|
|
// immediately dispatch the request. Otherwise, append
|
|
// to the list of clients who are watching for the
|
|
// broadcast of this transaction.
|
|
if _, ok := seenTxns[req.txid]; ok {
|
|
close(req.eventChan)
|
|
} else {
|
|
clients[req.txid] = append(clients[req.txid], req.eventChan)
|
|
}
|
|
case txid := <-n.seenTxns:
|
|
// Add this txid to our set of "seen" transactions. So
|
|
// we're able to dispatch any notifications for this
|
|
// txid which arrive *after* it's seen within the
|
|
// network.
|
|
seenTxns[txid] = struct{}{}
|
|
|
|
// If there isn't a registered notification for this
|
|
// transaction then ignore it.
|
|
txClients, ok := clients[txid]
|
|
if !ok {
|
|
continue
|
|
}
|
|
|
|
// Otherwise, dispatch the notification to all clients,
|
|
// cleaning up the now un-needed state.
|
|
for _, client := range txClients {
|
|
close(client)
|
|
}
|
|
delete(clients, txid)
|
|
}
|
|
}
|
|
}
|
|
|
|
// OnTxAccepted is a callback to be called each time a new transaction has been
|
|
// broadcast on the network.
|
|
func (n *networkHarness) OnTxAccepted(hash *wire.ShaHash, amt btcutil.Amount) {
|
|
go func() {
|
|
n.seenTxns <- *hash
|
|
}()
|
|
}
|
|
|
|
// WaitForTxBroadcast blocks until the target txid is seen on the network. If
|
|
// the transaction isn't seen within the network before the passed timeout,
|
|
// then an error is returned.
|
|
// TODO(roasbeef): add another method which creates queue of all seen transactions
|
|
func (n *networkHarness) WaitForTxBroadcast(ctx context.Context, txid wire.ShaHash) error {
|
|
eventChan := make(chan struct{})
|
|
|
|
n.watchRequests <- &watchRequest{txid, eventChan}
|
|
|
|
select {
|
|
case <-eventChan:
|
|
return nil
|
|
case <-ctx.Done():
|
|
return fmt.Errorf("tx not seen before context timeout")
|
|
}
|
|
}
|
|
|
|
// OpenChannel attempts to open a channel between srcNode and destNode with the
|
|
// passed channel funding parameters. If the passed context has a timeout, then
|
|
// if the timeout is reached before the channel pending notification is
|
|
// received, an error is returned.
|
|
func (n *networkHarness) OpenChannel(ctx context.Context,
|
|
srcNode, destNode *lightningNode, amt btcutil.Amount,
|
|
numConfs uint32) (lnrpc.Lightning_OpenChannelClient, error) {
|
|
|
|
openReq := &lnrpc.OpenChannelRequest{
|
|
NodePubkey: destNode.PubKey[:],
|
|
LocalFundingAmount: int64(amt),
|
|
NumConfs: numConfs,
|
|
}
|
|
|
|
respStream, err := srcNode.OpenChannel(ctx, openReq)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("unable to open channel between "+
|
|
"alice and bob: %v", err)
|
|
}
|
|
|
|
chanOpen := make(chan struct{})
|
|
errChan := make(chan error)
|
|
go func() {
|
|
// Consume the "channel pending" update. This waits until the node
|
|
// notifies us that the final message in the channel funding workflow
|
|
// has been sent to the remote node.
|
|
resp, err := respStream.Recv()
|
|
if err != nil {
|
|
errChan <- err
|
|
return
|
|
}
|
|
if _, ok := resp.Update.(*lnrpc.OpenStatusUpdate_ChanPending); !ok {
|
|
errChan <- fmt.Errorf("expected channel pending update, "+
|
|
"instead got %v", resp)
|
|
return
|
|
}
|
|
|
|
close(chanOpen)
|
|
}()
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil, fmt.Errorf("timeout reached before chan pending " +
|
|
"update sent")
|
|
case err := <-errChan:
|
|
return nil, err
|
|
case <-chanOpen:
|
|
return respStream, nil
|
|
}
|
|
}
|
|
|
|
// WaitForChannelOpen waits for a notification that a channel is open by
|
|
// consuming a message from the past open channel stream. If the passed context
|
|
// has a timeout, then if the timeout is reached before the channel has been
|
|
// opened, then an error is returned.
|
|
func (n *networkHarness) WaitForChannelOpen(ctx context.Context,
|
|
openChanStream lnrpc.Lightning_OpenChannelClient) (*lnrpc.ChannelPoint, error) {
|
|
|
|
errChan := make(chan error)
|
|
respChan := make(chan *lnrpc.ChannelPoint)
|
|
go func() {
|
|
resp, err := openChanStream.Recv()
|
|
if err != nil {
|
|
errChan <- fmt.Errorf("unable to read rpc resp: %v", err)
|
|
return
|
|
}
|
|
fundingResp, ok := resp.Update.(*lnrpc.OpenStatusUpdate_ChanOpen)
|
|
if !ok {
|
|
errChan <- fmt.Errorf("expected channel open update, "+
|
|
"instead got %v", resp)
|
|
return
|
|
}
|
|
|
|
respChan <- fundingResp.ChanOpen.ChannelPoint
|
|
}()
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil, fmt.Errorf("timeout reached while waiting for " +
|
|
"channel open")
|
|
case err := <-errChan:
|
|
return nil, err
|
|
case chanPoint := <-respChan:
|
|
return chanPoint, nil
|
|
}
|
|
}
|
|
|
|
// CloseChannel close channel attempts to close the channel indicated by the
|
|
// passed channel point, initiated by the passed lnNode. If the passed context
|
|
// has a timeout, then if the timeout is reached before the channel close is
|
|
// pending, then an error is returned.
|
|
func (n *networkHarness) CloseChannel(ctx context.Context,
|
|
lnNode *lightningNode, cp *lnrpc.ChannelPoint,
|
|
force bool) (lnrpc.Lightning_CloseChannelClient, error) {
|
|
|
|
closeReq := &lnrpc.CloseChannelRequest{
|
|
ChannelPoint: cp,
|
|
Force: force,
|
|
}
|
|
closeRespStream, err := lnNode.CloseChannel(ctx, closeReq)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("unable to close channel: %v", err)
|
|
}
|
|
|
|
errChan := make(chan error)
|
|
fin := make(chan struct{})
|
|
go func() {
|
|
// Consume the "channel close" update in order to wait for the closing
|
|
// transaction to be broadcast, then wait for the closing tx to be seen
|
|
// within the network.
|
|
closeResp, err := closeRespStream.Recv()
|
|
if err != nil {
|
|
errChan <- err
|
|
return
|
|
}
|
|
pendingClose, ok := closeResp.Update.(*lnrpc.CloseStatusUpdate_ClosePending)
|
|
if !ok {
|
|
errChan <- fmt.Errorf("expected channel close update, "+
|
|
"instead got %v", pendingClose)
|
|
return
|
|
}
|
|
|
|
closeTxid, err := wire.NewShaHash(pendingClose.ClosePending.Txid)
|
|
if err != nil {
|
|
errChan <- err
|
|
return
|
|
}
|
|
if err := n.WaitForTxBroadcast(ctx, *closeTxid); err != nil {
|
|
errChan <- err
|
|
return
|
|
}
|
|
|
|
close(fin)
|
|
}()
|
|
|
|
// Wait until either the deadline for the context expires, an error
|
|
// occurs, or the channel close update is received.
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil, fmt.Errorf("timeout reached before channel close " +
|
|
"initiated")
|
|
case err := <-errChan:
|
|
return nil, err
|
|
case <-fin:
|
|
return closeRespStream, nil
|
|
}
|
|
}
|
|
|
|
// WaitForChannelClose waits for a notification from the passed channel close
|
|
// stream that the node has deemed the channel has been fully closed. If the
|
|
// passed context has a timeout, then if the timeout is reached before the
|
|
// notification is received then an error is returned.
|
|
func (n *networkHarness) WaitForChannelClose(ctx context.Context,
|
|
closeChanStream lnrpc.Lightning_CloseChannelClient) (*wire.ShaHash, error) {
|
|
|
|
errChan := make(chan error)
|
|
updateChan := make(chan *lnrpc.CloseStatusUpdate_ChanClose)
|
|
go func() {
|
|
closeResp, err := closeChanStream.Recv()
|
|
if err != nil {
|
|
errChan <- err
|
|
return
|
|
}
|
|
|
|
closeFin, ok := closeResp.Update.(*lnrpc.CloseStatusUpdate_ChanClose)
|
|
if !ok {
|
|
errChan <- fmt.Errorf("expected channel close update, "+
|
|
"instead got %v", closeFin)
|
|
return
|
|
}
|
|
|
|
updateChan <- closeFin
|
|
}()
|
|
|
|
// Wait until either the deadline for the context expires, an error
|
|
// occurs, or the channel close update is received.
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil, fmt.Errorf("timeout reached before update sent")
|
|
case err := <-errChan:
|
|
return nil, err
|
|
case update := <-updateChan:
|
|
return wire.NewShaHash(update.ChanClose.ClosingTxid)
|
|
}
|
|
}
|
|
|
|
// AssertChannelExists asserts that an active channel identified by
|
|
// channelPoint is known to exist from the point-of-view of node..
|
|
func (n *networkHarness) AssertChannelExists(ctx context.Context,
|
|
node *lightningNode, chanPoint *wire.OutPoint) error {
|
|
|
|
req := &lnrpc.ListChannelsRequest{}
|
|
resp, err := node.ListChannels(ctx, req)
|
|
if err != nil {
|
|
return fmt.Errorf("unable fetch node's channels: %v", err)
|
|
}
|
|
|
|
for _, channel := range resp.Channels {
|
|
if channel.ChannelPoint == chanPoint.String() {
|
|
return nil
|
|
}
|
|
}
|
|
|
|
return fmt.Errorf("channel not found")
|
|
}
|
|
|
|
// DumpLogs reads the current logs generated by the passed node, and returns
|
|
// the logs as a single string. This function is useful for examining the logs
|
|
// of a particular node in the case of a test failure.
|
|
// Logs from lightning node being generated with delay - you should
|
|
// add time.Sleep() in order to get all logs.
|
|
func (n *networkHarness) DumpLogs(node *lightningNode) (string, error) {
|
|
logFile := fmt.Sprintf("%v/simnet/lnd.log", node.cfg.LogDir)
|
|
|
|
buf, err := ioutil.ReadFile(logFile)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
return string(buf), nil
|
|
}
|
|
|
|
// SendCoins attemps to send amt satoshis from the internal mining node to the
|
|
// targetted lightning node.
|
|
func (n *networkHarness) SendCoins(ctx context.Context, amt btcutil.Amount,
|
|
target *lightningNode) error {
|
|
|
|
balReq := &lnrpc.WalletBalanceRequest{}
|
|
initialBalance, err := target.WalletBalance(ctx, balReq)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// First, obtain an address from the target lightning node, preferring
|
|
// to receive a p2wkh address s.t the output can immediately be used as
|
|
// an input to a funding transaction.
|
|
addrReq := &lnrpc.NewAddressRequest{
|
|
Type: lnrpc.NewAddressRequest_WITNESS_PUBKEY_HASH,
|
|
}
|
|
resp, err := target.NewAddress(ctx, addrReq)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
addr, err := btcutil.DecodeAddress(resp.Address, n.netParams)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
addrScript, err := txscript.PayToAddrScript(addr)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Generate a transaction which creates an output to the target
|
|
// pkScript of the desired amount.
|
|
output := &wire.TxOut{
|
|
PkScript: addrScript,
|
|
Value: int64(amt),
|
|
}
|
|
if _, err := n.Miner.CoinbaseSpend([]*wire.TxOut{output}); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Finally, generate 6 new blocks to ensure the output gains a
|
|
// sufficient number of confirmations.
|
|
if _, err := n.Miner.Node.Generate(6); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Pause until the nodes current wallet balances reflects the amount
|
|
// sent to it above.
|
|
// TODO(roasbeef): factor out into helper func
|
|
for {
|
|
select {
|
|
case <-time.Tick(time.Millisecond * 50):
|
|
currentBal, err := target.WalletBalance(ctx, balReq)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if currentBal.Balance == initialBalance.Balance+amt.ToBTC() {
|
|
return nil
|
|
}
|
|
case <-time.After(time.Second * 30):
|
|
return fmt.Errorf("balances not synced after deadline")
|
|
}
|
|
}
|
|
}
|