package main import ( "bytes" "encoding/hex" "flag" "fmt" "io" "io/ioutil" "log" "net" "os" "path/filepath" "strconv" "sync" "time" "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/grpclog" "os/exec" "github.com/go-errors/errors" "github.com/lightningnetwork/lnd/lnrpc" "github.com/roasbeef/btcd/chaincfg" "github.com/roasbeef/btcd/chaincfg/chainhash" "github.com/roasbeef/btcd/rpctest" "github.com/roasbeef/btcd/txscript" "github.com/roasbeef/btcd/wire" "github.com/roasbeef/btcrpcclient" "github.com/roasbeef/btcutil" ) var ( // numActiveNodes is the number of active nodes within the test network. numActiveNodes = 0 // defaultNodePort is the initial p2p port which will be used by the // first created lightning node to listen on for incoming p2p // connections. Subsequent allocated ports for future lighting nodes // instances will be monotonically increasing odd numbers calculated as // such: defaultP2pPort + (2 * harness.nodeNum). defaultNodePort = 19555 // defaultClientPort is the initial rpc port which will be used by the // first created lightning node to listen on for incoming rpc // connections. Subsequent allocated ports for future rpc harness // instances will be monotonically increasing even numbers calculated // as such: defaultP2pPort + (2 * harness.nodeNum). defaultClientPort = 19556 harnessNetParams = &chaincfg.SimNetParams // logOutput is a flag that can be set to append the output from the // seed nodes to log files. logOutput = flag.Bool("logoutput", false, "log output from node n to file outputn.log") ) // generateListeningPorts returns two strings representing ports to listen on // designated for the current lightning network test. If there haven't been any // test instances created, the default ports are used. Otherwise, in order to // support multiple test nodes running at once, the p2p and rpc port are // incremented after each initialization. func generateListeningPorts() (int, int) { var p2p, rpc int if numActiveNodes == 0 { p2p = defaultNodePort rpc = defaultClientPort } else { p2p = defaultNodePort + (2 * numActiveNodes) rpc = defaultClientPort + (2 * numActiveNodes) } return p2p, rpc } // lightningNode represents an instance of lnd running within our test network // harness. Each lightningNode instance also fully embedds an RPC client in // order to pragmatically drive the node. type lightningNode struct { cfg *config rpcAddr string p2pAddr string rpcCert []byte nodeID int // PubKey is the serialized compressed identity public key of the node. // This field will only be populated once the node itself has been // started via the start() method. PubKey [33]byte PubKeyStr string cmd *exec.Cmd pidFile string // processExit is a channel that's closed once it's detected that the // process this instance of lightningNode is bound to has exited. processExit chan struct{} extraArgs []string chanWatchRequests chan *chanWatchRequest quit chan struct{} wg sync.WaitGroup lnrpc.LightningClient } // newLightningNode creates a new test lightning node instance from the passed // rpc config and slice of extra arguments. func newLightningNode(btcrpcConfig *btcrpcclient.ConnConfig, lndArgs []string) (*lightningNode, error) { var err error cfg := &config{ Bitcoin: &chainConfig{ RPCHost: btcrpcConfig.Host, RPCUser: btcrpcConfig.User, RPCPass: btcrpcConfig.Pass, }, } nodeNum := numActiveNodes cfg.DataDir, err = ioutil.TempDir("", "lndtest-data") if err != nil { return nil, err } cfg.LogDir, err = ioutil.TempDir("", "lndtest-log") if err != nil { return nil, err } cfg.TLSCertPath = filepath.Join(cfg.DataDir, "tls.cert") cfg.TLSKeyPath = filepath.Join(cfg.DataDir, "tls.key") cfg.PeerPort, cfg.RPCPort = generateListeningPorts() numActiveNodes++ lndArgs = append(lndArgs, "--externalip=127.0.0.1:"+ strconv.Itoa(cfg.PeerPort)) return &lightningNode{ cfg: cfg, p2pAddr: net.JoinHostPort("127.0.0.1", strconv.Itoa(cfg.PeerPort)), rpcAddr: net.JoinHostPort("127.0.0.1", strconv.Itoa(cfg.RPCPort)), rpcCert: btcrpcConfig.Certificates, nodeID: nodeNum, chanWatchRequests: make(chan *chanWatchRequest), processExit: make(chan struct{}), quit: make(chan struct{}), extraArgs: lndArgs, }, nil } // genArgs generates a slice of command line arguments from the lightningNode's // current config struct. func (l *lightningNode) genArgs() []string { var args []string encodedCert := hex.EncodeToString(l.rpcCert) args = append(args, "--bitcoin.active") args = append(args, "--bitcoin.simnet") args = append(args, fmt.Sprintf("--bitcoin.rpchost=%v", l.cfg.Bitcoin.RPCHost)) args = append(args, fmt.Sprintf("--bitcoin.rpcuser=%v", l.cfg.Bitcoin.RPCUser)) args = append(args, fmt.Sprintf("--bitcoin.rpcpass=%v", l.cfg.Bitcoin.RPCPass)) args = append(args, fmt.Sprintf("--bitcoin.rawrpccert=%v", encodedCert)) args = append(args, fmt.Sprintf("--rpcport=%v", l.cfg.RPCPort)) args = append(args, fmt.Sprintf("--peerport=%v", l.cfg.PeerPort)) args = append(args, fmt.Sprintf("--logdir=%v", l.cfg.LogDir)) args = append(args, fmt.Sprintf("--datadir=%v", l.cfg.DataDir)) args = append(args, fmt.Sprintf("--tlscertpath=%v", l.cfg.TLSCertPath)) args = append(args, fmt.Sprintf("--tlskeypath=%v", l.cfg.TLSKeyPath)) args = append(args, fmt.Sprintf("--configfile=%v", l.cfg.DataDir)) if l.extraArgs != nil { args = append(args, l.extraArgs...) } return args } // Start launches a new process running lnd. Additionally, the PID of the // launched process is saved in order to possibly kill the process forcibly // later. func (l *lightningNode) Start(lndError chan error) error { args := l.genArgs() l.cmd = exec.Command("lnd", args...) // Redirect stderr output to buffer var errb bytes.Buffer l.cmd.Stderr = &errb // If the logoutput flag is passed, redirect output from the nodes to // log files. if *logOutput { logFile := fmt.Sprintf("output%d.log", l.nodeID) // Create file if not exists, otherwise append. file, err := os.OpenFile(logFile, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666) if err != nil { return err } // Pass node's stderr to both errb and the file. w := io.MultiWriter(&errb, file) l.cmd.Stderr = w // Pass the node's stdout only to the file. l.cmd.Stdout = file } if err := l.cmd.Start(); err != nil { return err } // Launch a new goroutine which that bubbles up any potential fatal // process errors to the goroutine running the tests. go func() { err := l.cmd.Wait() // Signal any onlookers that this process has exited. close(l.processExit) if err != nil { lndError <- errors.New(errb.String()) } }() pid, err := os.Create(filepath.Join(l.cfg.DataDir, fmt.Sprintf("%v.pid", l.nodeID))) if err != nil { return err } l.pidFile = pid.Name() if _, err = fmt.Fprintf(pid, "%v\n", l.cmd.Process.Pid); err != nil { return err } if err := pid.Close(); err != nil { return err } // Wait until TLS certificate is created before using it, up to 20 sec. tlsTimeout := time.After(20 * time.Second) for !fileExists(l.cfg.TLSCertPath) { time.Sleep(100 * time.Millisecond) select { case <-tlsTimeout: panic(fmt.Errorf("timeout waiting for TLS cert file " + "to be created after 20 seconds")) default: } } creds, err := credentials.NewClientTLSFromFile(l.cfg.TLSCertPath, "") if err != nil { return err } opts := []grpc.DialOption{ grpc.WithTransportCredentials(creds), grpc.WithBlock(), grpc.WithTimeout(time.Second * 20), } conn, err := grpc.Dial(l.rpcAddr, opts...) if err != nil { return err } l.LightningClient = lnrpc.NewLightningClient(conn) // Obtain the lnid of this node for quick identification purposes. ctxb := context.Background() info, err := l.GetInfo(ctxb, &lnrpc.GetInfoRequest{}) if err != nil { return err } l.PubKeyStr = info.IdentityPubkey pubkey, err := hex.DecodeString(info.IdentityPubkey) if err != nil { return err } copy(l.PubKey[:], pubkey) // Launch the watcher that'll hook into graph related topology change // from the PoV of this node. l.wg.Add(1) go l.lightningNetworkWatcher() return nil } // cleanup cleans up all the temporary files created by the node's process. func (l *lightningNode) cleanup() error { dirs := []string{ l.cfg.LogDir, l.cfg.DataDir, } var err error for _, dir := range dirs { if err = os.RemoveAll(dir); err != nil { log.Printf("Cannot remove dir %s: %v", dir, err) } } return err } // Stop attempts to stop the active lnd process. func (l *lightningNode) Stop() error { // We should skip node stop in case: // - start of the node wasn't initiated // - process wasn't spawned // - process already finished select { case <-l.quit: return nil case <-l.processExit: return nil default: } // Don't watch for error because sometimes the RPC connection gets // closed before a response is returned. req := lnrpc.StopRequest{} ctx := context.Background() l.LightningClient.StopDaemon(ctx, &req) close(l.quit) l.wg.Wait() return nil } // Restart attempts to restart a lightning node by shutting it down cleanly, // then restarting the process. This function is fully blocking. Upon restart, // the RPC connection to the node will be re-attempted, continuing iff the // connection attempt is successful. Additionally, if a callback is passed, the // closure will be executed after the node has been shutdown, but before the // process has been started up again. func (l *lightningNode) Restart(errChan chan error, callback func() error) error { if err := l.Stop(); err != nil { return nil } <-l.processExit l.processExit = make(chan struct{}) l.quit = make(chan struct{}) l.wg = sync.WaitGroup{} if callback != nil { if err := callback(); err != nil { return err } } return l.Start(errChan) } // Shutdown stops the active lnd process and clean up any temporary directories // created along the way. func (l *lightningNode) Shutdown() error { if err := l.Stop(); err != nil { return err } if err := l.cleanup(); err != nil { return err } return nil } // closeChanWatchRequest is a request to the lightningNetworkWatcher to be // notified once it's detected within the test Lightning Network, that a // channel has either been added or closed. type chanWatchRequest struct { chanPoint wire.OutPoint chanOpen bool eventChan chan struct{} } // lightningNetworkWatcher is a goroutine which is able to dispatch // notifications once it has been observed that a target channel has been // closed or opened within the network. In order to dispatch these // notifications, the GraphTopologySubscription client exposed as part of the // gRPC interface is used. func (l *lightningNode) lightningNetworkWatcher() { defer l.wg.Done() graphUpdates := make(chan *lnrpc.GraphTopologyUpdate) l.wg.Add(1) go func() { defer l.wg.Done() ctxb := context.Background() req := &lnrpc.GraphTopologySubscription{} topologyClient, err := l.SubscribeChannelGraph(ctxb, req) if err != nil { // We panic here in case of an error as failure to // create the topology client will cause all subsequent // tests to fail. panic(fmt.Errorf("unable to create topology "+ "client: %v", err)) } for { update, err := topologyClient.Recv() if err == io.EOF { return } else if err != nil { return } select { case graphUpdates <- update: case <-l.quit: return } } }() // For each outpoint, we'll track an integer which denotes the number // of edges seen for that channel within the network. When this number // reaches 2, then it means that both edge advertisements has // propagated through the network. openChans := make(map[wire.OutPoint]int) openClients := make(map[wire.OutPoint][]chan struct{}) closedChans := make(map[wire.OutPoint]struct{}) closeClients := make(map[wire.OutPoint][]chan struct{}) for { select { // A new graph update has just been received, so we'll examine // the current set of registered clients to see if we can // dispatch any requests. case graphUpdate := <-graphUpdates: // For each new channel, we'll increment the number of // edges seen by one. for _, newChan := range graphUpdate.ChannelUpdates { txid, _ := chainhash.NewHash(newChan.ChanPoint.FundingTxid) op := wire.OutPoint{ Hash: *txid, Index: newChan.ChanPoint.OutputIndex, } openChans[op]++ // For this new channel, if the number of edges // seen is less than two, then the channel // hasn't been fully announced yet. if numEdges := openChans[op]; numEdges < 2 { continue } // Otherwise, we'll notify all the registered // clients and remove the dispatched clients. for _, eventChan := range openClients[op] { close(eventChan) } delete(openClients, op) } // For each channel closed, we'll mark that we've // detected a channel closure while lnd was pruning the // channel graph. for _, closedChan := range graphUpdate.ClosedChans { txid, _ := chainhash.NewHash(closedChan.ChanPoint.FundingTxid) op := wire.OutPoint{ Hash: *txid, Index: closedChan.ChanPoint.OutputIndex, } closedChans[op] = struct{}{} // As the channel has been closed, we'll notify // all register clients. for _, eventChan := range closeClients[op] { close(eventChan) } delete(closeClients, op) } // A new watch request, has just arrived. We'll either be able // to dispatch immediately, or need to add the client for // processing later. case watchRequest := <-l.chanWatchRequests: targetChan := watchRequest.chanPoint // TODO(roasbeef): add update type also, checks for // multiple of 2 if watchRequest.chanOpen { // If this is a open request, then it can be // dispatched if the number of edges seen for // the channel is at least two. if numEdges := openChans[targetChan]; numEdges >= 2 { close(watchRequest.eventChan) continue } // Otherwise, we'll add this to the list of // watch open clients for this out point. openClients[targetChan] = append(openClients[targetChan], watchRequest.eventChan) continue } // If this is a close request, then it can be // immediately dispatched if we've already seen a // channel closure for this channel. if _, ok := closedChans[targetChan]; ok { close(watchRequest.eventChan) continue } // Otherwise, we'll add this to the list of close watch // clients for this out point. closeClients[targetChan] = append(closeClients[targetChan], watchRequest.eventChan) case <-l.quit: return } } } // WaitForNetworkChannelOpen will block until a channel with the target // outpoint is seen as being fully advertised within the network. A channel is // considered "fully advertised" once both of its directional edges has been // advertised within the test Lightning Network. func (l *lightningNode) WaitForNetworkChannelOpen(ctx context.Context, op *lnrpc.ChannelPoint) error { eventChan := make(chan struct{}) txid, err := chainhash.NewHash(op.FundingTxid) if err != nil { return err } l.chanWatchRequests <- &chanWatchRequest{ chanPoint: wire.OutPoint{ Hash: *txid, Index: op.OutputIndex, }, eventChan: eventChan, chanOpen: true, } select { case <-eventChan: return nil case <-ctx.Done(): return fmt.Errorf("channel not opened before timeout") } } // WaitForNetworkChannelClose will block until a channel with the target // outpoint is seen as closed within the network. A channel is considered // closed once a transaction spending the funding outpoint is seen within a // confirmed block. func (l *lightningNode) WaitForNetworkChannelClose(ctx context.Context, op *lnrpc.ChannelPoint) error { eventChan := make(chan struct{}) txid, err := chainhash.NewHash(op.FundingTxid) if err != nil { return err } l.chanWatchRequests <- &chanWatchRequest{ chanPoint: wire.OutPoint{ Hash: *txid, Index: op.OutputIndex, }, eventChan: eventChan, chanOpen: false, } select { case <-eventChan: return nil case <-ctx.Done(): return fmt.Errorf("channel not closed before timeout") } } // WaitForBlockchainSync will block until the target nodes has fully // synchronized with the blockchain. If the passed context object has a set // timeout, then the goroutine will continually poll until the timeout has // elapsed. In the case that the chain isn't synced before the timeout is up, // then this function will return an error. func (l *lightningNode) WaitForBlockchainSync(ctx context.Context) error { errChan := make(chan error, 1) retryDelay := time.Millisecond * 100 go func() { for { select { case <-ctx.Done(): case <-l.quit: return default: } getInfoReq := &lnrpc.GetInfoRequest{} getInfoResp, err := l.GetInfo(ctx, getInfoReq) if err != nil { errChan <- err return } if getInfoResp.SyncedToChain { errChan <- nil return } select { case <-ctx.Done(): return case <-time.After(retryDelay): } } }() select { case <-l.quit: return nil case err := <-errChan: return err case <-ctx.Done(): return fmt.Errorf("Timeout while waiting for blockchain sync") } } // networkHarness is an integration testing harness for the lightning network. // The harness by default is created with two active nodes on the network: // Alice and Bob. type networkHarness struct { rpcConfig btcrpcclient.ConnConfig netParams *chaincfg.Params Miner *rpctest.Harness activeNodes map[int]*lightningNode // Alice and Bob are the initial seeder nodes that are automatically // created to be the initial participants of the test network. Alice *lightningNode Bob *lightningNode seenTxns chan chainhash.Hash bitcoinWatchRequests chan *txWatchRequest // Channel for transmitting stderr output from failed lightning node // to main process. lndErrorChan chan error sync.Mutex } // newNetworkHarness creates a new network test harness. // TODO(roasbeef): add option to use golang's build library to a binary of the // current repo. This'll save developers from having to manually `go install` // within the repo each time before changes func newNetworkHarness() (*networkHarness, error) { return &networkHarness{ activeNodes: make(map[int]*lightningNode), seenTxns: make(chan chainhash.Hash), bitcoinWatchRequests: make(chan *txWatchRequest), lndErrorChan: make(chan error), }, nil } // InitializeSeedNodes initialized alice and bob nodes given an already // running instance of btcd's rpctest harness and extra command line flags, // which should be formatted properly - "--arg=value". func (n *networkHarness) InitializeSeedNodes(r *rpctest.Harness, lndArgs []string) error { nodeConfig := r.RPCConfig() n.netParams = r.ActiveNet n.Miner = r n.rpcConfig = nodeConfig var err error n.Alice, err = newLightningNode(&nodeConfig, lndArgs) if err != nil { return err } n.Bob, err = newLightningNode(&nodeConfig, lndArgs) if err != nil { return err } n.activeNodes[n.Alice.nodeID] = n.Alice n.activeNodes[n.Bob.nodeID] = n.Bob return err } // ProcessErrors returns a channel used for reporting any fatal process errors. // If any of the active nodes within the harness' test network incur a fatal // error, that error is sent over this channel. func (n *networkHarness) ProcessErrors() chan error { return n.lndErrorChan } // fakeLogger is a fake grpclog.Logger implementation. This is used to stop // grpc's logger from printing directly to stdout. type fakeLogger struct{} func (f *fakeLogger) Fatal(args ...interface{}) {} func (f *fakeLogger) Fatalf(format string, args ...interface{}) {} func (f *fakeLogger) Fatalln(args ...interface{}) {} func (f *fakeLogger) Print(args ...interface{}) {} func (f *fakeLogger) Printf(format string, args ...interface{}) {} func (f *fakeLogger) Println(args ...interface{}) {} // SetUp starts the initial seeder nodes within the test harness. The initial // node's wallets will be funded wallets with ten 1 BTC outputs each. Finally // rpc clients capable of communicating with the initial seeder nodes are // created. func (n *networkHarness) SetUp() error { // Swap out grpc's default logger with out fake logger which drops the // statements on the floor. grpclog.SetLogger(&fakeLogger{}) // Start the initial seeder nodes within the test network, then connect // their respective RPC clients. var wg sync.WaitGroup errChan := make(chan error, 2) wg.Add(2) go func() { var err error defer wg.Done() if err = n.Alice.Start(n.lndErrorChan); err != nil { errChan <- err return } }() go func() { var err error defer wg.Done() if err = n.Bob.Start(n.lndErrorChan); err != nil { errChan <- err return } }() wg.Wait() select { case err := <-errChan: return err default: } // Load up the wallets of the seeder nodes with 10 outputs of 1 BTC // each. ctxb := context.Background() addrReq := &lnrpc.NewAddressRequest{ Type: lnrpc.NewAddressRequest_WITNESS_PUBKEY_HASH, } clients := []lnrpc.LightningClient{n.Alice, n.Bob} for _, client := range clients { for i := 0; i < 10; i++ { resp, err := client.NewAddress(ctxb, addrReq) if err != nil { return err } addr, err := btcutil.DecodeAddress(resp.Address, n.netParams) if err != nil { return err } addrScript, err := txscript.PayToAddrScript(addr) if err != nil { return err } output := &wire.TxOut{ PkScript: addrScript, Value: btcutil.SatoshiPerBitcoin, } if _, err := n.Miner.SendOutputs([]*wire.TxOut{output}, 30); err != nil { return err } } } // We generate several blocks in order to give the outputs created // above a good number of confirmations. if _, err := n.Miner.Node.Generate(10); err != nil { return err } // Finally, make a connection between both of the nodes. if err := n.ConnectNodes(ctxb, n.Alice, n.Bob); err != nil { return err } // Now block until both wallets have fully synced up. expectedBalance := int64(btcutil.SatoshiPerBitcoin * 10) balReq := &lnrpc.WalletBalanceRequest{} balanceTicker := time.Tick(time.Millisecond * 50) out: for { select { case <-balanceTicker: aliceResp, err := n.Alice.WalletBalance(ctxb, balReq) if err != nil { return err } bobResp, err := n.Bob.WalletBalance(ctxb, balReq) if err != nil { return err } if aliceResp.Balance == expectedBalance && bobResp.Balance == expectedBalance { break out } case <-time.After(time.Second * 30): return fmt.Errorf("balances not synced after deadline") } } // Now that the initial test network has been initialized, launch the // network watcher. go n.networkWatcher() return nil } // TearDownAll tears down all active nodes within the test lightning network. func (n *networkHarness) TearDownAll() error { for _, node := range n.activeNodes { if err := node.Shutdown(); err != nil { return err } } return nil } // NewNode fully initializes a returns a new lightningNode binded to the // current instance of the network harness. The created node is running, but // not yet connected to other nodes within the network. func (n *networkHarness) NewNode(extraArgs []string) (*lightningNode, error) { n.Lock() defer n.Unlock() node, err := newLightningNode(&n.rpcConfig, extraArgs) if err != nil { return nil, err } if err := node.Start(n.lndErrorChan); err != nil { return nil, err } n.activeNodes[node.nodeID] = node return node, nil } // ConnectNodes establishes an encrypted+authenticated p2p connection from node // a towards node b. The function will return a non-nil error if the connection // was unable to be established. // // NOTE: This function may block for up to 15-seconds as it will not return // until the new connection is detected as being known to both nodes. func (n *networkHarness) ConnectNodes(ctx context.Context, a, b *lightningNode) error { bobInfo, err := b.GetInfo(ctx, &lnrpc.GetInfoRequest{}) if err != nil { return err } req := &lnrpc.ConnectPeerRequest{ Addr: &lnrpc.LightningAddress{ Pubkey: bobInfo.IdentityPubkey, Host: b.p2pAddr, }, } if _, err := a.ConnectPeer(ctx, req); err != nil { return err } timeout := time.After(time.Second * 15) for { select { case <-timeout: return fmt.Errorf("peers not connected within 15 seconds") default: } // If node B is seen in the ListPeers response from node A, // then we can exit early as the connection has been fully // established. resp, err := a.ListPeers(ctx, &lnrpc.ListPeersRequest{}) if err != nil { return err } for _, peer := range resp.Peers { if peer.PubKey == b.PubKeyStr { return nil } } } } // DisconnectNodes disconnects node a from node b by sending RPC message // from a node to b node func (n *networkHarness) DisconnectNodes(ctx context.Context, a, b *lightningNode) error { bobInfo, err := b.GetInfo(ctx, &lnrpc.GetInfoRequest{}) if err != nil { return err } req := &lnrpc.DisconnectPeerRequest{ PubKey: bobInfo.IdentityPubkey, } if _, err := a.DisconnectPeer(ctx, req); err != nil { return err } return nil } // RestartNode attempts to restart a lightning node by shutting it down // cleanly, then restarting the process. This function is fully blocking. Upon // restart, the RPC connection to the node will be re-attempted, continuing iff // the connection attempt is successful. If the callback parameter is non-nil, // then the function will be executed after the node shuts down, but *before* // the process has been started up again. // // This method can be useful when testing edge cases such as a node broadcast // and invalidated prior state, or persistent state recovery, simulating node // crashes, etc. func (n *networkHarness) RestartNode(node *lightningNode, callback func() error) error { return node.Restart(n.lndErrorChan, callback) } // TODO(roasbeef): add a WithChannel higher-order function? // * python-like context manager w.r.t using a channel within a test // * possibly adds more funds to the target wallet if the funds are not // enough // txWatchRequest encapsulates a request to the harness' Bitcoin network // watcher to dispatch a notification once a transaction with the target txid // is seen within the test network. type txWatchRequest struct { txid chainhash.Hash eventChan chan struct{} } // bitcoinNetworkWatcher is a goroutine which accepts async notification // requests for the broadcast of a target transaction, and then dispatches the // transaction once its seen on the Bitcoin network. func (n *networkHarness) networkWatcher() { seenTxns := make(map[chainhash.Hash]struct{}) clients := make(map[chainhash.Hash][]chan struct{}) for { select { case req := <-n.bitcoinWatchRequests: // If we've already seen this transaction, then // immediately dispatch the request. Otherwise, append // to the list of clients who are watching for the // broadcast of this transaction. if _, ok := seenTxns[req.txid]; ok { close(req.eventChan) } else { clients[req.txid] = append(clients[req.txid], req.eventChan) } case txid := <-n.seenTxns: // Add this txid to our set of "seen" transactions. So // we're able to dispatch any notifications for this // txid which arrive *after* it's seen within the // network. seenTxns[txid] = struct{}{} // If there isn't a registered notification for this // transaction then ignore it. txClients, ok := clients[txid] if !ok { continue } // Otherwise, dispatch the notification to all clients, // cleaning up the now un-needed state. for _, client := range txClients { close(client) } delete(clients, txid) } } } // OnTxAccepted is a callback to be called each time a new transaction has been // broadcast on the network. func (n *networkHarness) OnTxAccepted(hash *chainhash.Hash, amt btcutil.Amount) { go func() { n.seenTxns <- *hash }() } // WaitForTxBroadcast blocks until the target txid is seen on the network. If // the transaction isn't seen within the network before the passed timeout, // then an error is returned. // TODO(roasbeef): add another method which creates queue of all seen transactions func (n *networkHarness) WaitForTxBroadcast(ctx context.Context, txid chainhash.Hash) error { eventChan := make(chan struct{}) n.bitcoinWatchRequests <- &txWatchRequest{ txid: txid, eventChan: eventChan, } select { case <-eventChan: return nil case <-ctx.Done(): return fmt.Errorf("tx not seen before context timeout") } } // OpenChannel attempts to open a channel between srcNode and destNode with the // passed channel funding parameters. If the passed context has a timeout, then // if the timeout is reached before the channel pending notification is // received, an error is returned. func (n *networkHarness) OpenChannel(ctx context.Context, srcNode, destNode *lightningNode, amt btcutil.Amount, pushAmt btcutil.Amount) (lnrpc.Lightning_OpenChannelClient, error) { // Wait until srcNode and destNode have the latest chain synced. // Otherwise, we may run into a check within the funding manager that // prevents any funding workflows from being kicked off if the chain // isn't yet synced. if err := srcNode.WaitForBlockchainSync(ctx); err != nil { return nil, fmt.Errorf("Unable to sync srcNode chain: %v", err) } if err := destNode.WaitForBlockchainSync(ctx); err != nil { return nil, fmt.Errorf("Unable to sync destNode chain: %v", err) } openReq := &lnrpc.OpenChannelRequest{ NodePubkey: destNode.PubKey[:], LocalFundingAmount: int64(amt), PushSat: int64(pushAmt), } respStream, err := srcNode.OpenChannel(ctx, openReq) if err != nil { return nil, fmt.Errorf("unable to open channel between "+ "alice and bob: %v", err) } chanOpen := make(chan struct{}) errChan := make(chan error) go func() { // Consume the "channel pending" update. This waits until the node // notifies us that the final message in the channel funding workflow // has been sent to the remote node. resp, err := respStream.Recv() if err != nil { errChan <- err return } if _, ok := resp.Update.(*lnrpc.OpenStatusUpdate_ChanPending); !ok { errChan <- fmt.Errorf("expected channel pending update, "+ "instead got %v", resp) return } close(chanOpen) }() select { case <-ctx.Done(): return nil, fmt.Errorf("timeout reached before chan pending "+ "update sent: %v", err) case err := <-errChan: return nil, err case <-chanOpen: return respStream, nil } } // OpenPendingChannel attempts to open a channel between srcNode and destNode with the // passed channel funding parameters. If the passed context has a timeout, then // if the timeout is reached before the channel pending notification is // received, an error is returned. func (n *networkHarness) OpenPendingChannel(ctx context.Context, srcNode, destNode *lightningNode, amt btcutil.Amount, pushAmt btcutil.Amount) (*lnrpc.PendingUpdate, error) { // Wait until srcNode and destNode have blockchain synced if err := srcNode.WaitForBlockchainSync(ctx); err != nil { return nil, fmt.Errorf("Unable to sync srcNode chain: %v", err) } if err := destNode.WaitForBlockchainSync(ctx); err != nil { return nil, fmt.Errorf("Unable to sync destNode chain: %v", err) } openReq := &lnrpc.OpenChannelRequest{ NodePubkey: destNode.PubKey[:], LocalFundingAmount: int64(amt), PushSat: int64(pushAmt), } respStream, err := srcNode.OpenChannel(ctx, openReq) if err != nil { return nil, fmt.Errorf("unable to open channel between "+ "alice and bob: %v", err) } chanPending := make(chan *lnrpc.PendingUpdate) errChan := make(chan error) go func() { // Consume the "channel pending" update. This waits until the node // notifies us that the final message in the channel funding workflow // has been sent to the remote node. resp, err := respStream.Recv() if err != nil { errChan <- err return } pendingResp, ok := resp.Update.(*lnrpc.OpenStatusUpdate_ChanPending) if !ok { errChan <- fmt.Errorf("expected channel pending update, "+ "instead got %v", resp) return } chanPending <- pendingResp.ChanPending }() select { case <-ctx.Done(): return nil, fmt.Errorf("timeout reached before chan pending " + "update sent") case err := <-errChan: return nil, err case pendingChan := <-chanPending: return pendingChan, nil } } // WaitForChannelOpen waits for a notification that a channel is open by // consuming a message from the past open channel stream. If the passed context // has a timeout, then if the timeout is reached before the channel has been // opened, then an error is returned. func (n *networkHarness) WaitForChannelOpen(ctx context.Context, openChanStream lnrpc.Lightning_OpenChannelClient) (*lnrpc.ChannelPoint, error) { errChan := make(chan error) respChan := make(chan *lnrpc.ChannelPoint) go func() { resp, err := openChanStream.Recv() if err != nil { errChan <- fmt.Errorf("unable to read rpc resp: %v", err) return } fundingResp, ok := resp.Update.(*lnrpc.OpenStatusUpdate_ChanOpen) if !ok { errChan <- fmt.Errorf("expected channel open update, "+ "instead got %v", resp) return } respChan <- fundingResp.ChanOpen.ChannelPoint }() select { case <-ctx.Done(): return nil, fmt.Errorf("timeout reached while waiting for " + "channel open") case err := <-errChan: return nil, err case chanPoint := <-respChan: return chanPoint, nil } } // CloseChannel close channel attempts to close the channel indicated by the // passed channel point, initiated by the passed lnNode. If the passed context // has a timeout, then if the timeout is reached before the channel close is // pending, then an error is returned. func (n *networkHarness) CloseChannel(ctx context.Context, lnNode *lightningNode, cp *lnrpc.ChannelPoint, force bool) (lnrpc.Lightning_CloseChannelClient, *chainhash.Hash, error) { closeReq := &lnrpc.CloseChannelRequest{ ChannelPoint: cp, Force: force, } closeRespStream, err := lnNode.CloseChannel(ctx, closeReq) if err != nil { return nil, nil, fmt.Errorf("unable to close channel: %v", err) } errChan := make(chan error) fin := make(chan *chainhash.Hash) go func() { // Consume the "channel close" update in order to wait for the closing // transaction to be broadcast, then wait for the closing tx to be seen // within the network. closeResp, err := closeRespStream.Recv() if err != nil { errChan <- err return } pendingClose, ok := closeResp.Update.(*lnrpc.CloseStatusUpdate_ClosePending) if !ok { errChan <- fmt.Errorf("expected channel close update, "+ "instead got %v", pendingClose) return } closeTxid, err := chainhash.NewHash(pendingClose.ClosePending.Txid) if err != nil { errChan <- err return } if err := n.WaitForTxBroadcast(ctx, *closeTxid); err != nil { errChan <- err return } fin <- closeTxid }() // Wait until either the deadline for the context expires, an error // occurs, or the channel close update is received. select { case <-ctx.Done(): return nil, nil, fmt.Errorf("timeout reached before channel close " + "initiated") case err := <-errChan: return nil, nil, err case closeTxid := <-fin: return closeRespStream, closeTxid, nil } } // WaitForChannelClose waits for a notification from the passed channel close // stream that the node has deemed the channel has been fully closed. If the // passed context has a timeout, then if the timeout is reached before the // notification is received then an error is returned. func (n *networkHarness) WaitForChannelClose(ctx context.Context, closeChanStream lnrpc.Lightning_CloseChannelClient) (*chainhash.Hash, error) { errChan := make(chan error) updateChan := make(chan *lnrpc.CloseStatusUpdate_ChanClose) go func() { closeResp, err := closeChanStream.Recv() if err != nil { errChan <- err return } closeFin, ok := closeResp.Update.(*lnrpc.CloseStatusUpdate_ChanClose) if !ok { errChan <- fmt.Errorf("expected channel close update, "+ "instead got %v", closeFin) return } updateChan <- closeFin }() // Wait until either the deadline for the context expires, an error // occurs, or the channel close update is received. select { case <-ctx.Done(): return nil, fmt.Errorf("timeout reached before update sent") case err := <-errChan: return nil, err case update := <-updateChan: return chainhash.NewHash(update.ChanClose.ClosingTxid) } } // AssertChannelExists asserts that an active channel identified by // channelPoint is known to exist from the point-of-view of node.. func (n *networkHarness) AssertChannelExists(ctx context.Context, node *lightningNode, chanPoint *wire.OutPoint) error { req := &lnrpc.ListChannelsRequest{} resp, err := node.ListChannels(ctx, req) if err != nil { return fmt.Errorf("unable fetch node's channels: %v", err) } for _, channel := range resp.Channels { if channel.ChannelPoint == chanPoint.String() { return nil } } return fmt.Errorf("channel not found") } // DumpLogs reads the current logs generated by the passed node, and returns // the logs as a single string. This function is useful for examining the logs // of a particular node in the case of a test failure. // Logs from lightning node being generated with delay - you should // add time.Sleep() in order to get all logs. func (n *networkHarness) DumpLogs(node *lightningNode) (string, error) { logFile := fmt.Sprintf("%v/simnet/lnd.log", node.cfg.LogDir) buf, err := ioutil.ReadFile(logFile) if err != nil { return "", err } return string(buf), nil } // SendCoins attempts to send amt satoshis from the internal mining node to the // targeted lightning node. func (n *networkHarness) SendCoins(ctx context.Context, amt btcutil.Amount, target *lightningNode) error { balReq := &lnrpc.WalletBalanceRequest{} initialBalance, err := target.WalletBalance(ctx, balReq) if err != nil { return err } // First, obtain an address from the target lightning node, preferring // to receive a p2wkh address s.t the output can immediately be used as // an input to a funding transaction. addrReq := &lnrpc.NewAddressRequest{ Type: lnrpc.NewAddressRequest_WITNESS_PUBKEY_HASH, } resp, err := target.NewAddress(ctx, addrReq) if err != nil { return err } addr, err := btcutil.DecodeAddress(resp.Address, n.netParams) if err != nil { return err } addrScript, err := txscript.PayToAddrScript(addr) if err != nil { return err } // Generate a transaction which creates an output to the target // pkScript of the desired amount. output := &wire.TxOut{ PkScript: addrScript, Value: int64(amt), } if _, err := n.Miner.SendOutputs([]*wire.TxOut{output}, 30); err != nil { return err } // Finally, generate 6 new blocks to ensure the output gains a // sufficient number of confirmations. if _, err := n.Miner.Node.Generate(6); err != nil { return err } // Pause until the nodes current wallet balances reflects the amount // sent to it above. // TODO(roasbeef): factor out into helper func for { select { case <-time.Tick(time.Millisecond * 50): currentBal, err := target.WalletBalance(ctx, balReq) if err != nil { return err } if currentBal.Balance == initialBalance.Balance+int64(amt) { return nil } case <-time.After(time.Second * 30): return fmt.Errorf("balances not synced after deadline") } } }