diff --git a/server.go b/server.go index a9324533..0bce7ece 100644 --- a/server.go +++ b/server.go @@ -187,22 +187,20 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier, chanGraph := chanDB.ChannelGraph() - // In order to have ability to announce the self node we need to - // sign the node announce message, and include the signature in the - // node channeldb object. + // TODO(roasbeef): make alias configurable alias := lnwire.NewAlias(hex.EncodeToString(serializedPubKey[:10])) self := &channeldb.LightningNode{ LastUpdate: time.Now(), Addresses: selfAddrs, PubKey: privKey.PubKey(), - // TODO(roasbeef): make alias configurable - Alias: alias.String(), - Features: globalFeatures, + Alias: alias.String(), + Features: globalFeatures, } - // Initialize graph with authenticated lightning node. We need to - // generate a valid signature in order for other nodes on the network - // to accept our announcement. + // If our information has changed since our last boot, then we'll + // re-sign our node announcement so a fresh authenticated version of it + // can be propagated throughout the network upon startup. + // TODO(roasbeef): don't always set timestamp above to _now. self.AuthSig, err = discovery.SignAnnouncement(s.nodeSigner, s.identityPriv.PubKey(), &lnwire.NodeAnnouncement{ @@ -217,7 +215,6 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier, return nil, fmt.Errorf("unable to generate signature for "+ "self node announcement: %v", err) } - if err := chanGraph.SetSourceNode(self); err != nil { return nil, fmt.Errorf("can't set self node: %v", err) } @@ -322,116 +319,6 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier, } s.connMgr = cmgr - // In order to promote liveness of our active channels, instruct the - // connection manager to attempt to establish and maintain persistent - // connections to all our direct channel counterparties. - - // nodeAddrsMap stores the combination of node public keys and - // addresses that we'll attempt to reconnect to. PubKey strings are - // used as keys since other PubKey forms can't be compared. - nodeAddrsMap := map[string]*nodeAddresses{} - - // Iterate through the list of LinkNodes to find addresses we should - // attempt to connect to based on our set of previous connections. Set - // the reconnection port to the default peer port. - linkNodes, err := s.chanDB.FetchAllLinkNodes() - if err != nil && err != channeldb.ErrLinkNodesNotFound { - return nil, err - } - for _, node := range linkNodes { - for _, address := range node.Addresses { - if address.Port == 0 { - address.Port = defaultPeerPort - } - } - pubStr := string(node.IdentityPub.SerializeCompressed()) - nodeAddrs := &nodeAddresses{ - pubKey: node.IdentityPub, - addresses: node.Addresses, - } - nodeAddrsMap[pubStr] = nodeAddrs - } - - // After checking our previous connections for addresses to connect to, - // iterate through the nodes in our channel graph to find addresses - // that have been added via NodeAnnouncement messages. - sourceNode, err := chanGraph.SourceNode() - if err != nil { - return nil, err - } - err = sourceNode.ForEachChannel(nil, func(_ *bolt.Tx, - _ *channeldb.ChannelEdgeInfo, policy *channeldb.ChannelEdgePolicy) error { - - pubStr := string(policy.Node.PubKey.SerializeCompressed()) - - // Add addresses from channel graph/NodeAnnouncements to the - // list of addresses we'll connect to. If there are duplicates - // that have different ports specified, the port from the - // channel graph should supersede the port from the link node. - var addrs []*net.TCPAddr - linkNodeAddrs, ok := nodeAddrsMap[pubStr] - if ok { - for _, lnAddress := range linkNodeAddrs.addresses { - var addrMatched bool - for _, polAddress := range policy.Node.Addresses { - polTCPAddr, ok := polAddress.(*net.TCPAddr) - if ok && polTCPAddr.IP.Equal(lnAddress.IP) { - addrMatched = true - addrs = append(addrs, polTCPAddr) - } - } - if !addrMatched { - addrs = append(addrs, lnAddress) - } - } - } else { - for _, addr := range policy.Node.Addresses { - polTCPAddr, ok := addr.(*net.TCPAddr) - if ok { - addrs = append(addrs, polTCPAddr) - } - } - } - - nodeAddrsMap[pubStr] = &nodeAddresses{ - pubKey: policy.Node.PubKey, - addresses: addrs, - } - - return nil - }) - if err != nil && err != channeldb.ErrGraphNoEdgesFound { - return nil, err - } - - // Iterate through the combined list of addresses from prior links and - // node announcements and attempt to reconnect to each node. - for pubStr, nodeAddr := range nodeAddrsMap { - for _, address := range nodeAddr.addresses { - // Create a wrapper address which couples the IP and - // the pubkey so the brontide authenticated connection - // can be established. - lnAddr := &lnwire.NetAddress{ - IdentityKey: nodeAddr.pubKey, - Address: address, - } - srvrLog.Debugf("Attempting persistent connection to "+ - "channel peer %v", lnAddr) - - // Send the persistent connection request to the - // connection manager, saving the request itself so we - // can cancel/restart the process as needed. - connReq := &connmgr.ConnReq{ - Addr: lnAddr, - Permanent: true, - } - - s.persistentConnReqs[pubStr] = append(s.persistentConnReqs[pubStr], - connReq) - go s.connMgr.Connect(connReq) - } - } - return s, nil } @@ -477,6 +364,13 @@ func (s *server) Start() error { s.wg.Add(1) go s.queryHandler() + // With all the relevant sub-systems started, we'll now atetmpt to + // stasblish persistent connections to our direct channel collaborators + // within the network. + if err := s.establishPersistentConnections(); err != nil { + return err + } + return nil } @@ -507,6 +401,129 @@ func (s *server) Stop() error { return nil } +// establishPersistentConnections attempts to establish persistent connections +// to all our direct channel collaborators. In order to promote liveness of +// our active channels, we instruct the connection manager to attempt to +// establish and maintain persistent connections to all our direct channel +// counterparties. +func (s *server) establishPersistentConnections() error { + // nodeAddrsMap stores the combination of node public keys and + // addresses that we'll attempt to reconnect to. PubKey strings are + // used as keys since other PubKey forms can't be compared. + nodeAddrsMap := map[string]*nodeAddresses{} + + // Iterate through the list of LinkNodes to find addresses we should + // attempt to connect to based on our set of previous connections. Set + // the reconnection port to the default peer port. + linkNodes, err := s.chanDB.FetchAllLinkNodes() + if err != nil && err != channeldb.ErrLinkNodesNotFound { + return err + } + for _, node := range linkNodes { + for _, address := range node.Addresses { + if address.Port == 0 { + address.Port = defaultPeerPort + } + } + pubStr := string(node.IdentityPub.SerializeCompressed()) + nodeAddrs := &nodeAddresses{ + pubKey: node.IdentityPub, + addresses: node.Addresses, + } + nodeAddrsMap[pubStr] = nodeAddrs + } + + // After checking our previous connections for addresses to connect to, + // iterate through the nodes in our channel graph to find addresses + // that have been added via NodeAnnouncement messages. + chanGraph := s.chanDB.ChannelGraph() + sourceNode, err := chanGraph.SourceNode() + if err != nil { + return err + } + err = sourceNode.ForEachChannel(nil, func(_ *bolt.Tx, + _ *channeldb.ChannelEdgeInfo, policy *channeldb.ChannelEdgePolicy) error { + + pubStr := string(policy.Node.PubKey.SerializeCompressed()) + + // Add addresses from channel graph/NodeAnnouncements to the + // list of addresses we'll connect to. If there are duplicates + // that have different ports specified, the port from the + // channel graph should supersede the port from the link node. + var addrs []*net.TCPAddr + linkNodeAddrs, ok := nodeAddrsMap[pubStr] + if ok { + for _, lnAddress := range linkNodeAddrs.addresses { + var addrMatched bool + for _, polAddress := range policy.Node.Addresses { + polTCPAddr, ok := polAddress.(*net.TCPAddr) + if ok && polTCPAddr.IP.Equal(lnAddress.IP) { + addrMatched = true + addrs = append(addrs, polTCPAddr) + } + } + if !addrMatched { + addrs = append(addrs, lnAddress) + } + } + } else { + for _, addr := range policy.Node.Addresses { + polTCPAddr, ok := addr.(*net.TCPAddr) + if ok { + addrs = append(addrs, polTCPAddr) + } + } + } + + nodeAddrsMap[pubStr] = &nodeAddresses{ + pubKey: policy.Node.PubKey, + addresses: addrs, + } + + return nil + }) + if err != nil && err != channeldb.ErrGraphNoEdgesFound { + return err + } + + // Iterate through the combined list of addresses from prior links and + // node announcements and attempt to reconnect to each node. + for pubStr, nodeAddr := range nodeAddrsMap { + // Add this peer to the set of peers we should maintain a + // persistent connection with. + s.persistentPeers[pubStr] = struct{}{} + + for _, address := range nodeAddr.addresses { + // Create a wrapper address which couples the IP and + // the pubkey so the brontide authenticated connection + // can be established. + lnAddr := &lnwire.NetAddress{ + IdentityKey: nodeAddr.pubKey, + Address: address, + } + srvrLog.Debugf("Attempting persistent connection to "+ + "channel peer %v", lnAddr) + + // Send the persistent connection request to the + // connection manager, saving the request itself so we + // can cancel/restart the process as needed. + connReq := &connmgr.ConnReq{ + Addr: lnAddr, + Permanent: true, + } + + s.pendingConnMtx.Lock() + s.persistentConnReqs[pubStr] = append(s.persistentConnReqs[pubStr], + connReq) + s.pendingConnMtx.Unlock() + + go s.connMgr.Connect(connReq) + } + } + + return nil +} + // WaitForShutdown blocks all goroutines have been stopped. func (s *server) WaitForShutdown() { s.wg.Wait()