add functionality to server for manually establishing outgoing connections

This commit is contained in:
Olaoluwa Osuntokun 2016-01-16 19:09:02 -08:00
parent 1f1b82fe3f
commit 9a775cf66e

@ -73,6 +73,17 @@ func newServer(listenAddrs []string, bitcoinNet *chaincfg.Params,
// addPeer...
func (s *server) addPeer(p *peer) {
if p == nil {
return
}
// Ignore new peers if we're shutting down.
if atomic.LoadInt32(&s.shutdown) != 0 {
p.Stop()
return
}
s.peers[p.peerId] = p
}
// removePeer...
@ -97,11 +108,67 @@ out:
s.wg.Done()
}
// connectPeerMsg...
type connectPeerMsg struct {
addr *lnAddr
reply chan error
}
// queryHandler...
func (s *server) queryHandler() {
out:
for {
select {
// TODO(roasbeef): meta-rpc-stuff
case query := <-s.queries:
switch msg := query.(type) {
case *connectPeerMsg:
addr := msg.addr
// Ensure we're not already connected to this
// peer.
for _, peer := range s.peers {
if peer.lightningAddr.String() ==
addr.String() {
msg.reply <- fmt.Errorf(
"already connected to peer: %v",
peer.lightningAddr,
)
}
}
// Launch a goroutine to connect to the requested
// peer so we can continue to handle queries.
go func() {
// For the lndc crypto handshake, we
// either need a compressed pubkey, or a
// 20-byte pkh.
var remoteId []byte
if addr.pubKey == nil {
remoteId = addr.bitcoinAddr.ScriptAddress()
} else {
remoteId = addr.pubKey.SerializeCompressed()
}
// Attempt to connect to the remote
// node. If the we can't make the
// connection, or the crypto negotation
// breaks down, then return an error to the
// caller.
ipAddr := addr.netAddr.String()
conn := lndc.NewConn(s.longTermPriv, nil)
if err := conn.Dial(ipAddr, remoteId); err != nil {
msg.reply <- err
}
// Now that we've established a connection,
// create a peer, and it to the set of
// currently active peers.
peer := newPeer(conn, s)
s.newPeers <- peer
msg.reply <- nil
}()
}
case <-s.quit:
break out
}
@ -110,6 +177,15 @@ out:
s.wg.Done()
}
// ConnectToPeer...
func (s *server) ConnectToPeer(addr *lnAddr) error {
reply := make(chan error, 1)
s.queries <- &connectPeerMsg{addr, reply}
return <-reply
}
// AddPeer...
func (s *server) AddPeer(p *peer) {
s.newPeers <- p
@ -121,10 +197,12 @@ func (s *server) listener(l net.Listener) {
conn, err := l.Accept()
if err != nil {
// TODO(roasbeef): log
fmt.Println("err: ", err)
continue
}
// TODO(roasbeef): create new peer, start it's goroutines
fmt.Println(conn)
peer := newPeer(conn, s)
peer.Start()
}
s.wg.Done()
@ -143,9 +221,9 @@ func (s *server) Start() {
go s.listener(l)
}
s.wg.Add(1)
s.wg.Add(2)
go s.peerManager()
go s.queryHandler()
}
// Stop...