server: move establishment of persistent connections to Start()

This commit is contained in:
Olaoluwa Osuntokun 2017-04-23 19:21:32 -07:00
parent 7fe02c7bf6
commit fe3c90362e
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2

257
server.go

@ -187,22 +187,20 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier,
chanGraph := chanDB.ChannelGraph() chanGraph := chanDB.ChannelGraph()
// In order to have ability to announce the self node we need to // TODO(roasbeef): make alias configurable
// sign the node announce message, and include the signature in the
// node channeldb object.
alias := lnwire.NewAlias(hex.EncodeToString(serializedPubKey[:10])) alias := lnwire.NewAlias(hex.EncodeToString(serializedPubKey[:10]))
self := &channeldb.LightningNode{ self := &channeldb.LightningNode{
LastUpdate: time.Now(), LastUpdate: time.Now(),
Addresses: selfAddrs, Addresses: selfAddrs,
PubKey: privKey.PubKey(), PubKey: privKey.PubKey(),
// TODO(roasbeef): make alias configurable Alias: alias.String(),
Alias: alias.String(), Features: globalFeatures,
Features: globalFeatures,
} }
// Initialize graph with authenticated lightning node. We need to // If our information has changed since our last boot, then we'll
// generate a valid signature in order for other nodes on the network // re-sign our node announcement so a fresh authenticated version of it
// to accept our announcement. // can be propagated throughout the network upon startup.
// TODO(roasbeef): don't always set timestamp above to _now.
self.AuthSig, err = discovery.SignAnnouncement(s.nodeSigner, self.AuthSig, err = discovery.SignAnnouncement(s.nodeSigner,
s.identityPriv.PubKey(), s.identityPriv.PubKey(),
&lnwire.NodeAnnouncement{ &lnwire.NodeAnnouncement{
@ -217,7 +215,6 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier,
return nil, fmt.Errorf("unable to generate signature for "+ return nil, fmt.Errorf("unable to generate signature for "+
"self node announcement: %v", err) "self node announcement: %v", err)
} }
if err := chanGraph.SetSourceNode(self); err != nil { if err := chanGraph.SetSourceNode(self); err != nil {
return nil, fmt.Errorf("can't set self node: %v", err) return nil, fmt.Errorf("can't set self node: %v", err)
} }
@ -322,116 +319,6 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier,
} }
s.connMgr = cmgr s.connMgr = cmgr
// In order to promote liveness of our active channels, instruct the
// connection manager to attempt to establish and maintain persistent
// connections to all our direct channel counterparties.
// nodeAddrsMap stores the combination of node public keys and
// addresses that we'll attempt to reconnect to. PubKey strings are
// used as keys since other PubKey forms can't be compared.
nodeAddrsMap := map[string]*nodeAddresses{}
// Iterate through the list of LinkNodes to find addresses we should
// attempt to connect to based on our set of previous connections. Set
// the reconnection port to the default peer port.
linkNodes, err := s.chanDB.FetchAllLinkNodes()
if err != nil && err != channeldb.ErrLinkNodesNotFound {
return nil, err
}
for _, node := range linkNodes {
for _, address := range node.Addresses {
if address.Port == 0 {
address.Port = defaultPeerPort
}
}
pubStr := string(node.IdentityPub.SerializeCompressed())
nodeAddrs := &nodeAddresses{
pubKey: node.IdentityPub,
addresses: node.Addresses,
}
nodeAddrsMap[pubStr] = nodeAddrs
}
// After checking our previous connections for addresses to connect to,
// iterate through the nodes in our channel graph to find addresses
// that have been added via NodeAnnouncement messages.
sourceNode, err := chanGraph.SourceNode()
if err != nil {
return nil, err
}
err = sourceNode.ForEachChannel(nil, func(_ *bolt.Tx,
_ *channeldb.ChannelEdgeInfo, policy *channeldb.ChannelEdgePolicy) error {
pubStr := string(policy.Node.PubKey.SerializeCompressed())
// Add addresses from channel graph/NodeAnnouncements to the
// list of addresses we'll connect to. If there are duplicates
// that have different ports specified, the port from the
// channel graph should supersede the port from the link node.
var addrs []*net.TCPAddr
linkNodeAddrs, ok := nodeAddrsMap[pubStr]
if ok {
for _, lnAddress := range linkNodeAddrs.addresses {
var addrMatched bool
for _, polAddress := range policy.Node.Addresses {
polTCPAddr, ok := polAddress.(*net.TCPAddr)
if ok && polTCPAddr.IP.Equal(lnAddress.IP) {
addrMatched = true
addrs = append(addrs, polTCPAddr)
}
}
if !addrMatched {
addrs = append(addrs, lnAddress)
}
}
} else {
for _, addr := range policy.Node.Addresses {
polTCPAddr, ok := addr.(*net.TCPAddr)
if ok {
addrs = append(addrs, polTCPAddr)
}
}
}
nodeAddrsMap[pubStr] = &nodeAddresses{
pubKey: policy.Node.PubKey,
addresses: addrs,
}
return nil
})
if err != nil && err != channeldb.ErrGraphNoEdgesFound {
return nil, err
}
// Iterate through the combined list of addresses from prior links and
// node announcements and attempt to reconnect to each node.
for pubStr, nodeAddr := range nodeAddrsMap {
for _, address := range nodeAddr.addresses {
// Create a wrapper address which couples the IP and
// the pubkey so the brontide authenticated connection
// can be established.
lnAddr := &lnwire.NetAddress{
IdentityKey: nodeAddr.pubKey,
Address: address,
}
srvrLog.Debugf("Attempting persistent connection to "+
"channel peer %v", lnAddr)
// Send the persistent connection request to the
// connection manager, saving the request itself so we
// can cancel/restart the process as needed.
connReq := &connmgr.ConnReq{
Addr: lnAddr,
Permanent: true,
}
s.persistentConnReqs[pubStr] = append(s.persistentConnReqs[pubStr],
connReq)
go s.connMgr.Connect(connReq)
}
}
return s, nil return s, nil
} }
@ -477,6 +364,13 @@ func (s *server) Start() error {
s.wg.Add(1) s.wg.Add(1)
go s.queryHandler() go s.queryHandler()
// With all the relevant sub-systems started, we'll now atetmpt to
// stasblish persistent connections to our direct channel collaborators
// within the network.
if err := s.establishPersistentConnections(); err != nil {
return err
}
return nil return nil
} }
@ -507,6 +401,129 @@ func (s *server) Stop() error {
return nil return nil
} }
// establishPersistentConnections attempts to establish persistent connections
// to all our direct channel collaborators. In order to promote liveness of
// our active channels, we instruct the connection manager to attempt to
// establish and maintain persistent connections to all our direct channel
// counterparties.
func (s *server) establishPersistentConnections() error {
// nodeAddrsMap stores the combination of node public keys and
// addresses that we'll attempt to reconnect to. PubKey strings are
// used as keys since other PubKey forms can't be compared.
nodeAddrsMap := map[string]*nodeAddresses{}
// Iterate through the list of LinkNodes to find addresses we should
// attempt to connect to based on our set of previous connections. Set
// the reconnection port to the default peer port.
linkNodes, err := s.chanDB.FetchAllLinkNodes()
if err != nil && err != channeldb.ErrLinkNodesNotFound {
return err
}
for _, node := range linkNodes {
for _, address := range node.Addresses {
if address.Port == 0 {
address.Port = defaultPeerPort
}
}
pubStr := string(node.IdentityPub.SerializeCompressed())
nodeAddrs := &nodeAddresses{
pubKey: node.IdentityPub,
addresses: node.Addresses,
}
nodeAddrsMap[pubStr] = nodeAddrs
}
// After checking our previous connections for addresses to connect to,
// iterate through the nodes in our channel graph to find addresses
// that have been added via NodeAnnouncement messages.
chanGraph := s.chanDB.ChannelGraph()
sourceNode, err := chanGraph.SourceNode()
if err != nil {
return err
}
err = sourceNode.ForEachChannel(nil, func(_ *bolt.Tx,
_ *channeldb.ChannelEdgeInfo, policy *channeldb.ChannelEdgePolicy) error {
pubStr := string(policy.Node.PubKey.SerializeCompressed())
// Add addresses from channel graph/NodeAnnouncements to the
// list of addresses we'll connect to. If there are duplicates
// that have different ports specified, the port from the
// channel graph should supersede the port from the link node.
var addrs []*net.TCPAddr
linkNodeAddrs, ok := nodeAddrsMap[pubStr]
if ok {
for _, lnAddress := range linkNodeAddrs.addresses {
var addrMatched bool
for _, polAddress := range policy.Node.Addresses {
polTCPAddr, ok := polAddress.(*net.TCPAddr)
if ok && polTCPAddr.IP.Equal(lnAddress.IP) {
addrMatched = true
addrs = append(addrs, polTCPAddr)
}
}
if !addrMatched {
addrs = append(addrs, lnAddress)
}
}
} else {
for _, addr := range policy.Node.Addresses {
polTCPAddr, ok := addr.(*net.TCPAddr)
if ok {
addrs = append(addrs, polTCPAddr)
}
}
}
nodeAddrsMap[pubStr] = &nodeAddresses{
pubKey: policy.Node.PubKey,
addresses: addrs,
}
return nil
})
if err != nil && err != channeldb.ErrGraphNoEdgesFound {
return err
}
// Iterate through the combined list of addresses from prior links and
// node announcements and attempt to reconnect to each node.
for pubStr, nodeAddr := range nodeAddrsMap {
// Add this peer to the set of peers we should maintain a
// persistent connection with.
s.persistentPeers[pubStr] = struct{}{}
for _, address := range nodeAddr.addresses {
// Create a wrapper address which couples the IP and
// the pubkey so the brontide authenticated connection
// can be established.
lnAddr := &lnwire.NetAddress{
IdentityKey: nodeAddr.pubKey,
Address: address,
}
srvrLog.Debugf("Attempting persistent connection to "+
"channel peer %v", lnAddr)
// Send the persistent connection request to the
// connection manager, saving the request itself so we
// can cancel/restart the process as needed.
connReq := &connmgr.ConnReq{
Addr: lnAddr,
Permanent: true,
}
s.pendingConnMtx.Lock()
s.persistentConnReqs[pubStr] = append(s.persistentConnReqs[pubStr],
connReq)
s.pendingConnMtx.Unlock()
go s.connMgr.Connect(connReq)
}
}
return nil
}
// WaitForShutdown blocks all goroutines have been stopped. // WaitForShutdown blocks all goroutines have been stopped.
func (s *server) WaitForShutdown() { func (s *server) WaitForShutdown() {
s.wg.Wait() s.wg.Wait()