fcff17c336
This commit will allow the general public to build lnd without jumping through hoops setting up their local git branches nicely with all of our forks.
281 lines
5.6 KiB
Go
281 lines
5.6 KiB
Go
package main
|
|
|
|
import (
|
|
"encoding/hex"
|
|
"fmt"
|
|
"net"
|
|
"sync"
|
|
"sync/atomic"
|
|
|
|
"github.com/lightningnetwork/lnd/channeldb"
|
|
"github.com/lightningnetwork/lnd/lndc"
|
|
"github.com/lightningnetwork/lnd/lnwallet"
|
|
"github.com/roasbeef/btcd/btcec"
|
|
|
|
"github.com/roasbeef/btcwallet/waddrmgr"
|
|
)
|
|
|
|
// server...
|
|
type server struct {
|
|
started int32 // atomic
|
|
shutdown int32 // atomic
|
|
|
|
longTermPriv *btcec.PrivateKey
|
|
|
|
listeners []net.Listener
|
|
peers map[int32]*peer
|
|
|
|
rpcServer *rpcServer
|
|
lnwallet *lnwallet.LightningWallet
|
|
chanDB *channeldb.DB
|
|
|
|
newPeers chan *peer
|
|
donePeers chan *peer
|
|
queries chan interface{}
|
|
|
|
wg sync.WaitGroup
|
|
quit chan struct{}
|
|
}
|
|
|
|
// newServer...
|
|
func newServer(listenAddrs []string, wallet *lnwallet.LightningWallet,
|
|
chanDB *channeldb.DB) (*server, error) {
|
|
|
|
privKey, err := getIdentityPrivKey(chanDB, wallet)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
listeners := make([]net.Listener, len(listenAddrs))
|
|
for i, addr := range listenAddrs {
|
|
listeners[i], err = lndc.NewListener(privKey, addr)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
s := &server{
|
|
chanDB: chanDB,
|
|
longTermPriv: privKey,
|
|
listeners: listeners,
|
|
peers: make(map[int32]*peer),
|
|
newPeers: make(chan *peer, 100),
|
|
donePeers: make(chan *peer, 100),
|
|
lnwallet: wallet,
|
|
queries: make(chan interface{}),
|
|
quit: make(chan struct{}),
|
|
}
|
|
|
|
s.rpcServer = newRpcServer(s)
|
|
|
|
return s, nil
|
|
}
|
|
|
|
// addPeer...
|
|
func (s *server) addPeer(p *peer) {
|
|
if p == nil {
|
|
return
|
|
}
|
|
|
|
// Ignore new peers if we're shutting down.
|
|
if atomic.LoadInt32(&s.shutdown) != 0 {
|
|
p.Stop()
|
|
return
|
|
}
|
|
|
|
s.peers[p.peerId] = p
|
|
}
|
|
|
|
// removePeer...
|
|
func (s *server) removePeer(p *peer) {
|
|
}
|
|
|
|
// peerManager...
|
|
func (s *server) peerManager() {
|
|
out:
|
|
for {
|
|
select {
|
|
// New peers.
|
|
case p := <-s.newPeers:
|
|
s.addPeer(p)
|
|
// Finished peers.
|
|
case p := <-s.donePeers:
|
|
s.removePeer(p)
|
|
case <-s.quit:
|
|
break out
|
|
}
|
|
}
|
|
s.wg.Done()
|
|
}
|
|
|
|
// connectPeerMsg...
|
|
type connectPeerMsg struct {
|
|
addr *lndc.LNAdr
|
|
reply chan error
|
|
}
|
|
|
|
// queryHandler...
|
|
func (s *server) queryHandler() {
|
|
out:
|
|
for {
|
|
select {
|
|
case query := <-s.queries:
|
|
switch msg := query.(type) {
|
|
case *connectPeerMsg:
|
|
addr := msg.addr
|
|
|
|
// Ensure we're not already connected to this
|
|
// peer.
|
|
for _, peer := range s.peers {
|
|
if peer.lightningAddr.String() ==
|
|
addr.String() {
|
|
msg.reply <- fmt.Errorf(
|
|
"already connected to peer: %v",
|
|
peer.lightningAddr,
|
|
)
|
|
}
|
|
}
|
|
|
|
// Launch a goroutine to connect to the requested
|
|
// peer so we can continue to handle queries.
|
|
go func() {
|
|
// For the lndc crypto handshake, we
|
|
// either need a compressed pubkey, or a
|
|
// 20-byte pkh.
|
|
var remoteId []byte
|
|
if addr.PubKey == nil {
|
|
remoteId = addr.Base58Adr.ScriptAddress()
|
|
} else {
|
|
remoteId = addr.PubKey.SerializeCompressed()
|
|
}
|
|
|
|
// Attempt to connect to the remote
|
|
// node. If the we can't make the
|
|
// connection, or the crypto negotation
|
|
// breaks down, then return an error to the
|
|
// caller.
|
|
ipAddr := addr.NetAddr.String()
|
|
conn := lndc.NewConn(nil)
|
|
if err := conn.Dial(
|
|
s.longTermPriv, ipAddr, remoteId); err != nil {
|
|
msg.reply <- err
|
|
}
|
|
|
|
// Now that we've established a connection,
|
|
// create a peer, and it to the set of
|
|
// currently active peers.
|
|
peer := newPeer(conn, s)
|
|
s.newPeers <- peer
|
|
|
|
msg.reply <- nil
|
|
}()
|
|
}
|
|
case <-s.quit:
|
|
break out
|
|
}
|
|
}
|
|
|
|
s.wg.Done()
|
|
}
|
|
|
|
// ConnectToPeer...
|
|
func (s *server) ConnectToPeer(addr *lndc.LNAdr) error {
|
|
reply := make(chan error, 1)
|
|
|
|
s.queries <- &connectPeerMsg{addr, reply}
|
|
|
|
return <-reply
|
|
}
|
|
|
|
// AddPeer...
|
|
func (s *server) AddPeer(p *peer) {
|
|
s.newPeers <- p
|
|
}
|
|
|
|
// listener...
|
|
func (s *server) listener(l net.Listener) {
|
|
srvrLog.Infof("Server listening on %s", l.Addr())
|
|
for atomic.LoadInt32(&s.shutdown) == 0 {
|
|
conn, err := l.Accept()
|
|
if err != nil {
|
|
// Only log the error message if we aren't currently
|
|
// shutting down.
|
|
if atomic.LoadInt32(&s.shutdown) == 0 {
|
|
srvrLog.Errorf("Can't accept connection: %v", err)
|
|
}
|
|
continue
|
|
}
|
|
|
|
srvrLog.Tracef("New inbound connection from %v", conn.RemoteAddr())
|
|
peer := newPeer(conn, s)
|
|
peer.Start()
|
|
}
|
|
|
|
s.wg.Done()
|
|
}
|
|
|
|
// Start...
|
|
func (s *server) Start() {
|
|
// Already running?
|
|
if atomic.AddInt32(&s.started, 1) != 1 {
|
|
return
|
|
}
|
|
|
|
// Start all the listeners.
|
|
for _, l := range s.listeners {
|
|
s.wg.Add(1)
|
|
go s.listener(l)
|
|
}
|
|
|
|
s.wg.Add(2)
|
|
go s.peerManager()
|
|
go s.queryHandler()
|
|
}
|
|
|
|
// Stop...
|
|
func (s *server) Stop() error {
|
|
// Bail if we're already shutting down.
|
|
if atomic.AddInt32(&s.shutdown, 1) != 1 {
|
|
return nil
|
|
}
|
|
|
|
// Stop all the listeners.
|
|
for _, listener := range s.listeners {
|
|
if err := listener.Close(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
s.rpcServer.Stop()
|
|
s.lnwallet.Shutdown()
|
|
|
|
// Signal all the lingering goroutines to quit.
|
|
close(s.quit)
|
|
return nil
|
|
}
|
|
|
|
// getIdentityPrivKey gets the identity private key out of the wallet DB.
|
|
func getIdentityPrivKey(c *channeldb.DB, w *lnwallet.LightningWallet) (*btcec.PrivateKey, error) {
|
|
adr, err := c.GetIdAdr()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
ltndLog.Infof("got ID address: %s", adr.String())
|
|
adr2, err := w.Manager.Address(adr)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
ltndLog.Infof("pubkey: %v", hex.EncodeToString(adr2.(waddrmgr.ManagedPubKeyAddress).PubKey().SerializeCompressed()))
|
|
priv, err := adr2.(waddrmgr.ManagedPubKeyAddress).PrivKey()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return priv, nil
|
|
}
|
|
|
|
// WaitForShutdown blocks all goroutines have been stopped.
|
|
func (s *server) WaitForShutdown() {
|
|
s.wg.Wait()
|
|
}
|