package main

import (
	"bytes"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"net"
	"strconv"
	"sync"
	"sync/atomic"
	"time"

	"github.com/boltdb/bolt"
	"github.com/lightningnetwork/lightning-onion"
	"github.com/lightningnetwork/lnd/brontide"
	"github.com/lightningnetwork/lnd/channeldb"
	"github.com/lightningnetwork/lnd/discovery"
	"github.com/lightningnetwork/lnd/lnrpc"
	"github.com/lightningnetwork/lnd/lnwire"
	"github.com/lightningnetwork/lnd/routing"
	"github.com/roasbeef/btcd/btcec"
	"github.com/roasbeef/btcd/connmgr"
	"github.com/roasbeef/btcutil"

	"github.com/go-errors/errors"
	"github.com/lightningnetwork/lnd/htlcswitch"
)

// server is the main server of the Lightning Network Daemon. The server houses
// global state pertaining to the wallet, database, and the rpcserver.
// Additionally, the server is also used as a central messaging bus to interact
// with any of its companion objects.
type server struct {
	// started and shutdown are bumped atomically by Start and Stop
	// respectively so that only the first call to each does any work.
	started  int32 // atomic
	shutdown int32 // atomic

	// identityPriv is the private key used to authenticate any incoming
	// connections.
	identityPriv *btcec.PrivateKey

	// nodeSigner is a MessageSigner implementation that's backed by the
	// identity private key of the running lnd node.
	nodeSigner *nodeSigner

	// lightningID is the sha256 of the public key corresponding to our
	// long-term identity private key.
	lightningID [32]byte

	// peersMtx guards the four peer indexes below. The maps keyed by
	// string use the compressed serialization of the peer's identity
	// public key (converted to a string) as the key.
	peersMtx   sync.RWMutex
	peersByID  map[int32]*peer
	peersByPub map[string]*peer

	// persistentPeers is the set of peers we try to stay connected to; it
	// is consulted when a peer disconnects to decide whether a reconnect
	// should be attempted.
	persistentPeers map[string]struct{}

	// inboundPeers and outboundPeers are filled in by addPeer according to
	// the peer's inbound flag.
	inboundPeers  map[string]*peer
	outboundPeers map[string]*peer

	cc *chainControl

	// fundingMgr is not assigned by newServer in this file; presumably it
	// is wired up elsewhere before any channel open request is handled.
	fundingMgr *fundingManager

	chanDB *channeldb.DB

	htlcSwitch *htlcswitch.Switch

	invoices *invoiceRegistry

	breachArbiter *breachArbiter

	chanRouter *routing.ChannelRouter

	discoverSrv *discovery.AuthenticatedGossiper

	utxoNursery *utxoNursery

	sphinx *htlcswitch.OnionProcessor

	connMgr *connmgr.ConnManager

	// pendingConnMtx guards persistentConnReqs, which tracks the
	// outstanding persistent connection requests for each peer so they can
	// be cancelled once a connection is established.
	pendingConnMtx     sync.RWMutex
	persistentConnReqs map[string][]*connmgr.ConnReq

	// The channels below are all serviced by the queryHandler goroutine.
	broadcastRequests chan *broadcastReq
	sendRequests      chan *sendReq

	newPeers  chan *peer
	donePeers chan *peer
	queries   chan interface{}

	// globalFeatures is the feature vector which affects HTLCs and thus is
	// also advertised to other nodes.
	globalFeatures *lnwire.FeatureVector

	// localFeatures is a feature vector which represents the features
	// which only affect the protocol between these two nodes.
	localFeatures *lnwire.FeatureVector

	// wg tracks the goroutines launched by Start; quit is closed by Stop
	// to signal them to exit.
	wg   sync.WaitGroup
	quit chan struct{}
}

// newServer creates a new instance of the server which is to listen using the
// passed listener address.
func newServer(listenAddrs []string, chanDB *channeldb.DB, cc *chainControl, privKey *btcec.PrivateKey) (*server, error) { var err error listeners := make([]net.Listener, len(listenAddrs)) for i, addr := range listenAddrs { listeners[i], err = brontide.NewListener(privKey, addr) if err != nil { return nil, err } } serializedPubKey := privKey.PubKey().SerializeCompressed() s := &server{ chanDB: chanDB, cc: cc, invoices: newInvoiceRegistry(chanDB), utxoNursery: newUtxoNursery(chanDB, cc.chainNotifier, cc.wallet), identityPriv: privKey, nodeSigner: newNodeSigner(privKey), // TODO(roasbeef): derive proper onion key based on rotation // schedule sphinx: htlcswitch.NewOnionProcessor( sphinx.NewRouter(privKey, activeNetParams.Params)), lightningID: sha256.Sum256(serializedPubKey), persistentPeers: make(map[string]struct{}), persistentConnReqs: make(map[string][]*connmgr.ConnReq), peersByID: make(map[int32]*peer), peersByPub: make(map[string]*peer), inboundPeers: make(map[string]*peer), outboundPeers: make(map[string]*peer), newPeers: make(chan *peer, 10), donePeers: make(chan *peer, 10), broadcastRequests: make(chan *broadcastReq), sendRequests: make(chan *sendReq), globalFeatures: globalFeatures, localFeatures: localFeatures, queries: make(chan interface{}), quit: make(chan struct{}), } // If the debug HTLC flag is on, then we invoice a "master debug" // invoice which all outgoing payments will be sent and all incoming // HTLCs with the debug R-Hash immediately settled. 
if cfg.DebugHTLC { kiloCoin := btcutil.Amount(btcutil.SatoshiPerBitcoin * 1000) s.invoices.AddDebugInvoice(kiloCoin, *debugPre) srvrLog.Debugf("Debug HTLC invoice inserted, preimage=%x, hash=%x", debugPre[:], debugHash[:]) } s.htlcSwitch = htlcswitch.New(htlcswitch.Config{ LocalChannelClose: func(pubKey []byte, request *htlcswitch.ChanClose) { s.peersMtx.RLock() peer, ok := s.peersByPub[string(pubKey)] s.peersMtx.RUnlock() if !ok { srvrLog.Error("unable to close channel, peer"+ " with %v id can't be found", pubKey) return } peer.localCloseChanReqs <- request }, UpdateTopology: func(msg *lnwire.ChannelUpdate) error { s.discoverSrv.ProcessRemoteAnnouncement(msg, nil) return nil }, }) // If external IP addresses have been specified, add those to the list // of this server's addresses. selfAddrs := make([]net.Addr, 0, len(cfg.ExternalIPs)) for _, ip := range cfg.ExternalIPs { var addr string _, _, err = net.SplitHostPort(ip) if err != nil { addr = net.JoinHostPort(ip, strconv.Itoa(defaultPeerPort)) } else { addr = ip } lnAddr, err := net.ResolveTCPAddr("tcp", addr) if err != nil { return nil, err } selfAddrs = append(selfAddrs, lnAddr) } chanGraph := chanDB.ChannelGraph() // TODO(roasbeef): make alias configurable alias := lnwire.NewAlias(hex.EncodeToString(serializedPubKey[:10])) self := &channeldb.LightningNode{ LastUpdate: time.Now(), Addresses: selfAddrs, PubKey: privKey.PubKey(), Alias: alias.String(), Features: globalFeatures, } // If our information has changed since our last boot, then we'll // re-sign our node announcement so a fresh authenticated version of it // can be propagated throughout the network upon startup. // TODO(roasbeef): don't always set timestamp above to _now. 
self.AuthSig, err = discovery.SignAnnouncement(s.nodeSigner, s.identityPriv.PubKey(), &lnwire.NodeAnnouncement{ Timestamp: uint32(self.LastUpdate.Unix()), Addresses: self.Addresses, NodeID: self.PubKey, Alias: alias, Features: self.Features, }, ) if err != nil { return nil, fmt.Errorf("unable to generate signature for "+ "self node announcement: %v", err) } if err := chanGraph.SetSourceNode(self); err != nil { return nil, fmt.Errorf("can't set self node: %v", err) } s.chanRouter, err = routing.New(routing.Config{ Graph: chanGraph, Chain: cc.chainIO, ChainView: cc.chainView, SendToSwitch: func(firstHop *btcec.PublicKey, htlcAdd *lnwire.UpdateAddHTLC, circuit *sphinx.Circuit) ([32]byte, error) { // Using the created circuit, initialize the error // decryptor so we can parse+decode any failures // incurred by this payment within the switch. errorDecryptor := &htlcswitch.FailureDeobfuscator{ OnionDeobfuscator: sphinx.NewOnionDeobfuscator(circuit), } var firstHopPub [33]byte copy(firstHopPub[:], firstHop.SerializeCompressed()) return s.htlcSwitch.SendHTLC(firstHopPub, htlcAdd, errorDecryptor) }, }) if err != nil { return nil, fmt.Errorf("can't create router: %v", err) } s.discoverSrv, err = discovery.New(discovery.Config{ Broadcast: s.broadcastMessage, Notifier: s.cc.chainNotifier, Router: s.chanRouter, SendToPeer: s.sendToPeer, TrickleDelay: time.Millisecond * 300, ProofMatureDelta: 0, DB: chanDB, }) if err != nil { return nil, err } s.breachArbiter = newBreachArbiter(cc.wallet, chanDB, cc.chainNotifier, s.htlcSwitch, s.cc.chainIO, s.cc.feeEstimator) // TODO(roasbeef): introduce closure and config system to decouple the // initialization above ^ // Create the connection manager which will be responsible for // maintaining persistent outbound connections and also accepting new // incoming connections cmgr, err := connmgr.New(&connmgr.Config{ Listeners: listeners, OnAccept: s.inboundPeerConnected, RetryDuration: time.Second * 5, TargetOutbound: 100, GetNewAddress: nil, 
Dial: noiseDial(s.identityPriv), OnConnection: s.outboundPeerConnected, }) if err != nil { return nil, err } s.connMgr = cmgr return s, nil } // Start starts the main daemon server, all requested listeners, and any helper // goroutines. func (s *server) Start() error { // Already running? if atomic.AddInt32(&s.started, 1) != 1 { return nil } // Start the notification server. This is used so channel management // goroutines can be notified when a funding transaction reaches a // sufficient number of confirmations, or when the input for the // funding transaction is spent in an attempt at an uncooperative close // by the counterparty. if err := s.cc.chainNotifier.Start(); err != nil { return err } if err := s.htlcSwitch.Start(); err != nil { return err } if err := s.utxoNursery.Start(); err != nil { return err } if err := s.breachArbiter.Start(); err != nil { return err } if err := s.discoverSrv.Start(); err != nil { return err } if err := s.chanRouter.Start(); err != nil { return err } s.wg.Add(1) go s.queryHandler() // With all the relevant sub-systems started, we'll now attempt to // establish persistent connections to our direct channel collaborators // within the network. if err := s.establishPersistentConnections(); err != nil { return err } return nil } // Stop gracefully shutsdown the main daemon server. This function will signal // any active goroutines, or helper objects to exit, then blocks until they've // all successfully exited. Additionally, any/all listeners are closed. func (s *server) Stop() error { // Bail if we're already shutting down. if atomic.AddInt32(&s.shutdown, 1) != 1 { return nil } // Shutdown the wallet, funding manager, and the rpc server. s.cc.chainNotifier.Stop() s.chanRouter.Stop() s.htlcSwitch.Stop() s.utxoNursery.Stop() s.breachArbiter.Stop() s.discoverSrv.Stop() s.cc.wallet.Shutdown() s.cc.chainView.Stop() // Signal all the lingering goroutines to quit. 
close(s.quit) s.wg.Wait() return nil } // establishPersistentConnections attempts to establish persistent connections // to all our direct channel collaborators. In order to promote liveness of // our active channels, we instruct the connection manager to attempt to // establish and maintain persistent connections to all our direct channel // counterparties. func (s *server) establishPersistentConnections() error { // nodeAddrsMap stores the combination of node public keys and // addresses that we'll attempt to reconnect to. PubKey strings are // used as keys since other PubKey forms can't be compared. nodeAddrsMap := map[string]*nodeAddresses{} // Iterate through the list of LinkNodes to find addresses we should // attempt to connect to based on our set of previous connections. Set // the reconnection port to the default peer port. linkNodes, err := s.chanDB.FetchAllLinkNodes() if err != nil && err != channeldb.ErrLinkNodesNotFound { return err } for _, node := range linkNodes { for _, address := range node.Addresses { if address.Port == 0 { address.Port = defaultPeerPort } } pubStr := string(node.IdentityPub.SerializeCompressed()) nodeAddrs := &nodeAddresses{ pubKey: node.IdentityPub, addresses: node.Addresses, } nodeAddrsMap[pubStr] = nodeAddrs } // After checking our previous connections for addresses to connect to, // iterate through the nodes in our channel graph to find addresses // that have been added via NodeAnnouncement messages. chanGraph := s.chanDB.ChannelGraph() sourceNode, err := chanGraph.SourceNode() if err != nil { return err } // TODO(roasbeef): instead iterate over link nodes and query graph for // each of the nodes. err = sourceNode.ForEachChannel(nil, func(_ *bolt.Tx, _ *channeldb.ChannelEdgeInfo, policy *channeldb.ChannelEdgePolicy) error { pubStr := string(policy.Node.PubKey.SerializeCompressed()) // Add addresses from channel graph/NodeAnnouncements to the // list of addresses we'll connect to. 
If there are duplicates // that have different ports specified, the port from the // channel graph should supersede the port from the link node. var addrs []*net.TCPAddr linkNodeAddrs, ok := nodeAddrsMap[pubStr] if ok { for _, lnAddress := range linkNodeAddrs.addresses { var addrMatched bool for _, polAddress := range policy.Node.Addresses { polTCPAddr, ok := polAddress.(*net.TCPAddr) if ok && polTCPAddr.IP.Equal(lnAddress.IP) { addrMatched = true addrs = append(addrs, polTCPAddr) } } if !addrMatched { addrs = append(addrs, lnAddress) } } } else { for _, addr := range policy.Node.Addresses { polTCPAddr, ok := addr.(*net.TCPAddr) if ok { addrs = append(addrs, polTCPAddr) } } } nodeAddrsMap[pubStr] = &nodeAddresses{ pubKey: policy.Node.PubKey, addresses: addrs, } return nil }) if err != nil && err != channeldb.ErrGraphNoEdgesFound { return err } // Iterate through the combined list of addresses from prior links and // node announcements and attempt to reconnect to each node. for pubStr, nodeAddr := range nodeAddrsMap { // Add this peer to the set of peers we should maintain a // persistent connection with. s.persistentPeers[pubStr] = struct{}{} for _, address := range nodeAddr.addresses { // Create a wrapper address which couples the IP and // the pubkey so the brontide authenticated connection // can be established. lnAddr := &lnwire.NetAddress{ IdentityKey: nodeAddr.pubKey, Address: address, } srvrLog.Debugf("Attempting persistent connection to "+ "channel peer %v", lnAddr) // Send the persistent connection request to the // connection manager, saving the request itself so we // can cancel/restart the process as needed. connReq := &connmgr.ConnReq{ Addr: lnAddr, Permanent: true, } s.pendingConnMtx.Lock() s.persistentConnReqs[pubStr] = append(s.persistentConnReqs[pubStr], connReq) s.pendingConnMtx.Unlock() go s.connMgr.Connect(connReq) } } return nil } // WaitForShutdown blocks all goroutines have been stopped. 
func (s *server) WaitForShutdown() { s.wg.Wait() } // broadcastReq is a message sent to the server by a related subsystem when it // wishes to broadcast one or more messages to all connected peers. Thi type broadcastReq struct { ignore *btcec.PublicKey msgs []lnwire.Message errChan chan error // MUST be buffered. } // broadcastMessage sends a request to the server to broadcast a set of // messages to all peers other than the one specified by the `skip` parameter. func (s *server) broadcastMessage(skip *btcec.PublicKey, msgs ...lnwire.Message) error { errChan := make(chan error, 1) msgsToSend := make([]lnwire.Message, 0, len(msgs)) msgsToSend = append(msgsToSend, msgs...) broadcastReq := &broadcastReq{ ignore: skip, msgs: msgsToSend, errChan: errChan, } select { case s.broadcastRequests <- broadcastReq: case <-s.quit: return errors.New("server shutting down") } select { case err := <-errChan: return err case <-s.quit: return errors.New("server shutting down") } } // sendReq is message sent to the server by a related subsystem which it // wishes to send a set of messages to a specified peer. type sendReq struct { target *btcec.PublicKey msgs []lnwire.Message errChan chan error } type nodeAddresses struct { pubKey *btcec.PublicKey addresses []*net.TCPAddr } // sendToPeer send a message to the server telling it to send the specific set // of message to a particular peer. If the peer connect be found, then this // method will return a non-nil error. func (s *server) sendToPeer(target *btcec.PublicKey, msgs ...lnwire.Message) error { errChan := make(chan error, 1) msgsToSend := make([]lnwire.Message, 0, len(msgs)) msgsToSend = append(msgsToSend, msgs...) 
sMsg := &sendReq{ target: target, msgs: msgsToSend, errChan: errChan, } select { case s.sendRequests <- sMsg: case <-s.quit: return errors.New("server shutting down") } select { case err := <-errChan: return err case <-s.quit: return errors.New("server shutting down") } } // findPeer will return the peer that corresponds to the passed in public key. // This function is used by the funding manager, allowing it to update the // daemon's local representation of the remote peer. func (s *server) findPeer(peerKey *btcec.PublicKey) (*peer, error) { serializedIDKey := string(peerKey.SerializeCompressed()) s.peersMtx.RLock() peer := s.peersByPub[serializedIDKey] s.peersMtx.RUnlock() if peer == nil { return nil, errors.New("Peer not found. Pubkey: " + string(peerKey.SerializeCompressed())) } return peer, nil } // peerTerminationWatcher waits until a peer has been disconnected, and then // cleans up all resources allocated to the peer, notifies relevant sub-systems // of its demise, and finally handles re-connecting to the peer if it's // persistent. // // NOTE: This MUST be launched as a goroutine. func (s *server) peerTerminationWatcher(p *peer) { p.WaitForDisconnect() srvrLog.Debugf("Peer %v has been disconnected", p) // If the server is exiting then we can bail out early ourselves as all // the other sub-systems will already be shutting down. select { case <-s.quit: return default: // If we aren't shutting down, then we'll fall through this // this empty default case. } // Tell the switch to remove all links associated with this peer. // Passing nil as the target link indicates that all links associated // with this interface should be closed. // // TODO(roasbeef): instead add a PurgeInterfaceLinks function? 
links, err := p.server.htlcSwitch.GetLinksByInterface(p.pubKeyBytes) if err != nil { srvrLog.Errorf("unable to get channel links: %v", err) } for _, link := range links { err := p.server.htlcSwitch.RemoveLink(link.ChanID()) if err != nil { srvrLog.Errorf("unable to remove channel link: %v", err) } } // Send the peer to be garbage collected by the server. p.server.donePeers <- p // If this peer had an active persistent connection request, then we // can remove this as we manually decide below if we should attempt to // re-connect. if p.connReq != nil { s.connMgr.Remove(p.connReq.ID()) } // Next, check to see if this is a persistent peer or not. pubStr := string(p.addr.IdentityKey.SerializeCompressed()) s.pendingConnMtx.RLock() _, ok := s.persistentPeers[pubStr] s.pendingConnMtx.RUnlock() if ok { srvrLog.Debugf("Attempting to re-establish persistent "+ "connection to peer %v", p) // If so, then we'll attempt to re-establish a persistent // connection to the peer. // TODO(roasbeef): look up latest info for peer in database connReq := &connmgr.ConnReq{ Addr: p.addr, Permanent: true, } s.pendingConnMtx.Lock() // We'll only need to re-launch a connection requests if one // isn't already currently pending. if _, ok := s.persistentConnReqs[pubStr]; ok { return } // Otherwise, we'll launch a new connection requests in order // to attempt to maintain a persistent connection with this // peer. s.persistentConnReqs[pubStr] = append(s.persistentConnReqs[pubStr], connReq) s.pendingConnMtx.Unlock() go s.connMgr.Connect(connReq) } } // peerConnected is a function that handles initialization a newly connected // peer by adding it to the server's global list of all active peers, and // starting all the goroutines the peer needs to function properly. 
func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq, inbound bool) { brontideConn := conn.(*brontide.Conn) peerAddr := &lnwire.NetAddress{ IdentityKey: brontideConn.RemotePub(), Address: conn.RemoteAddr().(*net.TCPAddr), ChainNet: activeNetParams.Net, } // Now that we've established a connection, create a peer, and // it to the set of currently active peers. p, err := newPeer(conn, connReq, s, peerAddr, inbound) if err != nil { srvrLog.Errorf("unable to create peer %v", err) return } // TODO(roasbeef): update IP address for link-node // * also mark last-seen, do it one single transaction? // Attempt to start the peer, if we're unable to do so, then disconnect // this peer. if err := p.Start(); err != nil { p.Disconnect(errors.Errorf("unable to start peer: %v", err)) return } s.newPeers <- p } // shouldDropConnection determines if our local connection to a remote peer // should be dropped in the case of concurrent connection establishment. In // order to deterministically decide which connection should be dropped, we'll // utilize the ordering of the local and remote public key. If we didn't use // such a tie breaker, then we risk _both_ connections erroneously being // dropped. func shouldDropLocalConnection(local, remote *btcec.PublicKey) bool { localPubBytes := local.SerializeCompressed() remotePubPbytes := remote.SerializeCompressed() // The connection that comes from the node with a "smaller" pubkey should // be kept. Therefore, if our pubkey is "greater" than theirs, we should // drop our established connection. return bytes.Compare(localPubBytes, remotePubPbytes) > 0 } // inboundPeerConnected initializes a new peer in response to a new inbound // connection. func (s *server) inboundPeerConnected(conn net.Conn) { s.peersMtx.Lock() defer s.peersMtx.Unlock() // If we already have an inbound connection to this peer, then ignore // this new connection. 
nodePub := conn.(*brontide.Conn).RemotePub() pubStr := string(nodePub.SerializeCompressed()) if _, ok := s.inboundPeers[pubStr]; ok { srvrLog.Debugf("Ignoring duplicate inbound connection") conn.Close() return } srvrLog.Infof("New inbound connection from %v", conn.RemoteAddr()) localPub := s.identityPriv.PubKey() // Check to see if we should drop our connection, if not, then we'll // close out this connection with the remote peer. This // prevents us from having duplicate connections, or none. if connectedPeer, ok := s.peersByPub[pubStr]; ok { // If the connection we've already established should be kept, // then we'll close out this connection s.t there's only a // single connection between us. if !shouldDropLocalConnection(localPub, nodePub) { srvrLog.Warnf("Received inbound connection from "+ "peer %x, but already connected, dropping conn", nodePub.SerializeCompressed()) conn.Close() return } // Otherwise, if we should drop the connection, then we'll // disconnect our already connected peer, and also send the // peer to the peer garbage collection goroutine. srvrLog.Debugf("Disconnecting stale connection to %v", connectedPeer) connectedPeer.Disconnect(errors.New("remove stale connection")) s.donePeers <- connectedPeer } // Next, check to see if we have any outstanding persistent connection // requests to this peer. If so, then we'll remove all of these // connection requests, and also delete the entry from the map. s.pendingConnMtx.Lock() if connReqs, ok := s.persistentConnReqs[pubStr]; ok { for _, connReq := range connReqs { s.connMgr.Remove(connReq.ID()) } delete(s.persistentConnReqs, pubStr) } s.pendingConnMtx.Unlock() go s.peerConnected(conn, nil, false) } // outboundPeerConnected initializes a new peer in response to a new outbound // connection. 
func (s *server) outboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn) { s.peersMtx.Lock() defer s.peersMtx.Unlock() localPub := s.identityPriv.PubKey() nodePub := conn.(*brontide.Conn).RemotePub() pubStr := string(nodePub.SerializeCompressed()) // If we already have an outbound connection to this peer, then ignore // this new connection. if _, ok := s.outboundPeers[pubStr]; ok { srvrLog.Debugf("Ignoring duplicate outbound connection") conn.Close() return } if _, ok := s.persistentConnReqs[pubStr]; !ok && connReq != nil { srvrLog.Debugf("Ignoring cancelled outbound connection") conn.Close() return } srvrLog.Infof("Established connection to: %v", conn.RemoteAddr()) // As we've just established an outbound connection to this peer, we'll // cancel all other persistent connection requests and eliminate the // entry for this peer from the map. s.pendingConnMtx.Lock() if connReqs, ok := s.persistentConnReqs[pubStr]; ok { for _, pConnReq := range connReqs { if pConnReq.ID() != connReq.ID() { s.connMgr.Remove(pConnReq.ID()) } } delete(s.persistentConnReqs, pubStr) } s.pendingConnMtx.Unlock() // If we already have an inbound connection from this peer, then we'll // check to see _which_ of our connections should be dropped. if connectedPeer, ok := s.peersByPub[pubStr]; ok { // If our (this) connection should be dropped, then we'll do // so, in order to ensure we don't have any duplicate // connections. if shouldDropLocalConnection(localPub, nodePub) { srvrLog.Warnf("Established outbound connection to "+ "peer %x, but already connected, dropping conn", nodePub.SerializeCompressed()) s.connMgr.Remove(connReq.ID()) conn.Close() return } // Otherwise, _their_ connection should be dropped. So we'll // disconnect the peer and send the now obsolete peer to the // server for garbage collection. 
srvrLog.Debugf("Disconnecting stale connection to %v", connectedPeer) connectedPeer.Disconnect(errors.New("remove stale connection")) s.donePeers <- connectedPeer } go s.peerConnected(conn, connReq, true) } // addPeer adds the passed peer to the server's global state of all active // peers. func (s *server) addPeer(p *peer) { if p == nil { return } // Ignore new peers if we're shutting down. if atomic.LoadInt32(&s.shutdown) != 0 { p.Disconnect(errors.New("server is shutting down")) return } // Track the new peer in our indexes so we can quickly look it up either // according to its public key, or it's peer ID. // TODO(roasbeef): pipe all requests through to the // queryHandler/peerManager s.peersMtx.Lock() pubStr := string(p.addr.IdentityKey.SerializeCompressed()) s.peersByID[p.id] = p s.peersByPub[pubStr] = p if p.inbound { s.inboundPeers[pubStr] = p } else { s.outboundPeers[pubStr] = p } s.peersMtx.Unlock() // Launch a goroutine to watch for the termination of this peer so we // can ensure all resources are properly cleaned up and if need be // connections are re-established. go s.peerTerminationWatcher(p) // Once the peer has been added to our indexes, send a message to the // channel router so we can synchronize our view of the channel graph // with this new peer. go s.discoverSrv.SynchronizeNode(p.addr.IdentityKey) } // removePeer removes the passed peer from the server's state of all active // peers. func (s *server) removePeer(p *peer) { s.peersMtx.Lock() defer s.peersMtx.Unlock() srvrLog.Debugf("removing peer %v", p) if p == nil { return } // As the peer is now finished, ensure that the TCP connection is // closed and all of its related goroutines have exited. p.Disconnect(errors.New("remove peer")) // Ignore deleting peers if we're shutting down. 
if atomic.LoadInt32(&s.shutdown) != 0 { return } pubStr := string(p.addr.IdentityKey.SerializeCompressed()) delete(s.peersByID, p.id) delete(s.peersByPub, pubStr) if p.inbound { delete(s.inboundPeers, pubStr) } else { delete(s.outboundPeers, pubStr) } } // connectPeerMsg is a message requesting the server to open a connection to a // particular peer. This message also houses an error channel which will be // used to report success/failure. type connectPeerMsg struct { addr *lnwire.NetAddress persistent bool err chan error } // disconnectPeerMsg is a message requesting the server to disconnect from an // active peer. type disconnectPeerMsg struct { pubKey *btcec.PublicKey err chan error } // openChanReq is a message sent to the server in order to request the // initiation of a channel funding workflow to the peer with either the specified // relative peer ID, or a global lightning ID. type openChanReq struct { targetPeerID int32 targetPubkey *btcec.PublicKey // TODO(roasbeef): make enums in lnwire channelType uint8 coinType uint64 localFundingAmt btcutil.Amount remoteFundingAmt btcutil.Amount pushAmt btcutil.Amount numConfs uint32 updates chan *lnrpc.OpenStatusUpdate err chan error } // queryHandler handles any requests to modify the server's internal state of // all active peers, or query/mutate the server's global state. Additionally, // any queries directed at peers will be handled by this goroutine. // // NOTE: This MUST be run as a goroutine. func (s *server) queryHandler() { go s.connMgr.Start() out: for { select { // New peers. case p := <-s.newPeers: s.addPeer(p) // Finished peers. case p := <-s.donePeers: s.removePeer(p) case bMsg := <-s.broadcastRequests: ignore := bMsg.ignore srvrLog.Debugf("Broadcasting %v messages", len(bMsg.msgs)) // Launch a new goroutine to handle the broadcast // request, this allows us process this request // asynchronously without blocking subsequent broadcast // requests. 
go func() { s.peersMtx.RLock() for _, sPeer := range s.peersByPub { if ignore != nil && sPeer.addr.IdentityKey.IsEqual(ignore) { srvrLog.Debugf("Skipping %v in broadcast", ignore.SerializeCompressed()) continue } go func(p *peer) { for _, msg := range bMsg.msgs { p.queueMsg(msg, nil) } }(sPeer) } s.peersMtx.RUnlock() bMsg.errChan <- nil }() case sMsg := <-s.sendRequests: // TODO(roasbeef): use [33]byte everywhere instead // * eliminate usage of mutexes, funnel all peer // mutation to this goroutine target := sMsg.target.SerializeCompressed() srvrLog.Debugf("Attempting to send msgs %v to: %x", len(sMsg.msgs), target) // Launch a new goroutine to handle this send request, // this allows us process this request asynchronously // without blocking future send requests. go func() { s.peersMtx.RLock() targetPeer, ok := s.peersByPub[string(target)] if !ok { s.peersMtx.RUnlock() srvrLog.Errorf("unable to send message to %x, "+ "peer not found", target) sMsg.errChan <- errors.New("peer not found") return } s.peersMtx.RUnlock() sMsg.errChan <- nil for _, msg := range sMsg.msgs { targetPeer.queueMsg(msg, nil) } }() case query := <-s.queries: switch msg := query.(type) { case *disconnectPeerMsg: s.handleDisconnectPeer(msg) case *connectPeerMsg: s.handleConnectPeer(msg) case *openChanReq: s.handleOpenChanReq(msg) } case <-s.quit: break out } } s.connMgr.Stop() s.wg.Done() } // handleConnectPeer attempts to establish a connection to the address enclosed // within the passed connectPeerMsg. This function is *async*, a goroutine will // be spawned in order to finish the request, and respond to the caller. func (s *server) handleConnectPeer(msg *connectPeerMsg) { addr := msg.addr targetPub := string(msg.addr.IdentityKey.SerializeCompressed()) // Ensure we're not already connected to this // peer. 
s.peersMtx.RLock() peer, ok := s.peersByPub[targetPub] if ok { s.peersMtx.RUnlock() msg.err <- fmt.Errorf("already connected to peer: %v", peer) return } s.peersMtx.RUnlock() // If there's already a pending connection request for this pubkey, // then we ignore this request to ensure we don't create a redundant // connection. s.pendingConnMtx.RLock() if _, ok := s.persistentConnReqs[targetPub]; ok { s.pendingConnMtx.RUnlock() msg.err <- fmt.Errorf("connection attempt to %v is pending", addr) return } s.pendingConnMtx.RUnlock() // If there's not already a pending or active connection to this node, // then instruct the connection manager to attempt to establish a // persistent connection to the peer. srvrLog.Debugf("Connecting to %v", addr) if msg.persistent { connReq := &connmgr.ConnReq{ Addr: addr, Permanent: true, } s.pendingConnMtx.Lock() s.persistentPeers[targetPub] = struct{}{} s.persistentConnReqs[targetPub] = append(s.persistentConnReqs[targetPub], connReq) s.pendingConnMtx.Unlock() go s.connMgr.Connect(connReq) msg.err <- nil } else { // If we're not making a persistent connection, then we'll // attempt to connect o the target peer, returning an error // which indicates success of failure. go func() { // Attempt to connect to the remote node. If the we // can't make the connection, or the crypto negotiation // breaks down, then return an error to the caller. conn, err := brontide.Dial(s.identityPriv, addr) if err != nil { msg.err <- err return } s.outboundPeerConnected(nil, conn) msg.err <- nil }() } } // handleDisconnectPeer attempts to disconnect one peer from another func (s *server) handleDisconnectPeer(msg *disconnectPeerMsg) { pubBytes := msg.pubKey.SerializeCompressed() pubStr := string(pubBytes) // Check that were actually connected to this peer. If not, then we'll // exit in an error as we can't disconnect from a peer that we're not // currently connected to. 
s.peersMtx.RLock() peer, ok := s.peersByPub[pubStr] s.peersMtx.RUnlock() if !ok { msg.err <- fmt.Errorf("unable to find peer %x", pubBytes) return } // If this peer was formerly a persistent connection, then we'll remove // them from this map so we don't attempt to re-connect after we // disconnect. s.pendingConnMtx.Lock() if _, ok := s.persistentPeers[pubStr]; ok { delete(s.persistentPeers, pubStr) } s.pendingConnMtx.Unlock() // Now that we know the peer is actually connected, we'll disconnect // from the peer. srvrLog.Infof("Disconnecting from %v", peer) peer.Disconnect(errors.New("received user command to disconnect the peer")) msg.err <- nil } // handleOpenChanReq first locates the target peer, and if found hands off the // request to the funding manager allowing it to initiate the channel funding // workflow. func (s *server) handleOpenChanReq(req *openChanReq) { var ( targetPeer *peer pubKeyBytes []byte ) // If the user is targeting the peer by public key, then we'll need to // convert that into a string for our map. Otherwise, we expect them to // target by peer ID instead. if req.targetPubkey != nil { pubKeyBytes = req.targetPubkey.SerializeCompressed() } // First attempt to locate the target peer to open a channel with, if // we're unable to locate the peer then this request will fail. s.peersMtx.RLock() if peer, ok := s.peersByID[req.targetPeerID]; ok { targetPeer = peer } else if peer, ok := s.peersByPub[string(pubKeyBytes)]; ok { targetPeer = peer } s.peersMtx.RUnlock() if targetPeer == nil { req.err <- fmt.Errorf("unable to find peer nodeID(%x), "+ "peerID(%v)", pubKeyBytes, req.targetPeerID) return } // Spawn a goroutine to send the funding workflow request to the // funding manager. This allows the server to continue handling queries // instead of blocking on this request which is exported as a // synchronous request to the outside world. // TODO(roasbeef): pass in chan that's closed if/when funding succeeds // so can track as persistent peer? 
go s.fundingMgr.initFundingWorkflow(targetPeer.addr, req) } // ConnectToPeer requests that the server connect to a Lightning Network peer // at the specified address. This function will *block* until either a // connection is established, or the initial handshake process fails. func (s *server) ConnectToPeer(addr *lnwire.NetAddress, perm bool) error { errChan := make(chan error, 1) s.queries <- &connectPeerMsg{ addr: addr, persistent: perm, err: errChan, } return <-errChan } // DisconnectPeer sends the request to server to close the connection with peer // identified by public key. func (s *server) DisconnectPeer(pubKey *btcec.PublicKey) error { errChan := make(chan error, 1) s.queries <- &disconnectPeerMsg{ pubKey: pubKey, err: errChan, } return <-errChan } // OpenChannel sends a request to the server to open a channel to the specified // peer identified by ID with the passed channel funding paramters. func (s *server) OpenChannel(peerID int32, nodeKey *btcec.PublicKey, localAmt, pushAmt btcutil.Amount, numConfs uint32) (chan *lnrpc.OpenStatusUpdate, chan error) { errChan := make(chan error, 1) updateChan := make(chan *lnrpc.OpenStatusUpdate, 1) req := &openChanReq{ targetPeerID: peerID, targetPubkey: nodeKey, localFundingAmt: localAmt, pushAmt: pushAmt, numConfs: numConfs, updates: updateChan, err: errChan, } s.queries <- req return updateChan, errChan } // Peers returns a slice of all active peers. func (s *server) Peers() []*peer { s.peersMtx.RLock() peers := make([]*peer, 0, len(s.peersByID)) for _, peer := range s.peersByID { peers = append(peers, peer) } s.peersMtx.RUnlock() return peers }