lnd: switch to using the connmgr for listening and persistent conns
This commit revamps the way in bound and outbound connections are handled within lnd. Instead of manually managing listening goroutines and also outbound connections, all the duty is now assigned to the connmgr, a new btcsuite package. The connmgr now handles accepting inbound (brontide) connections and communicates with the server to hand off new connections via a callback. Additionally, any outbound connection attempt is now made persistent by default, with the assumption that (for right now), connections are only to be made to peers we wish to make connections to. Finally, on start-up we now attempt to connection to all/any of our direct channel counter parties in order to promote the availability of our channels to the daemon itself and any RPC users.
This commit is contained in:
parent
89326423dc
commit
bd89a9312d
13
config.go
13
config.go
@ -2,6 +2,7 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"net"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"sort"
|
"sort"
|
||||||
@ -9,6 +10,9 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
flags "github.com/btcsuite/go-flags"
|
flags "github.com/btcsuite/go-flags"
|
||||||
|
"github.com/lightningnetwork/lnd/brontide"
|
||||||
|
"github.com/lightningnetwork/lnd/lnwire"
|
||||||
|
"github.com/roasbeef/btcd/btcec"
|
||||||
"github.com/roasbeef/btcutil"
|
"github.com/roasbeef/btcutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -302,3 +306,12 @@ func supportedSubsystems() []string {
|
|||||||
sort.Strings(subsystems)
|
sort.Strings(subsystems)
|
||||||
return subsystems
|
return subsystems
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// noiseDial is a factory function which creates a connmgr compliant dialing
|
||||||
|
// function by returning a closure which includes the server's identity key.
|
||||||
|
func noiseDial(idPriv *btcec.PrivateKey) func(net.Addr) (net.Conn, error) {
|
||||||
|
return func(a net.Addr) (net.Conn, error) {
|
||||||
|
lnAddr := a.(*lnwire.NetAddress)
|
||||||
|
return brontide.Dial(idPriv, lnAddr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -337,12 +337,15 @@ func (f *fundingManager) handleFundingRequest(fmsg *fundingRequestMsg) {
|
|||||||
|
|
||||||
ourDustLimit := lnwallet.DefaultDustLimit()
|
ourDustLimit := lnwallet.DefaultDustLimit()
|
||||||
theirDustlimit := msg.DustLimit
|
theirDustlimit := msg.DustLimit
|
||||||
|
|
||||||
// Attempt to initialize a reservation within the wallet. If the wallet
|
// Attempt to initialize a reservation within the wallet. If the wallet
|
||||||
// has insufficient resources to create the channel, then the reservation
|
// has insufficient resources to create the channel, then the reservation
|
||||||
// attempt may be rejected. Note that since we're on the responding
|
// attempt may be rejected. Note that since we're on the responding
|
||||||
// side of a single funder workflow, we don't commit any funds to the
|
// side of a single funder workflow, we don't commit any funds to the
|
||||||
// channel ourselves.
|
// channel ourselves.
|
||||||
// TODO(roasbeef): passing num confs 1 is irrelevant here, make signed?
|
// TODO(roasbeef): passing num confs 1 is irrelevant here, make signed?
|
||||||
|
// TODO(roasbeef): assuming this was an inbound connection, replace
|
||||||
|
// port with default advertised port
|
||||||
reservation, err := f.wallet.InitChannelReservation(amt, 0,
|
reservation, err := f.wallet.InitChannelReservation(amt, 0,
|
||||||
fmsg.peer.addr.IdentityKey, fmsg.peer.addr.Address, 1, delay,
|
fmsg.peer.addr.IdentityKey, fmsg.peer.addr.Address, 1, delay,
|
||||||
ourDustLimit)
|
ourDustLimit)
|
||||||
|
7
log.go
7
log.go
@ -4,6 +4,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
|
"github.com/btcsuite/btcd/connmgr"
|
||||||
"github.com/btcsuite/btclog"
|
"github.com/btcsuite/btclog"
|
||||||
"github.com/btcsuite/seelog"
|
"github.com/btcsuite/seelog"
|
||||||
"github.com/lightningnetwork/lnd/chainntnfs"
|
"github.com/lightningnetwork/lnd/chainntnfs"
|
||||||
@ -28,6 +29,7 @@ var (
|
|||||||
hswcLog = btclog.Disabled
|
hswcLog = btclog.Disabled
|
||||||
utxnLog = btclog.Disabled
|
utxnLog = btclog.Disabled
|
||||||
brarLog = btclog.Disabled
|
brarLog = btclog.Disabled
|
||||||
|
cmgrLog = btclog.Disabled
|
||||||
)
|
)
|
||||||
|
|
||||||
// subsystemLoggers maps each subsystem identifier to its associated logger.
|
// subsystemLoggers maps each subsystem identifier to its associated logger.
|
||||||
@ -43,6 +45,7 @@ var subsystemLoggers = map[string]btclog.Logger{
|
|||||||
"HSWC": hswcLog,
|
"HSWC": hswcLog,
|
||||||
"UTXN": utxnLog,
|
"UTXN": utxnLog,
|
||||||
"BRAR": brarLog,
|
"BRAR": brarLog,
|
||||||
|
"CMGR": cmgrLog,
|
||||||
}
|
}
|
||||||
|
|
||||||
// useLogger updates the logger references for subsystemID to logger. Invalid
|
// useLogger updates the logger references for subsystemID to logger. Invalid
|
||||||
@ -89,6 +92,10 @@ func useLogger(subsystemID string, logger btclog.Logger) {
|
|||||||
|
|
||||||
case "BRAR":
|
case "BRAR":
|
||||||
brarLog = logger
|
brarLog = logger
|
||||||
|
|
||||||
|
case "CMGR":
|
||||||
|
cmgrLog = logger
|
||||||
|
connmgr.UseLogger(logger)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
10
peer.go
10
peer.go
@ -11,6 +11,7 @@ import (
|
|||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/btcsuite/btcd/connmgr"
|
||||||
"github.com/btcsuite/fastsha256"
|
"github.com/btcsuite/fastsha256"
|
||||||
"github.com/davecgh/go-spew/spew"
|
"github.com/davecgh/go-spew/spew"
|
||||||
"github.com/lightningnetwork/lightning-onion"
|
"github.com/lightningnetwork/lightning-onion"
|
||||||
@ -65,7 +66,8 @@ type peer struct {
|
|||||||
connected int32
|
connected int32
|
||||||
disconnect int32
|
disconnect int32
|
||||||
|
|
||||||
conn net.Conn
|
connReq *connmgr.ConnReq
|
||||||
|
conn net.Conn
|
||||||
|
|
||||||
addr *lnwire.NetAddress
|
addr *lnwire.NetAddress
|
||||||
lightningID wire.ShaHash
|
lightningID wire.ShaHash
|
||||||
@ -318,6 +320,10 @@ func (p *peer) Disconnect() {
|
|||||||
|
|
||||||
close(p.quit)
|
close(p.quit)
|
||||||
|
|
||||||
|
if p.connReq != nil {
|
||||||
|
p.server.connMgr.Disconnect(p.connReq.ID())
|
||||||
|
}
|
||||||
|
|
||||||
// Launch a goroutine to clean up the remaining resources.
|
// Launch a goroutine to clean up the remaining resources.
|
||||||
go func() {
|
go func() {
|
||||||
// Tell the switch to unregister all links associated with this
|
// Tell the switch to unregister all links associated with this
|
||||||
@ -762,6 +768,8 @@ func (p *peer) executeCooperativeClose(channel *lnwallet.LightningChannel) (*wir
|
|||||||
|
|
||||||
// handleLocalClose kicks-off the workflow to execute a cooperative or forced
|
// handleLocalClose kicks-off the workflow to execute a cooperative or forced
|
||||||
// unilateral closure of the channel initiated by a local sub-system.
|
// unilateral closure of the channel initiated by a local sub-system.
|
||||||
|
// TODO(roasbeef): if no more active channels with peer call Remove on connMgr
|
||||||
|
// with peerID
|
||||||
func (p *peer) handleLocalClose(req *closeLinkReq) {
|
func (p *peer) handleLocalClose(req *closeLinkReq) {
|
||||||
var (
|
var (
|
||||||
err error
|
err error
|
||||||
|
371
server.go
371
server.go
@ -5,7 +5,9 @@ import (
|
|||||||
"net"
|
"net"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/btcsuite/btcd/connmgr"
|
||||||
"github.com/btcsuite/fastsha256"
|
"github.com/btcsuite/fastsha256"
|
||||||
"github.com/lightningnetwork/lightning-onion"
|
"github.com/lightningnetwork/lightning-onion"
|
||||||
"github.com/lightningnetwork/lnd/brontide"
|
"github.com/lightningnetwork/lnd/brontide"
|
||||||
@ -37,8 +39,9 @@ type server struct {
|
|||||||
// long-term identity private key.
|
// long-term identity private key.
|
||||||
lightningID [32]byte
|
lightningID [32]byte
|
||||||
|
|
||||||
listeners []net.Listener
|
peersMtx sync.RWMutex
|
||||||
peers map[int32]*peer
|
peersByID map[int32]*peer
|
||||||
|
peersByPub map[string]*peer
|
||||||
|
|
||||||
rpcServer *rpcServer
|
rpcServer *rpcServer
|
||||||
|
|
||||||
@ -60,6 +63,12 @@ type server struct {
|
|||||||
|
|
||||||
sphinx *sphinx.Router
|
sphinx *sphinx.Router
|
||||||
|
|
||||||
|
connMgr *connmgr.ConnManager
|
||||||
|
|
||||||
|
pendingConnMtx sync.RWMutex
|
||||||
|
persistentConnReqs map[string]*connmgr.ConnReq
|
||||||
|
pendingConnRequests map[string]*connectPeerMsg
|
||||||
|
|
||||||
newPeers chan *peer
|
newPeers chan *peer
|
||||||
donePeers chan *peer
|
donePeers chan *peer
|
||||||
queries chan interface{}
|
queries chan interface{}
|
||||||
@ -104,13 +113,17 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier,
|
|||||||
sphinx: sphinx.NewRouter(privKey, activeNetParams.Params),
|
sphinx: sphinx.NewRouter(privKey, activeNetParams.Params),
|
||||||
lightningID: fastsha256.Sum256(serializedPubKey),
|
lightningID: fastsha256.Sum256(serializedPubKey),
|
||||||
|
|
||||||
listeners: listeners,
|
pendingConnRequests: make(map[string]*connectPeerMsg),
|
||||||
|
persistentConnReqs: make(map[string]*connmgr.ConnReq),
|
||||||
|
|
||||||
peers: make(map[int32]*peer),
|
peersByID: make(map[int32]*peer),
|
||||||
newPeers: make(chan *peer, 100),
|
peersByPub: make(map[string]*peer),
|
||||||
donePeers: make(chan *peer, 100),
|
|
||||||
queries: make(chan interface{}),
|
newPeers: make(chan *peer, 10),
|
||||||
quit: make(chan struct{}),
|
donePeers: make(chan *peer, 10),
|
||||||
|
|
||||||
|
queries: make(chan interface{}),
|
||||||
|
quit: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
|
||||||
// If the debug HTLC flag is on, then we invoice a "master debug"
|
// If the debug HTLC flag is on, then we invoice a "master debug"
|
||||||
@ -137,7 +150,7 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier,
|
|||||||
}
|
}
|
||||||
|
|
||||||
var targetPeer *peer
|
var targetPeer *peer
|
||||||
for _, peer := range s.peers { // TODO: threadsafe api
|
for _, peer := range s.peersByID { // TODO: threadsafe API
|
||||||
nodePub := peer.addr.IdentityKey.SerializeCompressed()
|
nodePub := peer.addr.IdentityKey.SerializeCompressed()
|
||||||
nodeVertex := graph.NewVertex(nodePub[:])
|
nodeVertex := graph.NewVertex(nodePub[:])
|
||||||
|
|
||||||
@ -157,6 +170,7 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier,
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
s.routingMgr = routing.NewRoutingManager(graph.NewVertex(selfVertex), routingMgrConfig)
|
s.routingMgr = routing.NewRoutingManager(graph.NewVertex(selfVertex), routingMgrConfig)
|
||||||
|
|
||||||
s.htlcSwitch = newHtlcSwitch(serializedPubKey, s.routingMgr)
|
s.htlcSwitch = newHtlcSwitch(serializedPubKey, s.routingMgr)
|
||||||
|
|
||||||
s.rpcServer = newRpcServer(s)
|
s.rpcServer = newRpcServer(s)
|
||||||
@ -168,6 +182,57 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier,
|
|||||||
// TODO(roasbeef): introduce closure and config system to decouple the
|
// TODO(roasbeef): introduce closure and config system to decouple the
|
||||||
// initialization above ^
|
// initialization above ^
|
||||||
|
|
||||||
|
// Create the connection manager which will be responsible for
|
||||||
|
// maintaining persistent outbound connections and also accepting new
|
||||||
|
// incoming connections
|
||||||
|
cmgr, err := connmgr.New(&connmgr.Config{
|
||||||
|
Listeners: listeners,
|
||||||
|
OnAccept: s.inboundPeerConnected,
|
||||||
|
RetryDuration: time.Second * 5,
|
||||||
|
TargetOutbound: 100,
|
||||||
|
GetNewAddress: nil,
|
||||||
|
Dial: noiseDial(s.identityPriv),
|
||||||
|
OnConnection: s.outboundPeerConnected,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
s.connMgr = cmgr
|
||||||
|
|
||||||
|
// In order to promote liveness of our active channels, instruct the
|
||||||
|
// connection manager to attempt to establish and maintain persistent
|
||||||
|
// connections to all our direct channel counter parties.
|
||||||
|
linkNodes, err := s.chanDB.FetchAllLinkNodes()
|
||||||
|
if err != nil && err != channeldb.ErrLinkNodesNotFound {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
for _, node := range linkNodes {
|
||||||
|
// Create a wrapper address which couples the IP and the pubkey
|
||||||
|
// so the brontide authenticated connection can be established.
|
||||||
|
lnAddr := &lnwire.NetAddress{
|
||||||
|
IdentityKey: node.IdentityPub,
|
||||||
|
Address: node.Addresses[0],
|
||||||
|
}
|
||||||
|
pubStr := string(node.IdentityPub.SerializeCompressed())
|
||||||
|
srvrLog.Debugf("Attempting persistent connection to channel "+
|
||||||
|
"peer %v", lnAddr)
|
||||||
|
|
||||||
|
// Send the persistent connection request to the connection
|
||||||
|
// manager, saving the request itself so we can
|
||||||
|
// cancel/restart the process as needed.
|
||||||
|
connReq := &connmgr.ConnReq{
|
||||||
|
Addr: lnAddr,
|
||||||
|
Permanent: true,
|
||||||
|
}
|
||||||
|
s.persistentConnReqs[pubStr] = connReq
|
||||||
|
go s.connMgr.Connect(connReq)
|
||||||
|
|
||||||
|
s.pendingConnRequests[pubStr] = &connectPeerMsg{
|
||||||
|
resp: make(chan int32, 1),
|
||||||
|
err: make(chan error, 1),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -179,12 +244,6 @@ func (s *server) Start() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start all the listeners.
|
|
||||||
for _, l := range s.listeners {
|
|
||||||
s.wg.Add(1)
|
|
||||||
go s.listener(l)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start the notification server. This is used so channel management
|
// Start the notification server. This is used so channel management
|
||||||
// goroutines can be notified when a funding transaction reaches a
|
// goroutines can be notified when a funding transaction reaches a
|
||||||
// sufficient number of confirmations, or when the input for the
|
// sufficient number of confirmations, or when the input for the
|
||||||
@ -226,13 +285,6 @@ func (s *server) Stop() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop all the listeners.
|
|
||||||
for _, listener := range s.listeners {
|
|
||||||
if err := listener.Close(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Shutdown the wallet, funding manager, and the rpc server.
|
// Shutdown the wallet, funding manager, and the rpc server.
|
||||||
s.chainNotifier.Stop()
|
s.chainNotifier.Stop()
|
||||||
s.rpcServer.Stop()
|
s.rpcServer.Stop()
|
||||||
@ -256,9 +308,116 @@ func (s *server) WaitForShutdown() {
|
|||||||
s.wg.Wait()
|
s.wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// peerConnected is a function that handles initialization a newly connected
|
||||||
|
// peer by adding it to the server's global list of all active peers, and
|
||||||
|
// starting all the goroutines the peer needs to function properly.
|
||||||
|
func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq, inbound bool) {
|
||||||
|
brontideConn := conn.(*brontide.Conn)
|
||||||
|
peerAddr := &lnwire.NetAddress{
|
||||||
|
IdentityKey: brontideConn.RemotePub(),
|
||||||
|
Address: conn.RemoteAddr().(*net.TCPAddr),
|
||||||
|
ChainNet: activeNetParams.Net,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now that we've established a connection, create a peer, and
|
||||||
|
// it to the set of currently active peers.
|
||||||
|
peer, err := newPeer(conn, s, peerAddr, false)
|
||||||
|
if err != nil {
|
||||||
|
srvrLog.Errorf("unable to create peer %v", err)
|
||||||
|
conn.Close()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if connReq != nil {
|
||||||
|
peer.connReq = connReq
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO(roasbeef): update IP address for link-node
|
||||||
|
// * also mark last-seen, do it one single transaction?
|
||||||
|
|
||||||
|
peer.Start()
|
||||||
|
s.newPeers <- peer
|
||||||
|
|
||||||
|
// If this was an RPC initiated outbound connection that was
|
||||||
|
// successfully established, then send a response back to the client so
|
||||||
|
pubStr := string(peerAddr.IdentityKey.SerializeCompressed())
|
||||||
|
s.pendingConnMtx.RLock()
|
||||||
|
msg, ok := s.pendingConnRequests[pubStr]
|
||||||
|
s.pendingConnMtx.RUnlock()
|
||||||
|
if ok {
|
||||||
|
msg.resp <- peer.id
|
||||||
|
msg.err <- nil
|
||||||
|
|
||||||
|
s.pendingConnMtx.Lock()
|
||||||
|
delete(s.pendingConnRequests, pubStr)
|
||||||
|
s.pendingConnMtx.Unlock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// inboundPeerConnected initializes a new peer in response to a new inbound
|
||||||
|
// connection.
|
||||||
|
func (s *server) inboundPeerConnected(conn net.Conn) {
|
||||||
|
s.peersMtx.Lock()
|
||||||
|
defer s.peersMtx.Unlock()
|
||||||
|
|
||||||
|
srvrLog.Tracef("New inbound connection from %v", conn.RemoteAddr())
|
||||||
|
|
||||||
|
nodePub := conn.(*brontide.Conn).RemotePub()
|
||||||
|
|
||||||
|
// If we already have an outbound connection to this peer, simply drop
|
||||||
|
// the connection.
|
||||||
|
pubStr := string(nodePub.SerializeCompressed())
|
||||||
|
if _, ok := s.peersByPub[pubStr]; ok {
|
||||||
|
srvrLog.Errorf("Received inbound connection from peer %x, but "+
|
||||||
|
"already connected, dropping conn",
|
||||||
|
nodePub.SerializeCompressed())
|
||||||
|
conn.Close()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// However, if we receive an incoming connection from a peer we're
|
||||||
|
// attempting to maintain a persistent connection with then we need to
|
||||||
|
// cancel the ongoing connection attempts to ensure that we don't end
|
||||||
|
// up with a duplicate connecting to the same peer.
|
||||||
|
s.pendingConnMtx.RLock()
|
||||||
|
if connReq, ok := s.persistentConnReqs[pubStr]; ok {
|
||||||
|
fmt.Println("trying to cancel out attempt")
|
||||||
|
s.connMgr.Remove(connReq.ID())
|
||||||
|
}
|
||||||
|
s.pendingConnMtx.RUnlock()
|
||||||
|
|
||||||
|
s.peerConnected(conn, nil, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
// outboundPeerConnected initializes a new peer in response to a new outbound
|
||||||
|
// connection.
|
||||||
|
func (s *server) outboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn) {
|
||||||
|
s.peersMtx.Lock()
|
||||||
|
defer s.peersMtx.Unlock()
|
||||||
|
|
||||||
|
srvrLog.Tracef("Established connection to: %v", conn.RemoteAddr())
|
||||||
|
|
||||||
|
nodePub := conn.(*brontide.Conn).RemotePub()
|
||||||
|
|
||||||
|
// If we already have an inbound connection from this peer, simply drop
|
||||||
|
// the connection.
|
||||||
|
pubStr := string(nodePub.SerializeCompressed())
|
||||||
|
if _, ok := s.peersByPub[pubStr]; ok {
|
||||||
|
srvrLog.Errorf("Established outbound connection to peer %x, but "+
|
||||||
|
"already connected, dropping conn",
|
||||||
|
nodePub.SerializeCompressed())
|
||||||
|
s.connMgr.Remove(connReq.ID())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
s.peerConnected(conn, connReq, true)
|
||||||
|
}
|
||||||
|
|
||||||
// addPeer adds the passed peer to the server's global state of all active
|
// addPeer adds the passed peer to the server's global state of all active
|
||||||
// peers.
|
// peers.
|
||||||
func (s *server) addPeer(p *peer) {
|
func (s *server) addPeer(p *peer) {
|
||||||
|
s.peersMtx.Lock()
|
||||||
|
defer s.peersMtx.Unlock()
|
||||||
|
|
||||||
if p == nil {
|
if p == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -269,12 +428,16 @@ func (s *server) addPeer(p *peer) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
s.peers[p.id] = p
|
s.peersByID[p.id] = p
|
||||||
|
s.peersByPub[string(p.addr.IdentityKey.SerializeCompressed())] = p
|
||||||
}
|
}
|
||||||
|
|
||||||
// removePeer removes the passed peer from the server's state of all active
|
// removePeer removes the passed peer from the server's state of all active
|
||||||
// peers.
|
// peers.
|
||||||
func (s *server) removePeer(p *peer) {
|
func (s *server) removePeer(p *peer) {
|
||||||
|
s.peersMtx.Lock()
|
||||||
|
defer s.peersMtx.Unlock()
|
||||||
|
|
||||||
srvrLog.Debugf("removing peer %v", p)
|
srvrLog.Debugf("removing peer %v", p)
|
||||||
|
|
||||||
if p == nil {
|
if p == nil {
|
||||||
@ -287,7 +450,8 @@ func (s *server) removePeer(p *peer) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
delete(s.peers, p.id)
|
delete(s.peersByID, p.id)
|
||||||
|
delete(s.peersByPub, string(p.addr.IdentityKey.SerializeCompressed()))
|
||||||
}
|
}
|
||||||
|
|
||||||
// connectPeerMsg is a message requesting the server to open a connection to a
|
// connectPeerMsg is a message requesting the server to open a connection to a
|
||||||
@ -331,6 +495,8 @@ type openChanReq struct {
|
|||||||
//
|
//
|
||||||
// NOTE: This MUST be run as a goroutine.
|
// NOTE: This MUST be run as a goroutine.
|
||||||
func (s *server) queryHandler() {
|
func (s *server) queryHandler() {
|
||||||
|
go s.connMgr.Start()
|
||||||
|
|
||||||
out:
|
out:
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
@ -343,7 +509,6 @@ out:
|
|||||||
s.removePeer(p)
|
s.removePeer(p)
|
||||||
|
|
||||||
case query := <-s.queries:
|
case query := <-s.queries:
|
||||||
// TODO(roasbeef): make all goroutines?
|
|
||||||
switch msg := query.(type) {
|
switch msg := query.(type) {
|
||||||
case *connectPeerMsg:
|
case *connectPeerMsg:
|
||||||
s.handleConnectPeer(msg)
|
s.handleConnectPeer(msg)
|
||||||
@ -357,14 +522,16 @@ out:
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s.connMgr.Stop()
|
||||||
|
|
||||||
s.wg.Done()
|
s.wg.Done()
|
||||||
}
|
}
|
||||||
|
|
||||||
// handleListPeers sends a lice of all currently active peers to the original
|
// handleListPeers sends a lice of all currently active peers to the original
|
||||||
// caller.
|
// caller.
|
||||||
func (s *server) handleListPeers(msg *listPeersMsg) {
|
func (s *server) handleListPeers(msg *listPeersMsg) {
|
||||||
peers := make([]*peer, 0, len(s.peers))
|
peers := make([]*peer, 0, len(s.peersByID))
|
||||||
for _, peer := range s.peers {
|
for _, peer := range s.peersByID {
|
||||||
peers = append(peers, peer)
|
peers = append(peers, peer)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -377,75 +544,74 @@ func (s *server) handleListPeers(msg *listPeersMsg) {
|
|||||||
func (s *server) handleConnectPeer(msg *connectPeerMsg) {
|
func (s *server) handleConnectPeer(msg *connectPeerMsg) {
|
||||||
addr := msg.addr
|
addr := msg.addr
|
||||||
|
|
||||||
|
targetPub := string(msg.addr.IdentityKey.SerializeCompressed())
|
||||||
|
|
||||||
// Ensure we're not already connected to this
|
// Ensure we're not already connected to this
|
||||||
// peer.
|
// peer.
|
||||||
targetPub := msg.addr.IdentityKey
|
s.peersMtx.RLock()
|
||||||
for _, peer := range s.peers {
|
peer, ok := s.peersByPub[targetPub]
|
||||||
if peer.addr.IdentityKey.IsEqual(targetPub) {
|
if ok {
|
||||||
msg.err <- fmt.Errorf(
|
s.peersMtx.RUnlock()
|
||||||
"already connected to peer: %v",
|
msg.err <- fmt.Errorf("already connected to peer: %v", peer)
|
||||||
peer.addr,
|
msg.resp <- -1
|
||||||
)
|
return
|
||||||
msg.resp <- -1
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
s.peersMtx.RUnlock()
|
||||||
|
|
||||||
// Launch a goroutine to connect to the requested peer so we can
|
// If there's already a pending connection request for this pubkey,
|
||||||
// continue to handle queries.
|
// then we ignore this request to ensure we don't create a redundant
|
||||||
//
|
// connection.
|
||||||
// TODO(roasbeef): semaphore to limit the number of goroutines for
|
s.pendingConnMtx.RLock()
|
||||||
// async requests.
|
if _, ok := s.pendingConnRequests[targetPub]; ok {
|
||||||
go func() {
|
s.pendingConnMtx.RUnlock()
|
||||||
srvrLog.Debugf("connecting to %v", addr)
|
msg.err <- fmt.Errorf("connection attempt to %v is pending",
|
||||||
|
addr)
|
||||||
|
msg.resp <- -1
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if _, ok := s.persistentConnReqs[targetPub]; ok {
|
||||||
|
s.pendingConnMtx.RUnlock()
|
||||||
|
msg.err <- fmt.Errorf("connection attempt to %v is pending",
|
||||||
|
addr)
|
||||||
|
msg.resp <- -1
|
||||||
|
return
|
||||||
|
}
|
||||||
|
s.pendingConnMtx.RUnlock()
|
||||||
|
|
||||||
// Attempt to connect to the remote node. If the we can't make
|
// If there's not already a pending or active connection to this node,
|
||||||
// the connection, or the crypto negotation breaks down, then
|
// then instruct the connection manager to attempt to establish a
|
||||||
// return an error to the caller.
|
// persistent connection to the peer.
|
||||||
conn, err := brontide.Dial(s.identityPriv, addr)
|
srvrLog.Debugf("connecting to %v", addr)
|
||||||
if err != nil {
|
go s.connMgr.Connect(&connmgr.ConnReq{
|
||||||
msg.err <- err
|
Addr: addr,
|
||||||
msg.resp <- -1
|
Permanent: true,
|
||||||
return
|
})
|
||||||
}
|
|
||||||
|
|
||||||
// Now that we've established a connection, create a peer, and
|
// Finally, we store the original request keyed by the public key so we
|
||||||
// it to the set of currently active peers.
|
// can dispatch the response to the RPC client once a connection has
|
||||||
peer, err := newPeer(conn, s, msg.addr, false)
|
// been initiated.
|
||||||
if err != nil {
|
s.pendingConnMtx.Lock()
|
||||||
srvrLog.Errorf("unable to create peer %v", err)
|
s.pendingConnRequests[targetPub] = msg
|
||||||
conn.Close()
|
s.pendingConnMtx.Unlock()
|
||||||
msg.resp <- -1
|
|
||||||
msg.err <- err
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO(roasbeef): update IP address for link-node
|
|
||||||
// * also mark last-seen, do it one single transaction?
|
|
||||||
|
|
||||||
peer.Start()
|
|
||||||
s.newPeers <- peer
|
|
||||||
|
|
||||||
msg.resp <- peer.id
|
|
||||||
msg.err <- nil
|
|
||||||
}()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// handleOpenChanReq first locates the target peer, and if found hands off the
|
// handleOpenChanReq first locates the target peer, and if found hands off the
|
||||||
// request to the funding manager allowing it to initiate the channel funding
|
// request to the funding manager allowing it to initiate the channel funding
|
||||||
// workflow.
|
// workflow.
|
||||||
func (s *server) handleOpenChanReq(req *openChanReq) {
|
func (s *server) handleOpenChanReq(req *openChanReq) {
|
||||||
|
var targetPeer *peer
|
||||||
|
|
||||||
|
pubStr := string(req.targetPubkey.SerializeCompressed())
|
||||||
|
|
||||||
// First attempt to locate the target peer to open a channel with, if
|
// First attempt to locate the target peer to open a channel with, if
|
||||||
// we're unable to locate the peer then this request will fail.
|
// we're unable to locate the peer then this request will fail.
|
||||||
var targetPeer *peer
|
s.peersMtx.RLock()
|
||||||
for _, peer := range s.peers { // TODO(roasbeef): threadsafe api
|
if peer, ok := s.peersByID[req.targetPeerID]; ok {
|
||||||
// We found the the target
|
targetPeer = peer
|
||||||
if peer.addr.IdentityKey.IsEqual(req.targetPubkey) ||
|
} else if peer, ok := s.peersByPub[pubStr]; ok {
|
||||||
req.targetPeerID == peer.id {
|
targetPeer = peer
|
||||||
targetPeer = peer
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
s.peersMtx.RUnlock()
|
||||||
|
|
||||||
if targetPeer == nil {
|
if targetPeer == nil {
|
||||||
req.err <- fmt.Errorf("unable to find peer nodeID(%x), "+
|
req.err <- fmt.Errorf("unable to find peer nodeID(%x), "+
|
||||||
@ -506,46 +672,3 @@ func (s *server) Peers() []*peer {
|
|||||||
|
|
||||||
return <-resp
|
return <-resp
|
||||||
}
|
}
|
||||||
|
|
||||||
// listener is a goroutine dedicated to accepting in coming peer connections
|
|
||||||
// from the passed listener.
|
|
||||||
//
|
|
||||||
// NOTE: This MUST be run as a goroutine.
|
|
||||||
func (s *server) listener(l net.Listener) {
|
|
||||||
srvrLog.Infof("Server listening on %s", l.Addr())
|
|
||||||
for atomic.LoadInt32(&s.shutdown) == 0 {
|
|
||||||
conn, err := l.Accept()
|
|
||||||
if err != nil {
|
|
||||||
// Only log the error message if we aren't currently
|
|
||||||
// shutting down.
|
|
||||||
if atomic.LoadInt32(&s.shutdown) == 0 {
|
|
||||||
srvrLog.Errorf("Can't accept connection: %v", err)
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
srvrLog.Tracef("New inbound connection from %v", conn.RemoteAddr())
|
|
||||||
|
|
||||||
brontideConn := conn.(*brontide.Conn)
|
|
||||||
peerAddr := &lnwire.NetAddress{
|
|
||||||
IdentityKey: brontideConn.RemotePub(),
|
|
||||||
Address: conn.RemoteAddr().(*net.TCPAddr),
|
|
||||||
ChainNet: activeNetParams.Net,
|
|
||||||
}
|
|
||||||
|
|
||||||
peer, err := newPeer(conn, s, peerAddr, true)
|
|
||||||
if err != nil {
|
|
||||||
srvrLog.Errorf("unable to create peer: %v", err)
|
|
||||||
conn.Close()
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO(roasbeef): update IP address for link-node
|
|
||||||
// * also mark last-seen, do it one single transaction?
|
|
||||||
|
|
||||||
peer.Start()
|
|
||||||
s.newPeers <- peer
|
|
||||||
}
|
|
||||||
|
|
||||||
s.wg.Done()
|
|
||||||
}
|
|
||||||
|
Loading…
Reference in New Issue
Block a user