diff --git a/server.go b/server.go index 90befcb9..206ae226 100644 --- a/server.go +++ b/server.go @@ -73,6 +73,17 @@ func newServer(listenAddrs []string, bitcoinNet *chaincfg.Params, // addPeer... func (s *server) addPeer(p *peer) { + if p == nil { + return + } + + // Ignore new peers if we're shutting down. + if atomic.LoadInt32(&s.shutdown) != 0 { + p.Stop() + return + } + + s.peers[p.peerId] = p } // removePeer... @@ -97,11 +108,67 @@ out: s.wg.Done() } +// connectPeerMsg... +type connectPeerMsg struct { + addr *lnAddr + reply chan error +} + +// queryHandler... func (s *server) queryHandler() { out: for { select { - // TODO(roasbeef): meta-rpc-stuff + case query := <-s.queries: + switch msg := query.(type) { + case *connectPeerMsg: + addr := msg.addr + + // Ensure we're not already connected to this + // peer. + for _, peer := range s.peers { + if peer.lightningAddr.String() == + addr.String() { + msg.reply <- fmt.Errorf( + "already connected to peer: %v", + peer.lightningAddr, + ) + } + } + + // Launch a goroutine to connect to the requested + // peer so we can continue to handle queries. + go func() { + // For the lndc crypto handshake, we + // either need a compressed pubkey, or a + // 20-byte pkh. + var remoteId []byte + if addr.pubKey == nil { + remoteId = addr.bitcoinAddr.ScriptAddress() + } else { + remoteId = addr.pubKey.SerializeCompressed() + } + + // Attempt to connect to the remote + // node. If the we can't make the + // connection, or the crypto negotation + // breaks down, then return an error to the + // caller. + ipAddr := addr.netAddr.String() + conn := lndc.NewConn(s.longTermPriv, nil) + if err := conn.Dial(ipAddr, remoteId); err != nil { + msg.reply <- err + } + + // Now that we've established a connection, + // create a peer, and it to the set of + // currently active peers. + peer := newPeer(conn, s) + s.newPeers <- peer + + msg.reply <- nil + }() + } case <-s.quit: break out } @@ -110,6 +177,15 @@ out: s.wg.Done() } +// ConnectToPeer... +func (s *server) ConnectToPeer(addr *lnAddr) error { + reply := make(chan error, 1) + + s.queries <- &connectPeerMsg{addr, reply} + + return <-reply +} + // AddPeer... func (s *server) AddPeer(p *peer) { s.newPeers <- p @@ -121,10 +197,12 @@ func (s *server) listener(l net.Listener) { conn, err := l.Accept() if err != nil { // TODO(roasbeef): log + fmt.Println("err: ", err) continue } - // TODO(roasbeef): create new peer, start it's goroutines - fmt.Println(conn) + + peer := newPeer(conn, s) + peer.Start() } s.wg.Done() @@ -143,9 +221,9 @@ func (s *server) Start() { go s.listener(l) } - s.wg.Add(1) + s.wg.Add(2) go s.peerManager() - + go s.queryHandler() } // Stop...