lnd: add new "perm" bit to the ConnectPeer RPC call

This commit modifies the ConnectPeer RPC call and partitions the
behavior of the call into two scenarios: the connection should be
persistent which causes the call to be non-blocking, and the connection
should only attempt to connect once — which causes the call to be
blocking and report any error back to the caller.

As a result, the pendingConnRequest map and the logic around it is no
longer needed.
This commit is contained in:
Olaoluwa Osuntokun 2017-01-09 19:08:52 -08:00
parent b01e7efcef
commit 8cdb84c619
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2
3 changed files with 42 additions and 59 deletions

View File

@ -263,6 +263,8 @@ func (p *peer) Start() error {
return nil return nil
} }
atomic.AddInt32(&p.connected, 1)
peerLog.Tracef("peer %v starting", p) peerLog.Tracef("peer %v starting", p)
p.wg.Add(5) p.wg.Add(5)

View File

@ -215,15 +215,13 @@ func (r *rpcServer) ConnectPeer(ctx context.Context,
ChainNet: activeNetParams.Net, ChainNet: activeNetParams.Net,
} }
peerID, err := r.server.ConnectToPeer(peerAddr) if err := r.server.ConnectToPeer(peerAddr, in.Perm); err != nil {
if err != nil {
rpcsLog.Errorf("(connectpeer): error connecting to peer: %v", err) rpcsLog.Errorf("(connectpeer): error connecting to peer: %v", err)
return nil, err return nil, err
} }
// TODO(roasbeef): add pubkey return
rpcsLog.Debugf("Connected to peer: %v", peerAddr.String()) rpcsLog.Debugf("Connected to peer: %v", peerAddr.String())
return &lnrpc.ConnectPeerResponse{peerID}, nil return &lnrpc.ConnectPeerResponse{}, nil
} }
// OpenChannel attempts to open a singly funded channel specified in the // OpenChannel attempts to open a singly funded channel specified in the

View File

@ -66,9 +66,8 @@ type server struct {
connMgr *connmgr.ConnManager connMgr *connmgr.ConnManager
pendingConnMtx sync.RWMutex pendingConnMtx sync.RWMutex
persistentConnReqs map[string]*connmgr.ConnReq persistentConnReqs map[string]*connmgr.ConnReq
pendingConnRequests map[string]*connectPeerMsg
broadcastRequests chan *broadcastReq broadcastRequests chan *broadcastReq
sendRequests chan *sendReq sendRequests chan *sendReq
@ -118,8 +117,7 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier,
sphinx: sphinx.NewRouter(privKey, activeNetParams.Params), sphinx: sphinx.NewRouter(privKey, activeNetParams.Params),
lightningID: fastsha256.Sum256(serializedPubKey), lightningID: fastsha256.Sum256(serializedPubKey),
pendingConnRequests: make(map[string]*connectPeerMsg), persistentConnReqs: make(map[string]*connmgr.ConnReq),
persistentConnReqs: make(map[string]*connmgr.ConnReq),
peersByID: make(map[int32]*peer), peersByID: make(map[int32]*peer),
peersByPub: make(map[string]*peer), peersByPub: make(map[string]*peer),
@ -224,11 +222,6 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier,
} }
s.persistentConnReqs[pubStr] = connReq s.persistentConnReqs[pubStr] = connReq
go s.connMgr.Connect(connReq) go s.connMgr.Connect(connReq)
s.pendingConnRequests[pubStr] = &connectPeerMsg{
resp: make(chan int32, 1),
err: make(chan error, 1),
}
} }
return s, nil return s, nil
@ -411,22 +404,6 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq, inbound
peer.Start() peer.Start()
s.newPeers <- peer s.newPeers <- peer
// If this was an RPC initiated outbound connection that was
// successfully established, then send a response back to the client so
// they won't be blocked indefinitely.
pubStr := string(peerAddr.IdentityKey.SerializeCompressed())
s.pendingConnMtx.RLock()
msg, ok := s.pendingConnRequests[pubStr]
s.pendingConnMtx.RUnlock()
if ok {
msg.resp <- peer.id
msg.err <- nil
s.pendingConnMtx.Lock()
delete(s.pendingConnRequests, pubStr)
s.pendingConnMtx.Unlock()
}
} }
// inboundPeerConnected initializes a new peer in response to a new inbound // inboundPeerConnected initializes a new peer in response to a new inbound
@ -541,9 +518,10 @@ func (s *server) removePeer(p *peer) {
// particular peer. This message also houses an error channel which will be // particular peer. This message also houses an error channel which will be
// used to report success/failure. // used to report success/failure.
type connectPeerMsg struct { type connectPeerMsg struct {
addr *lnwire.NetAddress addr *lnwire.NetAddress
resp chan int32 persistent bool
err chan error
err chan error
} }
// listPeersMsg is a message sent to the server in order to obtain a listing // listPeersMsg is a message sent to the server in order to obtain a listing
@ -686,7 +664,6 @@ func (s *server) handleConnectPeer(msg *connectPeerMsg) {
if ok { if ok {
s.peersMtx.RUnlock() s.peersMtx.RUnlock()
msg.err <- fmt.Errorf("already connected to peer: %v", peer) msg.err <- fmt.Errorf("already connected to peer: %v", peer)
msg.resp <- -1
return return
} }
s.peersMtx.RUnlock() s.peersMtx.RUnlock()
@ -695,18 +672,10 @@ func (s *server) handleConnectPeer(msg *connectPeerMsg) {
// then we ignore this request to ensure we don't create a redundant // then we ignore this request to ensure we don't create a redundant
// connection. // connection.
s.pendingConnMtx.RLock() s.pendingConnMtx.RLock()
if _, ok := s.pendingConnRequests[targetPub]; ok {
s.pendingConnMtx.RUnlock()
msg.err <- fmt.Errorf("connection attempt to %v is pending",
addr)
msg.resp <- -1
return
}
if _, ok := s.persistentConnReqs[targetPub]; ok { if _, ok := s.persistentConnReqs[targetPub]; ok {
s.pendingConnMtx.RUnlock() s.pendingConnMtx.RUnlock()
msg.err <- fmt.Errorf("connection attempt to %v is pending", msg.err <- fmt.Errorf("connection attempt to %v is pending",
addr) addr)
msg.resp <- -1
return return
} }
s.pendingConnMtx.RUnlock() s.pendingConnMtx.RUnlock()
@ -715,20 +684,29 @@ func (s *server) handleConnectPeer(msg *connectPeerMsg) {
// then instruct the connection manager to attempt to establish a // then instruct the connection manager to attempt to establish a
// persistent connection to the peer. // persistent connection to the peer.
srvrLog.Debugf("Connecting to %v", addr) srvrLog.Debugf("Connecting to %v", addr)
go s.connMgr.Connect(&connmgr.ConnReq{ if msg.persistent {
Addr: addr, go s.connMgr.Connect(&connmgr.ConnReq{
Permanent: true, Addr: addr,
}) Permanent: true,
})
} else {
// If we're not making a persistent connection, then we'll
// attempt to connect o the target peer, returning an error
// which indicates success of failure.
go func() {
// Attempt to connect to the remote node. If the we
// can't make the connection, or the crypto negotiation
// breaks down, then return an error to the caller.
conn, err := brontide.Dial(s.identityPriv, addr)
if err != nil {
msg.err <- err
return
}
// TODO(roasbeef): create goroutine to poll state so can report s.outboundPeerConnected(nil, conn)
// connection fails msg.err <- nil
}()
// Finally, we store the original request keyed by the public key so we }
// can dispatch the response to the RPC client once a connection has
// been initiated.
s.pendingConnMtx.Lock()
s.pendingConnRequests[targetPub] = msg
s.pendingConnMtx.Unlock()
} }
// handleOpenChanReq first locates the target peer, and if found hands off the // handleOpenChanReq first locates the target peer, and if found hands off the
@ -767,13 +745,18 @@ func (s *server) handleOpenChanReq(req *openChanReq) {
// ConnectToPeer requests that the server connect to a Lightning Network peer // ConnectToPeer requests that the server connect to a Lightning Network peer
// at the specified address. This function will *block* until either a // at the specified address. This function will *block* until either a
// connection is established, or the initial handshake process fails. // connection is established, or the initial handshake process fails.
func (s *server) ConnectToPeer(addr *lnwire.NetAddress) (int32, error) { func (s *server) ConnectToPeer(addr *lnwire.NetAddress,
reply := make(chan int32, 1) perm bool) error {
errChan := make(chan error, 1) errChan := make(chan error, 1)
s.queries <- &connectPeerMsg{addr, reply, errChan} s.queries <- &connectPeerMsg{
addr: addr,
persistent: perm,
err: errChan,
}
return <-reply, <-errChan return <-errChan
} }
// OpenChannel sends a request to the server to open a channel to the specified // OpenChannel sends a request to the server to open a channel to the specified