package lntest import ( "bytes" "context" "encoding/hex" "flag" "fmt" "io" "io/ioutil" "net" "os" "os/exec" "path/filepath" "sync" "sync/atomic" "time" "github.com/btcsuite/btcd/chaincfg" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/integration/rpctest" "github.com/btcsuite/btcd/rpcclient" "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" "github.com/go-errors/errors" "github.com/lightningnetwork/lnd/chanbackup" "github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnrpc/invoicesrpc" "github.com/lightningnetwork/lnd/lnrpc/routerrpc" "github.com/lightningnetwork/lnd/lnrpc/signrpc" "github.com/lightningnetwork/lnd/lnrpc/walletrpc" "github.com/lightningnetwork/lnd/lnrpc/watchtowerrpc" "github.com/lightningnetwork/lnd/lnrpc/wtclientrpc" "github.com/lightningnetwork/lnd/lntest/wait" "github.com/lightningnetwork/lnd/macaroons" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "gopkg.in/macaroon.v2" ) const ( // defaultNodePort is the start of the range for listening ports of // harness nodes. Ports are monotonically increasing starting from this // number and are determined by the results of nextAvailablePort(). defaultNodePort = 5555 // logPubKeyBytes is the number of bytes of the node's PubKey that will // be appended to the log file name. The whole PubKey is too long and // not really necessary to quickly identify what node produced which // log file. logPubKeyBytes = 4 // trickleDelay is the amount of time in milliseconds between each // release of announcements by AuthenticatedGossiper to the network. trickleDelay = 50 // listenerFormat is the format string that is used to generate local // listener addresses. listenerFormat = "127.0.0.1:%d" ) var ( // numActiveNodes is the number of active nodes within the test network. numActiveNodes = 0 numActiveNodesMtx sync.Mutex // lastPort is the last port determined to be free for use by a new // node. It should be used atomically. lastPort uint32 = defaultNodePort // logOutput is a flag that can be set to append the output from the // seed nodes to log files. logOutput = flag.Bool("logoutput", false, "log output from node n to file output-n.log") // logSubDir is the default directory where the logs are written to if // logOutput is true. logSubDir = flag.String("logdir", ".", "default dir to write logs to") // goroutineDump is a flag that can be set to dump the active // goroutines of test nodes on failure. goroutineDump = flag.Bool("goroutinedump", false, "write goroutine dump from node n to file pprof-n.log") // btcdExecutable is the full path to the btcd binary. btcdExecutable = flag.String( "btcdexec", "", "full path to btcd binary", ) ) // nextAvailablePort returns the first port that is available for listening by // a new node. It panics if no port is found and the maximum available TCP port // is reached. func nextAvailablePort() int { port := atomic.AddUint32(&lastPort, 1) for port < 65535 { // If there are no errors while attempting to listen on this // port, close the socket and return it as available. While it // could be the case that some other process picks up this port // between the time the socket is closed and it's reopened in // the harness node, in practice in CI servers this seems much // less likely than simply some other process already being // bound at the start of the tests. addr := fmt.Sprintf(listenerFormat, port) l, err := net.Listen("tcp4", addr) if err == nil { err := l.Close() if err == nil { return int(port) } } port = atomic.AddUint32(&lastPort, 1) } // No ports available? Must be a mistake. panic("no ports available for listening") } // ApplyPortOffset adds the given offset to the lastPort variable, making it // possible to run the tests in parallel without colliding on the same ports. func ApplyPortOffset(offset uint32) { _ = atomic.AddUint32(&lastPort, offset) } // GetLogDir returns the passed --logdir flag or the default value if it wasn't // set. func GetLogDir() string { if logSubDir != nil && *logSubDir != "" { return *logSubDir } return "." } // GetBtcdBinary returns the full path to the binary of the custom built btcd // executable or an empty string if none is set. func GetBtcdBinary() string { if btcdExecutable != nil { return *btcdExecutable } return "" } // GenerateBtcdListenerAddresses is a function that returns two listener // addresses with unique ports and should be used to overwrite rpctest's default // generator which is prone to use colliding ports. func GenerateBtcdListenerAddresses() (string, string) { return fmt.Sprintf(listenerFormat, nextAvailablePort()), fmt.Sprintf(listenerFormat, nextAvailablePort()) } // generateListeningPorts returns four ints representing ports to listen on // designated for the current lightning network test. This returns the next // available ports for the p2p, rpc, rest and profiling services. func generateListeningPorts() (int, int, int, int) { p2p := nextAvailablePort() rpc := nextAvailablePort() rest := nextAvailablePort() profile := nextAvailablePort() return p2p, rpc, rest, profile } // BackendConfig is an interface that abstracts away the specific chain backend // node implementation. type BackendConfig interface { // GenArgs returns the arguments needed to be passed to LND at startup // for using this node as a chain backend. GenArgs() []string // ConnectMiner is called to establish a connection to the test miner. ConnectMiner() error // DisconnectMiner is called to disconnect the miner. DisconnectMiner() error // Name returns the name of the backend type. Name() string } type NodeConfig struct { Name string // LogFilenamePrefix is is used to prefix node log files. Can be used // to store the current test case for simpler postmortem debugging. LogFilenamePrefix string BackendCfg BackendConfig NetParams *chaincfg.Params BaseDir string ExtraArgs []string DataDir string LogDir string TLSCertPath string TLSKeyPath string AdminMacPath string ReadMacPath string InvoiceMacPath string HasSeed bool Password []byte P2PPort int RPCPort int RESTPort int ProfilePort int AcceptKeySend bool FeeURL string Etcd bool } func (cfg NodeConfig) P2PAddr() string { return fmt.Sprintf(listenerFormat, cfg.P2PPort) } func (cfg NodeConfig) RPCAddr() string { return fmt.Sprintf(listenerFormat, cfg.RPCPort) } func (cfg NodeConfig) RESTAddr() string { return fmt.Sprintf(listenerFormat, cfg.RESTPort) } // DBDir returns the holding directory path of the graph database. func (cfg NodeConfig) DBDir() string { return filepath.Join(cfg.DataDir, "graph", cfg.NetParams.Name) } func (cfg NodeConfig) DBPath() string { return filepath.Join(cfg.DBDir(), "channel.db") } func (cfg NodeConfig) ChanBackupPath() string { return filepath.Join( cfg.DataDir, "chain", "bitcoin", fmt.Sprintf( "%v/%v", cfg.NetParams.Name, chanbackup.DefaultBackupFileName, ), ) } // genArgs generates a slice of command line arguments from the lightning node // config struct. func (cfg NodeConfig) genArgs() []string { var args []string switch cfg.NetParams { case &chaincfg.TestNet3Params: args = append(args, "--bitcoin.testnet") case &chaincfg.SimNetParams: args = append(args, "--bitcoin.simnet") case &chaincfg.RegressionNetParams: args = append(args, "--bitcoin.regtest") } backendArgs := cfg.BackendCfg.GenArgs() args = append(args, backendArgs...) args = append(args, "--bitcoin.active") args = append(args, "--nobootstrap") args = append(args, "--debuglevel=debug") args = append(args, "--bitcoin.defaultchanconfs=1") args = append(args, fmt.Sprintf("--db.batch-commit-interval=%v", 10*time.Millisecond)) args = append(args, fmt.Sprintf("--bitcoin.defaultremotedelay=%v", DefaultCSV)) args = append(args, fmt.Sprintf("--rpclisten=%v", cfg.RPCAddr())) args = append(args, fmt.Sprintf("--restlisten=%v", cfg.RESTAddr())) args = append(args, fmt.Sprintf("--restcors=https://%v", cfg.RESTAddr())) args = append(args, fmt.Sprintf("--listen=%v", cfg.P2PAddr())) args = append(args, fmt.Sprintf("--externalip=%v", cfg.P2PAddr())) args = append(args, fmt.Sprintf("--logdir=%v", cfg.LogDir)) args = append(args, fmt.Sprintf("--datadir=%v", cfg.DataDir)) args = append(args, fmt.Sprintf("--tlscertpath=%v", cfg.TLSCertPath)) args = append(args, fmt.Sprintf("--tlskeypath=%v", cfg.TLSKeyPath)) args = append(args, fmt.Sprintf("--configfile=%v", cfg.DataDir)) args = append(args, fmt.Sprintf("--adminmacaroonpath=%v", cfg.AdminMacPath)) args = append(args, fmt.Sprintf("--readonlymacaroonpath=%v", cfg.ReadMacPath)) args = append(args, fmt.Sprintf("--invoicemacaroonpath=%v", cfg.InvoiceMacPath)) args = append(args, fmt.Sprintf("--trickledelay=%v", trickleDelay)) args = append(args, fmt.Sprintf("--profile=%d", cfg.ProfilePort)) args = append(args, fmt.Sprintf("--protocol.legacy.no-gossip-throttle")) if !cfg.HasSeed { args = append(args, "--noseedbackup") } if cfg.ExtraArgs != nil { args = append(args, cfg.ExtraArgs...) } if cfg.AcceptKeySend { args = append(args, "--accept-keysend") } if cfg.Etcd { args = append(args, "--db.backend=etcd") args = append(args, "--db.etcd.embedded") } if cfg.FeeURL != "" { args = append(args, "--feeurl="+cfg.FeeURL) } return args } // HarnessNode represents an instance of lnd running within our test network // harness. Each HarnessNode instance also fully embeds an RPC client in // order to pragmatically drive the node. type HarnessNode struct { Cfg *NodeConfig // NodeID is a unique identifier for the node within a NetworkHarness. NodeID int // PubKey is the serialized compressed identity public key of the node. // This field will only be populated once the node itself has been // started via the start() method. PubKey [33]byte PubKeyStr string cmd *exec.Cmd pidFile string logFile *os.File // processExit is a channel that's closed once it's detected that the // process this instance of HarnessNode is bound to has exited. processExit chan struct{} chanWatchRequests chan *chanWatchRequest // For each outpoint, we'll track an integer which denotes the number of // edges seen for that channel within the network. When this number // reaches 2, then it means that both edge advertisements has propagated // through the network. openChans map[wire.OutPoint]int openClients map[wire.OutPoint][]chan struct{} closedChans map[wire.OutPoint]struct{} closeClients map[wire.OutPoint][]chan struct{} quit chan struct{} wg sync.WaitGroup lnrpc.LightningClient lnrpc.WalletUnlockerClient invoicesrpc.InvoicesClient // SignerClient cannot be embedded because the name collisions of the // methods SignMessage and VerifyMessage. SignerClient signrpc.SignerClient // conn is the underlying connection to the grpc endpoint of the node. conn *grpc.ClientConn // RouterClient, WalletKitClient, WatchtowerClient cannot be embedded, // because a name collision would occur with LightningClient. RouterClient routerrpc.RouterClient WalletKitClient walletrpc.WalletKitClient Watchtower watchtowerrpc.WatchtowerClient WatchtowerClient wtclientrpc.WatchtowerClientClient } // Assert *HarnessNode implements the lnrpc.LightningClient interface. var _ lnrpc.LightningClient = (*HarnessNode)(nil) var _ lnrpc.WalletUnlockerClient = (*HarnessNode)(nil) var _ invoicesrpc.InvoicesClient = (*HarnessNode)(nil) // newNode creates a new test lightning node instance from the passed config. func newNode(cfg NodeConfig) (*HarnessNode, error) { if cfg.BaseDir == "" { var err error cfg.BaseDir, err = ioutil.TempDir("", "lndtest-node") if err != nil { return nil, err } } cfg.DataDir = filepath.Join(cfg.BaseDir, "data") cfg.LogDir = filepath.Join(cfg.BaseDir, "log") cfg.TLSCertPath = filepath.Join(cfg.DataDir, "tls.cert") cfg.TLSKeyPath = filepath.Join(cfg.DataDir, "tls.key") networkDir := filepath.Join( cfg.DataDir, "chain", "bitcoin", cfg.NetParams.Name, ) cfg.AdminMacPath = filepath.Join(networkDir, "admin.macaroon") cfg.ReadMacPath = filepath.Join(networkDir, "readonly.macaroon") cfg.InvoiceMacPath = filepath.Join(networkDir, "invoice.macaroon") cfg.P2PPort, cfg.RPCPort, cfg.RESTPort, cfg.ProfilePort = generateListeningPorts() // Run all tests with accept keysend. The keysend code is very isolated // and it is highly unlikely that it would affect regular itests when // enabled. cfg.AcceptKeySend = true numActiveNodesMtx.Lock() nodeNum := numActiveNodes numActiveNodes++ numActiveNodesMtx.Unlock() return &HarnessNode{ Cfg: &cfg, NodeID: nodeNum, chanWatchRequests: make(chan *chanWatchRequest), openChans: make(map[wire.OutPoint]int), openClients: make(map[wire.OutPoint][]chan struct{}), closedChans: make(map[wire.OutPoint]struct{}), closeClients: make(map[wire.OutPoint][]chan struct{}), }, nil } // NewMiner creates a new miner using btcd backend. The logDir specifies the // miner node's log dir. When tests finished, during clean up, its logs are // copied to a file specified as logFilename. func NewMiner(logDir, logFilename string, netParams *chaincfg.Params, handler *rpcclient.NotificationHandlers, btcdBinary string) (*rpctest.Harness, func() error, error) { args := []string{ "--rejectnonstd", "--txindex", "--nowinservice", "--nobanning", "--debuglevel=debug", "--logdir=" + logDir, "--trickleinterval=100ms", } miner, err := rpctest.New(netParams, handler, args, btcdBinary) if err != nil { return nil, nil, fmt.Errorf( "unable to create mining node: %v", err, ) } cleanUp := func() error { if err := miner.TearDown(); err != nil { return fmt.Errorf( "failed to tear down miner, got error: %s", err, ) } // After shutting down the miner, we'll make a copy of the log // file before deleting the temporary log dir. logFile := fmt.Sprintf("%s/%s/btcd.log", logDir, netParams.Name) copyPath := fmt.Sprintf("%s/../%s", logDir, logFilename) err := CopyFile(filepath.Clean(copyPath), logFile) if err != nil { return fmt.Errorf("unable to copy file: %v", err) } if err = os.RemoveAll(logDir); err != nil { return fmt.Errorf( "cannot remove dir %s: %v", logDir, err, ) } return nil } return miner, cleanUp, nil } // DBPath returns the filepath to the channeldb database file for this node. func (hn *HarnessNode) DBPath() string { return hn.Cfg.DBPath() } // DBDir returns the path for the directory holding channeldb file(s). func (hn *HarnessNode) DBDir() string { return hn.Cfg.DBDir() } // Name returns the name of this node set during initialization. func (hn *HarnessNode) Name() string { return hn.Cfg.Name } // TLSCertStr returns the path where the TLS certificate is stored. func (hn *HarnessNode) TLSCertStr() string { return hn.Cfg.TLSCertPath } // TLSKeyStr returns the path where the TLS key is stored. func (hn *HarnessNode) TLSKeyStr() string { return hn.Cfg.TLSKeyPath } // ChanBackupPath returns the fielpath to the on-disk channels.backup file for // this node. func (hn *HarnessNode) ChanBackupPath() string { return hn.Cfg.ChanBackupPath() } // AdminMacPath returns the filepath to the admin.macaroon file for this node. func (hn *HarnessNode) AdminMacPath() string { return hn.Cfg.AdminMacPath } // ReadMacPath returns the filepath to the readonly.macaroon file for this node. func (hn *HarnessNode) ReadMacPath() string { return hn.Cfg.ReadMacPath } // InvoiceMacPath returns the filepath to the invoice.macaroon file for this // node. func (hn *HarnessNode) InvoiceMacPath() string { return hn.Cfg.InvoiceMacPath } // Start launches a new process running lnd. Additionally, the PID of the // launched process is saved in order to possibly kill the process forcibly // later. // // This may not clean up properly if an error is returned, so the caller should // call shutdown() regardless of the return value. func (hn *HarnessNode) start(lndBinary string, lndError chan<- error) error { hn.quit = make(chan struct{}) args := hn.Cfg.genArgs() hn.cmd = exec.Command(lndBinary, args...) // Redirect stderr output to buffer var errb bytes.Buffer hn.cmd.Stderr = &errb // Make sure the log file cleanup function is initialized, even // if no log file is created. var finalizeLogfile = func() { if hn.logFile != nil { hn.logFile.Close() } } // If the logoutput flag is passed, redirect output from the nodes to // log files. if *logOutput { dir := GetLogDir() fileName := fmt.Sprintf("%s/%d-%s-%s-%s.log", dir, hn.NodeID, hn.Cfg.LogFilenamePrefix, hn.Cfg.Name, hex.EncodeToString(hn.PubKey[:logPubKeyBytes])) // If the node's PubKey is not yet initialized, create a // temporary file name. Later, after the PubKey has been // initialized, the file can be moved to its final name with // the PubKey included. if bytes.Equal(hn.PubKey[:4], []byte{0, 0, 0, 0}) { fileName = fmt.Sprintf("%s/%d-%s-%s-tmp__.log", dir, hn.NodeID, hn.Cfg.LogFilenamePrefix, hn.Cfg.Name) // Once the node has done its work, the log file can be // renamed. finalizeLogfile = func() { if hn.logFile != nil { hn.logFile.Close() pubKeyHex := hex.EncodeToString( hn.PubKey[:logPubKeyBytes], ) newFileName := fmt.Sprintf("%s/"+ "%d-%s-%s-%s.log", dir, hn.NodeID, hn.Cfg.LogFilenamePrefix, hn.Cfg.Name, pubKeyHex) err := os.Rename(fileName, newFileName) if err != nil { fmt.Printf("could not rename "+ "%s to %s: %v\n", fileName, newFileName, err) } } } } // Create file if not exists, otherwise append. file, err := os.OpenFile(fileName, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666) if err != nil { return err } // Pass node's stderr to both errb and the file. w := io.MultiWriter(&errb, file) hn.cmd.Stderr = w // Pass the node's stdout only to the file. hn.cmd.Stdout = file // Let the node keep a reference to this file, such // that we can add to it if necessary. hn.logFile = file } if err := hn.cmd.Start(); err != nil { return err } // Launch a new goroutine which that bubbles up any potential fatal // process errors to the goroutine running the tests. hn.processExit = make(chan struct{}) hn.wg.Add(1) go func() { defer hn.wg.Done() err := hn.cmd.Wait() if err != nil { lndError <- errors.Errorf("%v\n%v\n", err, errb.String()) } // Signal any onlookers that this process has exited. close(hn.processExit) // Make sure log file is closed and renamed if necessary. finalizeLogfile() }() // Write process ID to a file. if err := hn.writePidFile(); err != nil { hn.cmd.Process.Kill() return err } // Since Stop uses the LightningClient to stop the node, if we fail to get a // connected client, we have to kill the process. useMacaroons := !hn.Cfg.HasSeed conn, err := hn.ConnectRPC(useMacaroons) if err != nil { hn.cmd.Process.Kill() return err } // If the node was created with a seed, we will need to perform an // additional step to unlock the wallet. The connection returned will // only use the TLS certs, and can only perform operations necessary to // unlock the daemon. if hn.Cfg.HasSeed { hn.WalletUnlockerClient = lnrpc.NewWalletUnlockerClient(conn) return nil } return hn.initLightningClient(conn) } // initClientWhenReady waits until the main gRPC server is detected as active, // then complete the normal HarnessNode gRPC connection creation. This can be // used it a node has just been unlocked, or has its wallet state initialized. func (hn *HarnessNode) initClientWhenReady() error { var ( conn *grpc.ClientConn connErr error ) if err := wait.NoError(func() error { conn, connErr = hn.ConnectRPC(true) return connErr }, DefaultTimeout); err != nil { return err } return hn.initLightningClient(conn) } // Init initializes a harness node by passing the init request via rpc. After // the request is submitted, this method will block until a // macaroon-authenticated RPC connection can be established to the harness node. // Once established, the new connection is used to initialize the // LightningClient and subscribes the HarnessNode to topology changes. func (hn *HarnessNode) Init(ctx context.Context, initReq *lnrpc.InitWalletRequest) (*lnrpc.InitWalletResponse, error) { ctxt, cancel := context.WithTimeout(ctx, DefaultTimeout) defer cancel() response, err := hn.InitWallet(ctxt, initReq) if err != nil { return nil, err } // Wait for the wallet to finish unlocking, such that we can connect to // it via a macaroon-authenticated rpc connection. var conn *grpc.ClientConn if err = wait.Predicate(func() bool { // If the node has been initialized stateless, we need to pass // the macaroon to the client. if initReq.StatelessInit { adminMac := &macaroon.Macaroon{} err := adminMac.UnmarshalBinary(response.AdminMacaroon) if err != nil { return false } conn, err = hn.ConnectRPCWithMacaroon(adminMac) return err == nil } // Normal initialization, we expect a macaroon to be in the // file system. conn, err = hn.ConnectRPC(true) return err == nil }, DefaultTimeout); err != nil { return nil, err } return response, hn.initLightningClient(conn) } // InitChangePassword initializes a harness node by passing the change password // request via RPC. After the request is submitted, this method will block until // a macaroon-authenticated RPC connection can be established to the harness // node. Once established, the new connection is used to initialize the // LightningClient and subscribes the HarnessNode to topology changes. func (hn *HarnessNode) InitChangePassword(ctx context.Context, chngPwReq *lnrpc.ChangePasswordRequest) (*lnrpc.ChangePasswordResponse, error) { ctxt, cancel := context.WithTimeout(ctx, DefaultTimeout) defer cancel() response, err := hn.ChangePassword(ctxt, chngPwReq) if err != nil { return nil, err } // Wait for the wallet to finish unlocking, such that we can connect to // it via a macaroon-authenticated rpc connection. var conn *grpc.ClientConn if err = wait.Predicate(func() bool { // If the node has been initialized stateless, we need to pass // the macaroon to the client. if chngPwReq.StatelessInit { adminMac := &macaroon.Macaroon{} err := adminMac.UnmarshalBinary(response.AdminMacaroon) if err != nil { return false } conn, err = hn.ConnectRPCWithMacaroon(adminMac) return err == nil } // Normal initialization, we expect a macaroon to be in the // file system. conn, err = hn.ConnectRPC(true) return err == nil }, DefaultTimeout); err != nil { return nil, err } return response, hn.initLightningClient(conn) } // Unlock attempts to unlock the wallet of the target HarnessNode. This method // should be called after the restart of a HarnessNode that was created with a // seed+password. Once this method returns, the HarnessNode will be ready to // accept normal gRPC requests and harness command. func (hn *HarnessNode) Unlock(ctx context.Context, unlockReq *lnrpc.UnlockWalletRequest) error { ctxt, _ := context.WithTimeout(ctx, DefaultTimeout) // Otherwise, we'll need to unlock the node before it's able to start // up properly. if _, err := hn.UnlockWallet(ctxt, unlockReq); err != nil { return err } // Now that the wallet has been unlocked, we'll wait for the RPC client // to be ready, then establish the normal gRPC connection. return hn.initClientWhenReady() } // initLightningClient constructs the grpc LightningClient from the given client // connection and subscribes the harness node to graph topology updates. // This method also spawns a lightning network watcher for this node, // which watches for topology changes. func (hn *HarnessNode) initLightningClient(conn *grpc.ClientConn) error { // Construct the LightningClient that will allow us to use the // HarnessNode directly for normal rpc operations. hn.conn = conn hn.LightningClient = lnrpc.NewLightningClient(conn) hn.InvoicesClient = invoicesrpc.NewInvoicesClient(conn) hn.RouterClient = routerrpc.NewRouterClient(conn) hn.WalletKitClient = walletrpc.NewWalletKitClient(conn) hn.Watchtower = watchtowerrpc.NewWatchtowerClient(conn) hn.WatchtowerClient = wtclientrpc.NewWatchtowerClientClient(conn) hn.SignerClient = signrpc.NewSignerClient(conn) // Set the harness node's pubkey to what the node claims in GetInfo. err := hn.FetchNodeInfo() if err != nil { return err } // Due to a race condition between the ChannelRouter starting and us // making the subscription request, it's possible for our graph // subscription to fail. To ensure we don't start listening for updates // until then, we'll create a dummy subscription to ensure we can do so // successfully before proceeding. We use a dummy subscription in order // to not consume an update from the real one. err = wait.NoError(func() error { req := &lnrpc.GraphTopologySubscription{} ctx, cancelFunc := context.WithCancel(context.Background()) topologyClient, err := hn.SubscribeChannelGraph(ctx, req) if err != nil { return err } // We'll wait to receive an error back within a one second // timeout. This is needed since creating the client's stream is // independent of the graph subscription being created. The // stream is closed from the server's side upon an error. errChan := make(chan error, 1) go func() { if _, err := topologyClient.Recv(); err != nil { errChan <- err } }() select { case err = <-errChan: case <-time.After(time.Second): } cancelFunc() return err }, DefaultTimeout) if err != nil { return err } // Launch the watcher that will hook into graph related topology change // from the PoV of this node. hn.wg.Add(1) subscribed := make(chan error) go hn.lightningNetworkWatcher(subscribed) return <-subscribed } // FetchNodeInfo queries an unlocked node to retrieve its public key. func (hn *HarnessNode) FetchNodeInfo() error { // Obtain the lnid of this node for quick identification purposes. ctxb := context.Background() info, err := hn.GetInfo(ctxb, &lnrpc.GetInfoRequest{}) if err != nil { return err } hn.PubKeyStr = info.IdentityPubkey pubkey, err := hex.DecodeString(info.IdentityPubkey) if err != nil { return err } copy(hn.PubKey[:], pubkey) return nil } // AddToLog adds a line of choice to the node's logfile. This is useful // to interleave test output with output from the node. func (hn *HarnessNode) AddToLog(line string) error { // If this node was not set up with a log file, just return early. if hn.logFile == nil { return nil } if _, err := hn.logFile.WriteString(line); err != nil { return err } return nil } // writePidFile writes the process ID of the running lnd process to a .pid file. func (hn *HarnessNode) writePidFile() error { filePath := filepath.Join(hn.Cfg.BaseDir, fmt.Sprintf("%v.pid", hn.NodeID)) pid, err := os.Create(filePath) if err != nil { return err } defer pid.Close() _, err = fmt.Fprintf(pid, "%v\n", hn.cmd.Process.Pid) if err != nil { return err } hn.pidFile = filePath return nil } // ReadMacaroon waits a given duration for the macaroon file to be created. If // the file is readable within the timeout, its content is de-serialized as a // macaroon and returned. func (hn *HarnessNode) ReadMacaroon(macPath string, timeout time.Duration) ( *macaroon.Macaroon, error) { // Wait until macaroon file is created and has valid content before // using it. var mac *macaroon.Macaroon err := wait.NoError(func() error { macBytes, err := ioutil.ReadFile(macPath) if err != nil { return fmt.Errorf("error reading macaroon file: %v", err) } newMac := &macaroon.Macaroon{} if err = newMac.UnmarshalBinary(macBytes); err != nil { return fmt.Errorf("error unmarshalling macaroon "+ "file: %v", err) } mac = newMac return nil }, timeout) return mac, err } // ConnectRPCWithMacaroon uses the TLS certificate and given macaroon to // create a gRPC client connection. func (hn *HarnessNode) ConnectRPCWithMacaroon(mac *macaroon.Macaroon) ( *grpc.ClientConn, error) { // Wait until TLS certificate is created and has valid content before // using it, up to 30 sec. var tlsCreds credentials.TransportCredentials err := wait.NoError(func() error { var err error tlsCreds, err = credentials.NewClientTLSFromFile( hn.Cfg.TLSCertPath, "", ) return err }, DefaultTimeout) if err != nil { return nil, fmt.Errorf("error reading TLS cert: %v", err) } opts := []grpc.DialOption{ grpc.WithBlock(), grpc.WithTransportCredentials(tlsCreds), } ctx, cancel := context.WithTimeout(context.Background(), DefaultTimeout) defer cancel() if mac == nil { return grpc.DialContext(ctx, hn.Cfg.RPCAddr(), opts...) } macCred := macaroons.NewMacaroonCredential(mac) opts = append(opts, grpc.WithPerRPCCredentials(macCred)) return grpc.DialContext(ctx, hn.Cfg.RPCAddr(), opts...) } // ConnectRPC uses the TLS certificate and admin macaroon files written by the // lnd node to create a gRPC client connection. func (hn *HarnessNode) ConnectRPC(useMacs bool) (*grpc.ClientConn, error) { // If we don't want to use macaroons, just pass nil, the next method // will handle it correctly. if !useMacs { return hn.ConnectRPCWithMacaroon(nil) } // If we should use a macaroon, always take the admin macaroon as a // default. mac, err := hn.ReadMacaroon(hn.Cfg.AdminMacPath, DefaultTimeout) if err != nil { return nil, err } return hn.ConnectRPCWithMacaroon(mac) } // SetExtraArgs assigns the ExtraArgs field for the node's configuration. The // changes will take effect on restart. func (hn *HarnessNode) SetExtraArgs(extraArgs []string) { hn.Cfg.ExtraArgs = extraArgs } // cleanup cleans up all the temporary files created by the node's process. func (hn *HarnessNode) cleanup() error { return os.RemoveAll(hn.Cfg.BaseDir) } // Stop attempts to stop the active lnd process. func (hn *HarnessNode) stop() error { // Do nothing if the process is not running. if hn.processExit == nil { return nil } // If start() failed before creating a client, we will just wait for the // child process to die. if hn.LightningClient != nil { // Don't watch for error because sometimes the RPC connection gets // closed before a response is returned. req := lnrpc.StopRequest{} ctx := context.Background() hn.LightningClient.StopDaemon(ctx, &req) } // Wait for lnd process and other goroutines to exit. select { case <-hn.processExit: case <-time.After(DefaultTimeout * 2): return fmt.Errorf("process did not exit") } close(hn.quit) hn.wg.Wait() hn.quit = nil hn.processExit = nil hn.LightningClient = nil hn.WalletUnlockerClient = nil hn.Watchtower = nil hn.WatchtowerClient = nil // Close any attempts at further grpc connections. if hn.conn != nil { err := hn.conn.Close() if err != nil { return fmt.Errorf("error attempting to stop grpc client: %v", err) } } return nil } // shutdown stops the active lnd process and cleans up any temporary directories // created along the way. func (hn *HarnessNode) shutdown() error { if err := hn.stop(); err != nil { return err } if err := hn.cleanup(); err != nil { return err } return nil } // closeChanWatchRequest is a request to the lightningNetworkWatcher to be // notified once it's detected within the test Lightning Network, that a // channel has either been added or closed. type chanWatchRequest struct { chanPoint wire.OutPoint chanOpen bool eventChan chan struct{} } // getChanPointFundingTxid returns the given channel point's funding txid in // raw bytes. func getChanPointFundingTxid(chanPoint *lnrpc.ChannelPoint) ([]byte, error) { var txid []byte // A channel point's funding txid can be get/set as a byte slice or a // string. In the case it is a string, decode it. switch chanPoint.GetFundingTxid().(type) { case *lnrpc.ChannelPoint_FundingTxidBytes: txid = chanPoint.GetFundingTxidBytes() case *lnrpc.ChannelPoint_FundingTxidStr: s := chanPoint.GetFundingTxidStr() h, err := chainhash.NewHashFromStr(s) if err != nil { return nil, err } txid = h[:] } return txid, nil } // lightningNetworkWatcher is a goroutine which is able to dispatch // notifications once it has been observed that a target channel has been // closed or opened within the network. In order to dispatch these // notifications, the GraphTopologySubscription client exposed as part of the // gRPC interface is used. func (hn *HarnessNode) lightningNetworkWatcher(subscribed chan error) { defer hn.wg.Done() graphUpdates := make(chan *lnrpc.GraphTopologyUpdate) hn.wg.Add(1) go func() { defer hn.wg.Done() req := &lnrpc.GraphTopologySubscription{} ctx, cancelFunc := context.WithCancel(context.Background()) defer cancelFunc() topologyClient, err := hn.SubscribeChannelGraph(ctx, req) if err != nil { msg := fmt.Sprintf("%s(%d): unable to create topology "+ "client: %v (%s)", hn.Name(), hn.NodeID, err, time.Now().String()) subscribed <- fmt.Errorf(msg) return } close(subscribed) for { update, err := topologyClient.Recv() if err == io.EOF { return } else if err != nil { return } select { case graphUpdates <- update: case <-hn.quit: return } } }() for { select { // A new graph update has just been received, so we'll examine // the current set of registered clients to see if we can // dispatch any requests. case graphUpdate := <-graphUpdates: // For each new channel, we'll increment the number of // edges seen by one. for _, newChan := range graphUpdate.ChannelUpdates { txidHash, _ := getChanPointFundingTxid(newChan.ChanPoint) txid, _ := chainhash.NewHash(txidHash) op := wire.OutPoint{ Hash: *txid, Index: newChan.ChanPoint.OutputIndex, } hn.openChans[op]++ // For this new channel, if the number of edges // seen is less than two, then the channel // hasn't been fully announced yet. if numEdges := hn.openChans[op]; numEdges < 2 { continue } // Otherwise, we'll notify all the registered // clients and remove the dispatched clients. for _, eventChan := range hn.openClients[op] { close(eventChan) } delete(hn.openClients, op) } // For each channel closed, we'll mark that we've // detected a channel closure while lnd was pruning the // channel graph. for _, closedChan := range graphUpdate.ClosedChans { txidHash, _ := getChanPointFundingTxid(closedChan.ChanPoint) txid, _ := chainhash.NewHash(txidHash) op := wire.OutPoint{ Hash: *txid, Index: closedChan.ChanPoint.OutputIndex, } hn.closedChans[op] = struct{}{} // As the channel has been closed, we'll notify // all register clients. for _, eventChan := range hn.closeClients[op] { close(eventChan) } delete(hn.closeClients, op) } // A new watch request, has just arrived. We'll either be able // to dispatch immediately, or need to add the client for // processing later. case watchRequest := <-hn.chanWatchRequests: targetChan := watchRequest.chanPoint // TODO(roasbeef): add update type also, checks for // multiple of 2 if watchRequest.chanOpen { // If this is an open request, then it can be // dispatched if the number of edges seen for // the channel is at least two. if numEdges := hn.openChans[targetChan]; numEdges >= 2 { close(watchRequest.eventChan) continue } // Otherwise, we'll add this to the list of // watch open clients for this out point. hn.openClients[targetChan] = append( hn.openClients[targetChan], watchRequest.eventChan, ) continue } // If this is a close request, then it can be // immediately dispatched if we've already seen a // channel closure for this channel. if _, ok := hn.closedChans[targetChan]; ok { close(watchRequest.eventChan) continue } // Otherwise, we'll add this to the list of close watch // clients for this out point. hn.closeClients[targetChan] = append( hn.closeClients[targetChan], watchRequest.eventChan, ) case <-hn.quit: return } } } // WaitForNetworkChannelOpen will block until a channel with the target // outpoint is seen as being fully advertised within the network. A channel is // considered "fully advertised" once both of its directional edges has been // advertised within the test Lightning Network. func (hn *HarnessNode) WaitForNetworkChannelOpen(ctx context.Context, op *lnrpc.ChannelPoint) error { eventChan := make(chan struct{}) txidHash, err := getChanPointFundingTxid(op) if err != nil { return err } txid, err := chainhash.NewHash(txidHash) if err != nil { return err } hn.chanWatchRequests <- &chanWatchRequest{ chanPoint: wire.OutPoint{ Hash: *txid, Index: op.OutputIndex, }, eventChan: eventChan, chanOpen: true, } select { case <-eventChan: return nil case <-ctx.Done(): return fmt.Errorf("channel not opened before timeout") } } // WaitForNetworkChannelClose will block until a channel with the target // outpoint is seen as closed within the network. A channel is considered // closed once a transaction spending the funding outpoint is seen within a // confirmed block. func (hn *HarnessNode) WaitForNetworkChannelClose(ctx context.Context, op *lnrpc.ChannelPoint) error { eventChan := make(chan struct{}) txidHash, err := getChanPointFundingTxid(op) if err != nil { return err } txid, err := chainhash.NewHash(txidHash) if err != nil { return err } hn.chanWatchRequests <- &chanWatchRequest{ chanPoint: wire.OutPoint{ Hash: *txid, Index: op.OutputIndex, }, eventChan: eventChan, chanOpen: false, } select { case <-eventChan: return nil case <-ctx.Done(): return fmt.Errorf("channel not closed before timeout") } } // WaitForBlockchainSync will block until the target nodes has fully // synchronized with the blockchain. If the passed context object has a set // timeout, then the goroutine will continually poll until the timeout has // elapsed. In the case that the chain isn't synced before the timeout is up, // then this function will return an error. func (hn *HarnessNode) WaitForBlockchainSync(ctx context.Context) error { errChan := make(chan error, 1) retryDelay := time.Millisecond * 100 go func() { for { select { case <-ctx.Done(): case <-hn.quit: return default: } getInfoReq := &lnrpc.GetInfoRequest{} getInfoResp, err := hn.GetInfo(ctx, getInfoReq) if err != nil { errChan <- err return } if getInfoResp.SyncedToChain { errChan <- nil return } select { case <-ctx.Done(): return case <-time.After(retryDelay): } } }() select { case <-hn.quit: return nil case err := <-errChan: return err case <-ctx.Done(): return fmt.Errorf("timeout while waiting for blockchain sync") } } // WaitForBalance waits until the node sees the expected confirmed/unconfirmed // balance within their wallet. func (hn *HarnessNode) WaitForBalance(expectedBalance btcutil.Amount, confirmed bool) error { ctx := context.Background() req := &lnrpc.WalletBalanceRequest{} var lastBalance btcutil.Amount doesBalanceMatch := func() bool { balance, err := hn.WalletBalance(ctx, req) if err != nil { return false } if confirmed { lastBalance = btcutil.Amount(balance.ConfirmedBalance) return btcutil.Amount(balance.ConfirmedBalance) == expectedBalance } lastBalance = btcutil.Amount(balance.UnconfirmedBalance) return btcutil.Amount(balance.UnconfirmedBalance) == expectedBalance } err := wait.Predicate(doesBalanceMatch, DefaultTimeout) if err != nil { return fmt.Errorf("balances not synced after deadline: "+ "expected %v, only have %v", expectedBalance, lastBalance) } return nil }