From 8cdb84c6194642f089ea0a22fa3be48bdc0e6a7c Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Mon, 9 Jan 2017 19:08:52 -0800 Subject: [PATCH] lnd: add new "perm" bit to the ConnectPeer RPC call MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit modifies the ConnectPeer RPC call and partitions the behavior of the call into two scenarios: the connection should be persistent which causes the call to be non-blocking, and the connection should only attempt to connect once — which causes the call to be blocking and report any error back to the caller. As a result, the pendingConnRequest map and the logic around it is no longer needed. --- peer.go | 2 ++ rpcserver.go | 6 ++-- server.go | 93 +++++++++++++++++++++------------------------------- 3 files changed, 42 insertions(+), 59 deletions(-) diff --git a/peer.go b/peer.go index a5394426..fcee224a 100644 --- a/peer.go +++ b/peer.go @@ -263,6 +263,8 @@ func (p *peer) Start() error { return nil } + atomic.AddInt32(&p.connected, 1) + peerLog.Tracef("peer %v starting", p) p.wg.Add(5) diff --git a/rpcserver.go b/rpcserver.go index 86c09300..3a755698 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -215,15 +215,13 @@ func (r *rpcServer) ConnectPeer(ctx context.Context, ChainNet: activeNetParams.Net, } - peerID, err := r.server.ConnectToPeer(peerAddr) - if err != nil { + if err := r.server.ConnectToPeer(peerAddr, in.Perm); err != nil { rpcsLog.Errorf("(connectpeer): error connecting to peer: %v", err) return nil, err } - // TODO(roasbeef): add pubkey return rpcsLog.Debugf("Connected to peer: %v", peerAddr.String()) - return &lnrpc.ConnectPeerResponse{peerID}, nil + return &lnrpc.ConnectPeerResponse{}, nil } // OpenChannel attempts to open a singly funded channel specified in the diff --git a/server.go b/server.go index 6758d472..78b7512b 100644 --- a/server.go +++ b/server.go @@ -66,9 +66,8 @@ type server struct { connMgr *connmgr.ConnManager - pendingConnMtx sync.RWMutex - persistentConnReqs map[string]*connmgr.ConnReq - pendingConnRequests map[string]*connectPeerMsg + pendingConnMtx sync.RWMutex + persistentConnReqs map[string]*connmgr.ConnReq broadcastRequests chan *broadcastReq sendRequests chan *sendReq @@ -118,8 +117,7 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier, sphinx: sphinx.NewRouter(privKey, activeNetParams.Params), lightningID: fastsha256.Sum256(serializedPubKey), - pendingConnRequests: make(map[string]*connectPeerMsg), - persistentConnReqs: make(map[string]*connmgr.ConnReq), + persistentConnReqs: make(map[string]*connmgr.ConnReq), peersByID: make(map[int32]*peer), peersByPub: make(map[string]*peer), @@ -224,11 +222,6 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier, } s.persistentConnReqs[pubStr] = connReq go s.connMgr.Connect(connReq) - - s.pendingConnRequests[pubStr] = &connectPeerMsg{ - resp: make(chan int32, 1), - err: make(chan error, 1), - } } return s, nil @@ -411,22 +404,6 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq, inbound peer.Start() s.newPeers <- peer - - // If this was an RPC initiated outbound connection that was - // successfully established, then send a response back to the client so - // they won't be blocked indefinitely. - pubStr := string(peerAddr.IdentityKey.SerializeCompressed()) - s.pendingConnMtx.RLock() - msg, ok := s.pendingConnRequests[pubStr] - s.pendingConnMtx.RUnlock() - if ok { - msg.resp <- peer.id - msg.err <- nil - - s.pendingConnMtx.Lock() - delete(s.pendingConnRequests, pubStr) - s.pendingConnMtx.Unlock() - } } // inboundPeerConnected initializes a new peer in response to a new inbound @@ -541,9 +518,10 @@ func (s *server) removePeer(p *peer) { // particular peer. This message also houses an error channel which will be // used to report success/failure. type connectPeerMsg struct { - addr *lnwire.NetAddress - resp chan int32 - err chan error + addr *lnwire.NetAddress + persistent bool + + err chan error } // listPeersMsg is a message sent to the server in order to obtain a listing @@ -686,7 +664,6 @@ func (s *server) handleConnectPeer(msg *connectPeerMsg) { if ok { s.peersMtx.RUnlock() msg.err <- fmt.Errorf("already connected to peer: %v", peer) - msg.resp <- -1 return } s.peersMtx.RUnlock() @@ -695,18 +672,10 @@ func (s *server) handleConnectPeer(msg *connectPeerMsg) { // then we ignore this request to ensure we don't create a redundant // connection. s.pendingConnMtx.RLock() - if _, ok := s.pendingConnRequests[targetPub]; ok { - s.pendingConnMtx.RUnlock() - msg.err <- fmt.Errorf("connection attempt to %v is pending", - addr) - msg.resp <- -1 - return - } if _, ok := s.persistentConnReqs[targetPub]; ok { s.pendingConnMtx.RUnlock() msg.err <- fmt.Errorf("connection attempt to %v is pending", addr) - msg.resp <- -1 return } s.pendingConnMtx.RUnlock() @@ -715,20 +684,29 @@ func (s *server) handleConnectPeer(msg *connectPeerMsg) { // then instruct the connection manager to attempt to establish a // persistent connection to the peer. srvrLog.Debugf("Connecting to %v", addr) - go s.connMgr.Connect(&connmgr.ConnReq{ - Addr: addr, - Permanent: true, - }) + if msg.persistent { + go s.connMgr.Connect(&connmgr.ConnReq{ + Addr: addr, + Permanent: true, + }) + } else { + // If we're not making a persistent connection, then we'll + // attempt to connect o the target peer, returning an error + // which indicates success of failure. + go func() { + // Attempt to connect to the remote node. If the we + // can't make the connection, or the crypto negotiation + // breaks down, then return an error to the caller. + conn, err := brontide.Dial(s.identityPriv, addr) + if err != nil { + msg.err <- err + return + } - // TODO(roasbeef): create goroutine to poll state so can report - // connection fails - - // Finally, we store the original request keyed by the public key so we - // can dispatch the response to the RPC client once a connection has - // been initiated. - s.pendingConnMtx.Lock() - s.pendingConnRequests[targetPub] = msg - s.pendingConnMtx.Unlock() + s.outboundPeerConnected(nil, conn) + msg.err <- nil + }() + } } // handleOpenChanReq first locates the target peer, and if found hands off the @@ -767,13 +745,18 @@ func (s *server) handleOpenChanReq(req *openChanReq) { // ConnectToPeer requests that the server connect to a Lightning Network peer // at the specified address. This function will *block* until either a // connection is established, or the initial handshake process fails. -func (s *server) ConnectToPeer(addr *lnwire.NetAddress) (int32, error) { - reply := make(chan int32, 1) +func (s *server) ConnectToPeer(addr *lnwire.NetAddress, + perm bool) error { + errChan := make(chan error, 1) - s.queries <- &connectPeerMsg{addr, reply, errChan} + s.queries <- &connectPeerMsg{ + addr: addr, + persistent: perm, + err: errChan, + } - return <-reply, <-errChan + return <-errChan } // OpenChannel sends a request to the server to open a channel to the specified