diff --git a/config.go b/config.go index 1b845270..410719c2 100644 --- a/config.go +++ b/config.go @@ -2,6 +2,7 @@ package main import ( "fmt" + "net" "os" "path/filepath" "sort" @@ -9,6 +10,9 @@ import ( "strings" flags "github.com/btcsuite/go-flags" + "github.com/lightningnetwork/lnd/brontide" + "github.com/lightningnetwork/lnd/lnwire" + "github.com/roasbeef/btcd/btcec" "github.com/roasbeef/btcutil" ) @@ -302,3 +306,12 @@ func supportedSubsystems() []string { sort.Strings(subsystems) return subsystems } + +// noiseDial is a factory function which creates a connmgr compliant dialing +// function by returning a closure which includes the server's identity key. +func noiseDial(idPriv *btcec.PrivateKey) func(net.Addr) (net.Conn, error) { + return func(a net.Addr) (net.Conn, error) { + lnAddr := a.(*lnwire.NetAddress) + return brontide.Dial(idPriv, lnAddr) + } +} diff --git a/fundingmanager.go b/fundingmanager.go index 5682888d..0cbc5912 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -337,12 +337,15 @@ func (f *fundingManager) handleFundingRequest(fmsg *fundingRequestMsg) { ourDustLimit := lnwallet.DefaultDustLimit() theirDustlimit := msg.DustLimit + // Attempt to initialize a reservation within the wallet. If the wallet // has insufficient resources to create the channel, then the reservation // attempt may be rejected. Note that since we're on the responding // side of a single funder workflow, we don't commit any funds to the // channel ourselves. // TODO(roasbeef): passing num confs 1 is irrelevant here, make signed? + // TODO(roasbeef): assuming this was an inbound connection, replace + // port with default advertised port reservation, err := f.wallet.InitChannelReservation(amt, 0, fmsg.peer.addr.IdentityKey, fmsg.peer.addr.Address, 1, delay, ourDustLimit) diff --git a/log.go b/log.go index 86beb0ab..86abafc9 100644 --- a/log.go +++ b/log.go @@ -4,6 +4,7 @@ import ( "fmt" "os" + "github.com/btcsuite/btcd/connmgr" "github.com/btcsuite/btclog" "github.com/btcsuite/seelog" "github.com/lightningnetwork/lnd/chainntnfs" @@ -28,6 +29,7 @@ var ( hswcLog = btclog.Disabled utxnLog = btclog.Disabled brarLog = btclog.Disabled + cmgrLog = btclog.Disabled ) // subsystemLoggers maps each subsystem identifier to its associated logger. @@ -43,6 +45,7 @@ var subsystemLoggers = map[string]btclog.Logger{ "HSWC": hswcLog, "UTXN": utxnLog, "BRAR": brarLog, + "CMGR": cmgrLog, } // useLogger updates the logger references for subsystemID to logger. Invalid @@ -89,6 +92,10 @@ func useLogger(subsystemID string, logger btclog.Logger) { case "BRAR": brarLog = logger + + case "CMGR": + cmgrLog = logger + connmgr.UseLogger(logger) } } diff --git a/peer.go b/peer.go index 1e4f2bc2..d2c620c5 100644 --- a/peer.go +++ b/peer.go @@ -11,6 +11,7 @@ import ( "sync/atomic" "time" + "github.com/btcsuite/btcd/connmgr" "github.com/btcsuite/fastsha256" "github.com/davecgh/go-spew/spew" "github.com/lightningnetwork/lightning-onion" @@ -65,7 +66,8 @@ type peer struct { connected int32 disconnect int32 - conn net.Conn + connReq *connmgr.ConnReq + conn net.Conn addr *lnwire.NetAddress lightningID wire.ShaHash @@ -318,6 +320,10 @@ func (p *peer) Disconnect() { close(p.quit) + if p.connReq != nil { + p.server.connMgr.Disconnect(p.connReq.ID()) + } + // Launch a goroutine to clean up the remaining resources. go func() { // Tell the switch to unregister all links associated with this @@ -762,6 +768,8 @@ func (p *peer) executeCooperativeClose(channel *lnwallet.LightningChannel) (*wir // handleLocalClose kicks-off the workflow to execute a cooperative or forced // unilateral closure of the channel initiated by a local sub-system. +// TODO(roasbeef): if no more active channels with peer call Remove on connMgr +// with peerID func (p *peer) handleLocalClose(req *closeLinkReq) { var ( err error diff --git a/server.go b/server.go index 11229f02..7690a64f 100644 --- a/server.go +++ b/server.go @@ -5,7 +5,9 @@ import ( "net" "sync" "sync/atomic" + "time" + "github.com/btcsuite/btcd/connmgr" "github.com/btcsuite/fastsha256" "github.com/lightningnetwork/lightning-onion" "github.com/lightningnetwork/lnd/brontide" @@ -37,8 +39,9 @@ type server struct { // long-term identity private key. lightningID [32]byte - listeners []net.Listener - peers map[int32]*peer + peersMtx sync.RWMutex + peersByID map[int32]*peer + peersByPub map[string]*peer rpcServer *rpcServer @@ -60,6 +63,12 @@ type server struct { sphinx *sphinx.Router + connMgr *connmgr.ConnManager + + pendingConnMtx sync.RWMutex + persistentConnReqs map[string]*connmgr.ConnReq + pendingConnRequests map[string]*connectPeerMsg + newPeers chan *peer donePeers chan *peer queries chan interface{} @@ -104,13 +113,17 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier, sphinx: sphinx.NewRouter(privKey, activeNetParams.Params), lightningID: fastsha256.Sum256(serializedPubKey), - listeners: listeners, + pendingConnRequests: make(map[string]*connectPeerMsg), + persistentConnReqs: make(map[string]*connmgr.ConnReq), - peers: make(map[int32]*peer), - newPeers: make(chan *peer, 100), - donePeers: make(chan *peer, 100), - queries: make(chan interface{}), - quit: make(chan struct{}), + peersByID: make(map[int32]*peer), + peersByPub: make(map[string]*peer), + + newPeers: make(chan *peer, 10), + donePeers: make(chan *peer, 10), + + queries: make(chan interface{}), + quit: make(chan struct{}), } // If the debug HTLC flag is on, then we invoice a "master debug" @@ -137,7 +150,7 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier, } var targetPeer *peer - for _, peer := range s.peers { // TODO: threadsafe api + for _, peer := range s.peersByID { // TODO: threadsafe API nodePub := peer.addr.IdentityKey.SerializeCompressed() nodeVertex := graph.NewVertex(nodePub[:]) @@ -157,6 +170,7 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier, return nil } s.routingMgr = routing.NewRoutingManager(graph.NewVertex(selfVertex), routingMgrConfig) + s.htlcSwitch = newHtlcSwitch(serializedPubKey, s.routingMgr) s.rpcServer = newRpcServer(s) @@ -168,6 +182,57 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier, // TODO(roasbeef): introduce closure and config system to decouple the // initialization above ^ + // Create the connection manager which will be responsible for + // maintaining persistent outbound connections and also accepting new + // incoming connections + cmgr, err := connmgr.New(&connmgr.Config{ + Listeners: listeners, + OnAccept: s.inboundPeerConnected, + RetryDuration: time.Second * 5, + TargetOutbound: 100, + GetNewAddress: nil, + Dial: noiseDial(s.identityPriv), + OnConnection: s.outboundPeerConnected, + }) + if err != nil { + return nil, err + } + s.connMgr = cmgr + + // In order to promote liveness of our active channels, instruct the + // connection manager to attempt to establish and maintain persistent + // connections to all our direct channel counter parties. + linkNodes, err := s.chanDB.FetchAllLinkNodes() + if err != nil && err != channeldb.ErrLinkNodesNotFound { + return nil, err + } + for _, node := range linkNodes { + // Create a wrapper address which couples the IP and the pubkey + // so the brontide authenticated connection can be established. + lnAddr := &lnwire.NetAddress{ + IdentityKey: node.IdentityPub, + Address: node.Addresses[0], + } + pubStr := string(node.IdentityPub.SerializeCompressed()) + srvrLog.Debugf("Attempting persistent connection to channel "+ + "peer %v", lnAddr) + + // Send the persistent connection request to the connection + // manager, saving the request itself so we can + // cancel/restart the process as needed. + connReq := &connmgr.ConnReq{ + Addr: lnAddr, + Permanent: true, + } + s.persistentConnReqs[pubStr] = connReq + go s.connMgr.Connect(connReq) + + s.pendingConnRequests[pubStr] = &connectPeerMsg{ + resp: make(chan int32, 1), + err: make(chan error, 1), + } + } + return s, nil } @@ -179,12 +244,6 @@ func (s *server) Start() error { return nil } - // Start all the listeners. - for _, l := range s.listeners { - s.wg.Add(1) - go s.listener(l) - } - // Start the notification server. This is used so channel management // goroutines can be notified when a funding transaction reaches a // sufficient number of confirmations, or when the input for the @@ -226,13 +285,6 @@ func (s *server) Stop() error { return nil } - // Stop all the listeners. - for _, listener := range s.listeners { - if err := listener.Close(); err != nil { - return err - } - } - // Shutdown the wallet, funding manager, and the rpc server. s.chainNotifier.Stop() s.rpcServer.Stop() @@ -256,9 +308,116 @@ func (s *server) WaitForShutdown() { s.wg.Wait() } +// peerConnected is a function that handles initialization a newly connected +// peer by adding it to the server's global list of all active peers, and +// starting all the goroutines the peer needs to function properly. +func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq, inbound bool) { + brontideConn := conn.(*brontide.Conn) + peerAddr := &lnwire.NetAddress{ + IdentityKey: brontideConn.RemotePub(), + Address: conn.RemoteAddr().(*net.TCPAddr), + ChainNet: activeNetParams.Net, + } + + // Now that we've established a connection, create a peer, and + // it to the set of currently active peers. + peer, err := newPeer(conn, s, peerAddr, false) + if err != nil { + srvrLog.Errorf("unable to create peer %v", err) + conn.Close() + return + } + if connReq != nil { + peer.connReq = connReq + } + + // TODO(roasbeef): update IP address for link-node + // * also mark last-seen, do it one single transaction? + + peer.Start() + s.newPeers <- peer + + // If this was an RPC initiated outbound connection that was + // successfully established, then send a response back to the client so + pubStr := string(peerAddr.IdentityKey.SerializeCompressed()) + s.pendingConnMtx.RLock() + msg, ok := s.pendingConnRequests[pubStr] + s.pendingConnMtx.RUnlock() + if ok { + msg.resp <- peer.id + msg.err <- nil + + s.pendingConnMtx.Lock() + delete(s.pendingConnRequests, pubStr) + s.pendingConnMtx.Unlock() + } +} + +// inboundPeerConnected initializes a new peer in response to a new inbound +// connection. +func (s *server) inboundPeerConnected(conn net.Conn) { + s.peersMtx.Lock() + defer s.peersMtx.Unlock() + + srvrLog.Tracef("New inbound connection from %v", conn.RemoteAddr()) + + nodePub := conn.(*brontide.Conn).RemotePub() + + // If we already have an outbound connection to this peer, simply drop + // the connection. + pubStr := string(nodePub.SerializeCompressed()) + if _, ok := s.peersByPub[pubStr]; ok { + srvrLog.Errorf("Received inbound connection from peer %x, but "+ + "already connected, dropping conn", + nodePub.SerializeCompressed()) + conn.Close() + return + } + + // However, if we receive an incoming connection from a peer we're + // attempting to maintain a persistent connection with then we need to + // cancel the ongoing connection attempts to ensure that we don't end + // up with a duplicate connecting to the same peer. + s.pendingConnMtx.RLock() + if connReq, ok := s.persistentConnReqs[pubStr]; ok { + fmt.Println("trying to cancel out attempt") + s.connMgr.Remove(connReq.ID()) + } + s.pendingConnMtx.RUnlock() + + s.peerConnected(conn, nil, false) +} + +// outboundPeerConnected initializes a new peer in response to a new outbound +// connection. +func (s *server) outboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn) { + s.peersMtx.Lock() + defer s.peersMtx.Unlock() + + srvrLog.Tracef("Established connection to: %v", conn.RemoteAddr()) + + nodePub := conn.(*brontide.Conn).RemotePub() + + // If we already have an inbound connection from this peer, simply drop + // the connection. + pubStr := string(nodePub.SerializeCompressed()) + if _, ok := s.peersByPub[pubStr]; ok { + srvrLog.Errorf("Established outbound connection to peer %x, but "+ + "already connected, dropping conn", + nodePub.SerializeCompressed()) + s.connMgr.Remove(connReq.ID()) + return + } + + s.peerConnected(conn, connReq, true) +} + // addPeer adds the passed peer to the server's global state of all active // peers. func (s *server) addPeer(p *peer) { + s.peersMtx.Lock() + defer s.peersMtx.Unlock() + if p == nil { return } @@ -269,12 +428,16 @@ func (s *server) addPeer(p *peer) { return } - s.peers[p.id] = p + s.peersByID[p.id] = p + s.peersByPub[string(p.addr.IdentityKey.SerializeCompressed())] = p } // removePeer removes the passed peer from the server's state of all active // peers. func (s *server) removePeer(p *peer) { + s.peersMtx.Lock() + defer s.peersMtx.Unlock() + srvrLog.Debugf("removing peer %v", p) if p == nil { @@ -287,7 +450,8 @@ func (s *server) removePeer(p *peer) { return } - delete(s.peers, p.id) + delete(s.peersByID, p.id) + delete(s.peersByPub, string(p.addr.IdentityKey.SerializeCompressed())) } // connectPeerMsg is a message requesting the server to open a connection to a @@ -331,6 +495,8 @@ type openChanReq struct { // // NOTE: This MUST be run as a goroutine. func (s *server) queryHandler() { + go s.connMgr.Start() + out: for { select { @@ -343,7 +509,6 @@ out: s.removePeer(p) case query := <-s.queries: - // TODO(roasbeef): make all goroutines? switch msg := query.(type) { case *connectPeerMsg: s.handleConnectPeer(msg) @@ -357,14 +522,16 @@ out: } } + s.connMgr.Stop() + s.wg.Done() } // handleListPeers sends a lice of all currently active peers to the original // caller. func (s *server) handleListPeers(msg *listPeersMsg) { - peers := make([]*peer, 0, len(s.peers)) - for _, peer := range s.peers { + peers := make([]*peer, 0, len(s.peersByID)) + for _, peer := range s.peersByID { peers = append(peers, peer) } @@ -377,75 +544,74 @@ func (s *server) handleListPeers(msg *listPeersMsg) { func (s *server) handleConnectPeer(msg *connectPeerMsg) { addr := msg.addr + targetPub := string(msg.addr.IdentityKey.SerializeCompressed()) + // Ensure we're not already connected to this // peer. - targetPub := msg.addr.IdentityKey - for _, peer := range s.peers { - if peer.addr.IdentityKey.IsEqual(targetPub) { - msg.err <- fmt.Errorf( - "already connected to peer: %v", - peer.addr, - ) - msg.resp <- -1 - return - } + s.peersMtx.RLock() + peer, ok := s.peersByPub[targetPub] + if ok { + s.peersMtx.RUnlock() + msg.err <- fmt.Errorf("already connected to peer: %v", peer) + msg.resp <- -1 + return } + s.peersMtx.RUnlock() - // Launch a goroutine to connect to the requested peer so we can - // continue to handle queries. - // - // TODO(roasbeef): semaphore to limit the number of goroutines for - // async requests. - go func() { - srvrLog.Debugf("connecting to %v", addr) + // If there's already a pending connection request for this pubkey, + // then we ignore this request to ensure we don't create a redundant + // connection. + s.pendingConnMtx.RLock() + if _, ok := s.pendingConnRequests[targetPub]; ok { + s.pendingConnMtx.RUnlock() + msg.err <- fmt.Errorf("connection attempt to %v is pending", + addr) + msg.resp <- -1 + return + } + if _, ok := s.persistentConnReqs[targetPub]; ok { + s.pendingConnMtx.RUnlock() + msg.err <- fmt.Errorf("connection attempt to %v is pending", + addr) + msg.resp <- -1 + return + } + s.pendingConnMtx.RUnlock() - // Attempt to connect to the remote node. If the we can't make - // the connection, or the crypto negotation breaks down, then - // return an error to the caller. - conn, err := brontide.Dial(s.identityPriv, addr) - if err != nil { - msg.err <- err - msg.resp <- -1 - return - } + // If there's not already a pending or active connection to this node, + // then instruct the connection manager to attempt to establish a + // persistent connection to the peer. + srvrLog.Debugf("connecting to %v", addr) + go s.connMgr.Connect(&connmgr.ConnReq{ + Addr: addr, + Permanent: true, + }) - // Now that we've established a connection, create a peer, and - // it to the set of currently active peers. - peer, err := newPeer(conn, s, msg.addr, false) - if err != nil { - srvrLog.Errorf("unable to create peer %v", err) - conn.Close() - msg.resp <- -1 - msg.err <- err - return - } - - // TODO(roasbeef): update IP address for link-node - // * also mark last-seen, do it one single transaction? - - peer.Start() - s.newPeers <- peer - - msg.resp <- peer.id - msg.err <- nil - }() + // Finally, we store the original request keyed by the public key so we + // can dispatch the response to the RPC client once a connection has + // been initiated. + s.pendingConnMtx.Lock() + s.pendingConnRequests[targetPub] = msg + s.pendingConnMtx.Unlock() } // handleOpenChanReq first locates the target peer, and if found hands off the // request to the funding manager allowing it to initiate the channel funding // workflow. func (s *server) handleOpenChanReq(req *openChanReq) { + var targetPeer *peer + + pubStr := string(req.targetPubkey.SerializeCompressed()) + // First attempt to locate the target peer to open a channel with, if // we're unable to locate the peer then this request will fail. - var targetPeer *peer - for _, peer := range s.peers { // TODO(roasbeef): threadsafe api - // We found the the target - if peer.addr.IdentityKey.IsEqual(req.targetPubkey) || - req.targetPeerID == peer.id { - targetPeer = peer - break - } + s.peersMtx.RLock() + if peer, ok := s.peersByID[req.targetPeerID]; ok { + targetPeer = peer + } else if peer, ok := s.peersByPub[pubStr]; ok { + targetPeer = peer } + s.peersMtx.RUnlock() if targetPeer == nil { req.err <- fmt.Errorf("unable to find peer nodeID(%x), "+ @@ -506,46 +672,3 @@ func (s *server) Peers() []*peer { return <-resp } - -// listener is a goroutine dedicated to accepting in coming peer connections -// from the passed listener. -// -// NOTE: This MUST be run as a goroutine. -func (s *server) listener(l net.Listener) { - srvrLog.Infof("Server listening on %s", l.Addr()) - for atomic.LoadInt32(&s.shutdown) == 0 { - conn, err := l.Accept() - if err != nil { - // Only log the error message if we aren't currently - // shutting down. - if atomic.LoadInt32(&s.shutdown) == 0 { - srvrLog.Errorf("Can't accept connection: %v", err) - } - continue - } - - srvrLog.Tracef("New inbound connection from %v", conn.RemoteAddr()) - - brontideConn := conn.(*brontide.Conn) - peerAddr := &lnwire.NetAddress{ - IdentityKey: brontideConn.RemotePub(), - Address: conn.RemoteAddr().(*net.TCPAddr), - ChainNet: activeNetParams.Net, - } - - peer, err := newPeer(conn, s, peerAddr, true) - if err != nil { - srvrLog.Errorf("unable to create peer: %v", err) - conn.Close() - continue - } - - // TODO(roasbeef): update IP address for link-node - // * also mark last-seen, do it one single transaction? - - peer.Start() - s.newPeers <- peer - } - - s.wg.Done() -}