lnd: implement connectpeer and listpeers RPC calls
This commit is contained in:
parent
c7e9bb8c58
commit
9ef7e23384
@ -7,13 +7,27 @@ service Lightning {
|
|||||||
rpc NewAddress(NewAddressRequest) returns (NewAddressResponse);
|
rpc NewAddress(NewAddressRequest) returns (NewAddressResponse);
|
||||||
|
|
||||||
rpc ConnectPeer(ConnectPeerRequest) returns (ConnectPeerResponse);
|
rpc ConnectPeer(ConnectPeerRequest) returns (ConnectPeerResponse);
|
||||||
|
rpc ListPeers(ListPeersRequest) returns (ListPeersResponse);
|
||||||
|
|
||||||
|
rpc OpenChannel(OpenChannelRequest) returns (OpenChannelResponse);
|
||||||
|
rpc CloseChannel(CloseChannelRequest) returns (CloseChannelResponse);
|
||||||
|
|
||||||
rpc WalletBalance(WalletBalanceRequest) returns (WalletBalanceResponse);
|
rpc WalletBalance(WalletBalanceRequest) returns (WalletBalanceResponse);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message ChannelPoint {
|
||||||
|
bytes funding_txid = 1;
|
||||||
|
uint32 output_index = 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
message LightningAddress {
|
||||||
|
string pubKeyHash = 1;
|
||||||
|
string host = 2;
|
||||||
|
}
|
||||||
|
|
||||||
message SendManyRequest {
|
message SendManyRequest {
|
||||||
map<string, int64> AddrToAmount = 1;
|
map<string, int64> AddrToAmount = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
message SendManyResponse {
|
message SendManyResponse {
|
||||||
string txid = 1;
|
string txid = 1;
|
||||||
}
|
}
|
||||||
@ -32,11 +46,57 @@ message NewAddressResponse {
|
|||||||
}
|
}
|
||||||
|
|
||||||
message ConnectPeerRequest {
|
message ConnectPeerRequest {
|
||||||
string idAtHost = 1;
|
LightningAddress addr = 1;
|
||||||
|
}
|
||||||
|
message ConnectPeerResponse {
|
||||||
|
int32 peer_id = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
message ConnectPeerResponse {
|
message HTLC {
|
||||||
bytes lnID = 1;
|
int64 id = 1;
|
||||||
|
|
||||||
|
int64 amount = 2;
|
||||||
|
|
||||||
|
bytes hash_lock = 3;
|
||||||
|
|
||||||
|
bool to_us = 4;
|
||||||
|
}
|
||||||
|
|
||||||
|
message ActiveChannel {
|
||||||
|
bytes funding_txid = 1;
|
||||||
|
|
||||||
|
int64 capacity = 2;
|
||||||
|
int64 local_balance = 3;
|
||||||
|
int64 remote_balance = 4;
|
||||||
|
|
||||||
|
int64 unsettled_belance = 5;
|
||||||
|
repeated HTLC pending_htlcs = 6;
|
||||||
|
|
||||||
|
int64 num_updates = 7;
|
||||||
|
// TODO(roasbeef): other stuffs
|
||||||
|
}
|
||||||
|
|
||||||
|
message Peer {
|
||||||
|
string lightning_id = 1;
|
||||||
|
int32 peer_id = 2;
|
||||||
|
string address = 3;
|
||||||
|
|
||||||
|
uint64 bytes_sent = 4;
|
||||||
|
uint64 bytes_recv = 5;
|
||||||
|
|
||||||
|
int64 sat_sent = 6;
|
||||||
|
int64 sat_recv = 7;
|
||||||
|
|
||||||
|
bool inbound = 8;
|
||||||
|
|
||||||
|
// TODO(roasbeef): add pending channels
|
||||||
|
repeated ActiveChannel channels = 9;
|
||||||
|
}
|
||||||
|
|
||||||
|
message ListPeersRequest {}
|
||||||
|
message ListPeersResponse {
|
||||||
|
repeated Peer peers = 1;
|
||||||
|
}
|
||||||
message WalletBalanceRequest {
|
message WalletBalanceRequest {
|
||||||
bool witness_only = 1;
|
bool witness_only = 1;
|
||||||
}
|
}
|
||||||
|
75
rpcserver.go
75
rpcserver.go
@ -1,6 +1,7 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/hex"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"sync"
|
"sync"
|
||||||
@ -19,7 +20,7 @@ var (
|
|||||||
defaultAccount uint32 = waddrmgr.DefaultAccountNum
|
defaultAccount uint32 = waddrmgr.DefaultAccountNum
|
||||||
)
|
)
|
||||||
|
|
||||||
// rpcServer...
|
// rpcServer is a gRPC, RPC front end to the lnd daemon.
|
||||||
type rpcServer struct {
|
type rpcServer struct {
|
||||||
started int32 // To be used atomically.
|
started int32 // To be used atomically.
|
||||||
shutdown int32 // To be used atomically.
|
shutdown int32 // To be used atomically.
|
||||||
@ -31,14 +32,17 @@ type rpcServer struct {
|
|||||||
quit chan struct{}
|
quit chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// A compile time check to ensure that rpcServer fully implements the
|
||||||
|
// LightningServer gRPC service.
|
||||||
var _ lnrpc.LightningServer = (*rpcServer)(nil)
|
var _ lnrpc.LightningServer = (*rpcServer)(nil)
|
||||||
|
|
||||||
// newRpcServer...
|
// newRpcServer creates and returns a new instance of the rpcServer.
|
||||||
func newRpcServer(s *server) *rpcServer {
|
func newRpcServer(s *server) *rpcServer {
|
||||||
return &rpcServer{server: s, quit: make(chan struct{}, 1)}
|
return &rpcServer{server: s, quit: make(chan struct{}, 1)}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start...
|
// Start launches any helper goroutines required for the rpcServer
|
||||||
|
// to function.
|
||||||
func (r *rpcServer) Start() error {
|
func (r *rpcServer) Start() error {
|
||||||
if atomic.AddInt32(&r.started, 1) != 1 {
|
if atomic.AddInt32(&r.started, 1) != 1 {
|
||||||
return nil
|
return nil
|
||||||
@ -47,7 +51,7 @@ func (r *rpcServer) Start() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop...
|
// Stop signals any active goroutines for a graceful closure.
|
||||||
func (r *rpcServer) Stop() error {
|
func (r *rpcServer) Stop() error {
|
||||||
if atomic.AddInt32(&r.shutdown, 1) != 1 {
|
if atomic.AddInt32(&r.shutdown, 1) != 1 {
|
||||||
return nil
|
return nil
|
||||||
@ -58,8 +62,10 @@ func (r *rpcServer) Stop() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// SendMany...
|
// SendMany handles a request for a transaction create multiple specified
|
||||||
func (r *rpcServer) SendMany(ctx context.Context, in *lnrpc.SendManyRequest) (*lnrpc.SendManyResponse, error) {
|
// outputs in parallel.
|
||||||
|
func (r *rpcServer) SendMany(ctx context.Context,
|
||||||
|
in *lnrpc.SendManyRequest) (*lnrpc.SendManyResponse, error) {
|
||||||
|
|
||||||
outputs := make([]*wire.TxOut, 0, len(in.AddrToAmount))
|
outputs := make([]*wire.TxOut, 0, len(in.AddrToAmount))
|
||||||
for addr, amt := range in.AddrToAmount {
|
for addr, amt := range in.AddrToAmount {
|
||||||
@ -76,6 +82,8 @@ func (r *rpcServer) SendMany(ctx context.Context, in *lnrpc.SendManyRequest) (*l
|
|||||||
outputs = append(outputs, wire.NewTxOut(amt, pkscript))
|
outputs = append(outputs, wire.NewTxOut(amt, pkscript))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Instruct the wallet to create an transaction paying to the specified
|
||||||
|
// outputs, selecting any coins with at least one confirmation.
|
||||||
txid, err := r.server.lnwallet.SendOutputs(outputs, defaultAccount, 1)
|
txid, err := r.server.lnwallet.SendOutputs(outputs, defaultAccount, 1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -86,7 +94,7 @@ func (r *rpcServer) SendMany(ctx context.Context, in *lnrpc.SendManyRequest) (*l
|
|||||||
return &lnrpc.SendManyResponse{Txid: txid.String()}, nil
|
return &lnrpc.SendManyResponse{Txid: txid.String()}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewAddress...
|
// NewAddress creates a new address under control of the local wallet.
|
||||||
func (r *rpcServer) NewAddress(ctx context.Context,
|
func (r *rpcServer) NewAddress(ctx context.Context,
|
||||||
in *lnrpc.NewAddressRequest) (*lnrpc.NewAddressResponse, error) {
|
in *lnrpc.NewAddressRequest) (*lnrpc.NewAddressResponse, error) {
|
||||||
|
|
||||||
@ -115,15 +123,32 @@ func (r *rpcServer) NewAddress(ctx context.Context,
|
|||||||
return &lnrpc.NewAddressResponse{Address: addr.String()}, nil
|
return &lnrpc.NewAddressResponse{Address: addr.String()}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// LNConnect...
|
// ConnectPeer attempts to establish a connection to a remote peer.
|
||||||
func (r *rpcServer) ConnectPeer(ctx context.Context,
|
func (r *rpcServer) ConnectPeer(ctx context.Context,
|
||||||
in *lnrpc.ConnectPeerRequest) (*lnrpc.ConnectPeerResponse, error) {
|
in *lnrpc.ConnectPeerRequest) (*lnrpc.ConnectPeerResponse, error) {
|
||||||
|
|
||||||
if len(in.IdAtHost) == 0 {
|
if in.Addr == nil {
|
||||||
return nil, fmt.Errorf("need: lnc pubkeyhash@hostname")
|
return nil, fmt.Errorf("need: lnc pubkeyhash@hostname")
|
||||||
}
|
}
|
||||||
|
|
||||||
peerAddr, err := lndc.LnAddrFromString(in.IdAtHost)
|
idAtHost := fmt.Sprintf("%v@%v", in.Addr.PubKeyHash, in.Addr.Host)
|
||||||
|
rpcsLog.Debugf("Attempting to connect to peer %v", idAtHost)
|
||||||
|
|
||||||
|
peerAddr, err := lndc.LnAddrFromString(idAtHost)
|
||||||
|
if err != nil {
|
||||||
|
rpcsLog.Errorf("(connectpeer): error parsing ln addr: %v", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
peerID, err := r.server.ConnectToPeer(peerAddr)
|
||||||
|
if err != nil {
|
||||||
|
rpcsLog.Errorf("(connectpeer): error connecting to peer: %v", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
rpcsLog.Debugf("Connected to peer: %v", peerAddr.String())
|
||||||
|
return &lnrpc.ConnectPeerResponse{peerID}, nil
|
||||||
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -134,6 +159,36 @@ func (r *rpcServer) ConnectPeer(ctx context.Context,
|
|||||||
|
|
||||||
rpcsLog.Infof("Connected to peer: %v", peerAddr.String())
|
rpcsLog.Infof("Connected to peer: %v", peerAddr.String())
|
||||||
return &lnrpc.ConnectPeerResponse{[]byte(peerAddr.String())}, nil
|
return &lnrpc.ConnectPeerResponse{[]byte(peerAddr.String())}, nil
|
||||||
|
// ListPeers returns a verbose listing of all currently active peers.
|
||||||
|
func (r *rpcServer) ListPeers(ctx context.Context,
|
||||||
|
in *lnrpc.ListPeersRequest) (*lnrpc.ListPeersResponse, error) {
|
||||||
|
|
||||||
|
rpcsLog.Tracef("recieved listpeers request")
|
||||||
|
|
||||||
|
serverPeers := r.server.Peers()
|
||||||
|
resp := &lnrpc.ListPeersResponse{
|
||||||
|
Peers: make([]*lnrpc.Peer, 0, len(serverPeers)),
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, serverPeer := range serverPeers {
|
||||||
|
// TODO(roasbeef): add a snapshot method which grabs peer read mtx
|
||||||
|
peer := &lnrpc.Peer{
|
||||||
|
LightningId: hex.EncodeToString(serverPeer.lightningID[:]),
|
||||||
|
PeerId: serverPeer.id,
|
||||||
|
Address: serverPeer.conn.RemoteAddr().String(),
|
||||||
|
Inbound: serverPeer.inbound,
|
||||||
|
BytesRecv: atomic.LoadUint64(&serverPeer.bytesReceived),
|
||||||
|
BytesSent: atomic.LoadUint64(&serverPeer.bytesSent),
|
||||||
|
}
|
||||||
|
|
||||||
|
resp.Peers = append(resp.Peers, peer)
|
||||||
|
}
|
||||||
|
|
||||||
|
rpcsLog.Tracef("listpeers yielded %v peers", serverPeers)
|
||||||
|
|
||||||
|
return resp, nil
|
||||||
|
}
|
||||||
|
|
||||||
// WalletBalance returns the sum of all confirmed unspent outputs under control
|
// WalletBalance returns the sum of all confirmed unspent outputs under control
|
||||||
// by the wallet. This method can be modified by having the request specify
|
// by the wallet. This method can be modified by having the request specify
|
||||||
// only witness outputs should be factored into the final output sum.
|
// only witness outputs should be factored into the final output sum.
|
||||||
|
300
server.go
300
server.go
@ -11,22 +11,36 @@ import (
|
|||||||
"github.com/lightningnetwork/lnd/lndc"
|
"github.com/lightningnetwork/lnd/lndc"
|
||||||
"github.com/lightningnetwork/lnd/lnwallet"
|
"github.com/lightningnetwork/lnd/lnwallet"
|
||||||
"github.com/roasbeef/btcd/btcec"
|
"github.com/roasbeef/btcd/btcec"
|
||||||
|
"github.com/roasbeef/btcd/wire"
|
||||||
|
"github.com/roasbeef/btcutil"
|
||||||
|
|
||||||
"github.com/roasbeef/btcwallet/waddrmgr"
|
"github.com/roasbeef/btcwallet/waddrmgr"
|
||||||
)
|
)
|
||||||
|
|
||||||
// server...
|
// server is the main server of the Lightning Network Daemon. The server
|
||||||
|
// houses global state pertianing to the wallet, database, and the rpcserver.
|
||||||
|
// Additionally, the server is also used as a central messaging bus to interact
|
||||||
|
// with any of its companion objects.
|
||||||
type server struct {
|
type server struct {
|
||||||
started int32 // atomic
|
started int32 // atomic
|
||||||
shutdown int32 // atomic
|
shutdown int32 // atomic
|
||||||
|
|
||||||
longTermPriv *btcec.PrivateKey
|
// identityPriv is the private key used to authenticate any incoming
|
||||||
|
// connections.
|
||||||
|
identityPriv *btcec.PrivateKey
|
||||||
|
|
||||||
listeners []net.Listener
|
listeners []net.Listener
|
||||||
peers map[int32]*peer
|
peers map[int32]*peer
|
||||||
|
|
||||||
|
chanIndexMtx sync.RWMutex
|
||||||
|
chanIndex map[wire.OutPoint]*peer
|
||||||
|
|
||||||
rpcServer *rpcServer
|
rpcServer *rpcServer
|
||||||
|
// TODO(roasbeef): add chan notifier also
|
||||||
lnwallet *lnwallet.LightningWallet
|
lnwallet *lnwallet.LightningWallet
|
||||||
|
|
||||||
|
// TODO(roasbeef): add to constructor
|
||||||
|
fundingMgr *fundingManager
|
||||||
chanDB *channeldb.DB
|
chanDB *channeldb.DB
|
||||||
|
|
||||||
newPeers chan *peer
|
newPeers chan *peer
|
||||||
@ -37,7 +51,8 @@ type server struct {
|
|||||||
quit chan struct{}
|
quit chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// newServer...
|
// newServer creates a new instance of the server which is to listen using the
|
||||||
|
// passed listener address.
|
||||||
func newServer(listenAddrs []string, wallet *lnwallet.LightningWallet,
|
func newServer(listenAddrs []string, wallet *lnwallet.LightningWallet,
|
||||||
chanDB *channeldb.DB) (*server, error) {
|
chanDB *channeldb.DB) (*server, error) {
|
||||||
|
|
||||||
@ -56,12 +71,14 @@ func newServer(listenAddrs []string, wallet *lnwallet.LightningWallet,
|
|||||||
|
|
||||||
s := &server{
|
s := &server{
|
||||||
chanDB: chanDB,
|
chanDB: chanDB,
|
||||||
longTermPriv: privKey,
|
fundingMgr: newFundingManager(wallet),
|
||||||
|
lnwallet: wallet,
|
||||||
|
identityPriv: privKey,
|
||||||
listeners: listeners,
|
listeners: listeners,
|
||||||
peers: make(map[int32]*peer),
|
peers: make(map[int32]*peer),
|
||||||
|
chanIndex: make(map[wire.OutPoint]*peer),
|
||||||
newPeers: make(chan *peer, 100),
|
newPeers: make(chan *peer, 100),
|
||||||
donePeers: make(chan *peer, 100),
|
donePeers: make(chan *peer, 100),
|
||||||
lnwallet: wallet,
|
|
||||||
queries: make(chan interface{}),
|
queries: make(chan interface{}),
|
||||||
quit: make(chan struct{}),
|
quit: make(chan struct{}),
|
||||||
}
|
}
|
||||||
@ -71,26 +88,65 @@ func newServer(listenAddrs []string, wallet *lnwallet.LightningWallet,
|
|||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// addPeer...
|
// Start starts the main daemon server, all requested listeners, and any helper
|
||||||
func (s *server) addPeer(p *peer) {
|
// goroutines.
|
||||||
if p == nil {
|
func (s *server) Start() {
|
||||||
|
// Already running?
|
||||||
|
if atomic.AddInt32(&s.started, 1) != 1 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Ignore new peers if we're shutting down.
|
// Start all the listeners.
|
||||||
if atomic.LoadInt32(&s.shutdown) != 0 {
|
for _, l := range s.listeners {
|
||||||
p.Stop()
|
s.wg.Add(1)
|
||||||
return
|
go s.listener(l)
|
||||||
}
|
}
|
||||||
|
|
||||||
s.peers[p.peerId] = p
|
s.fundingMgr.Start()
|
||||||
|
|
||||||
|
s.wg.Add(2)
|
||||||
|
go s.peerManager()
|
||||||
|
go s.queryHandler()
|
||||||
}
|
}
|
||||||
|
|
||||||
// removePeer...
|
// Stop gracefully shutsdown the main daemon server. This function will signal
|
||||||
func (s *server) removePeer(p *peer) {
|
// any active goroutines, or helper objects to exit, then blocks until they've
|
||||||
|
// all successfully exited. Additionally, any/all listeners are closed.
|
||||||
|
func (s *server) Stop() error {
|
||||||
|
// Bail if we're already shutting down.
|
||||||
|
if atomic.AddInt32(&s.shutdown, 1) != 1 {
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// peerManager...
|
// Stop all the listeners.
|
||||||
|
for _, listener := range s.listeners {
|
||||||
|
if err := listener.Close(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Shutdown the wallet, funding manager, and the rpc server.
|
||||||
|
s.rpcServer.Stop()
|
||||||
|
s.lnwallet.Shutdown()
|
||||||
|
s.fundingMgr.Stop()
|
||||||
|
|
||||||
|
// Signal all the lingering goroutines to quit.
|
||||||
|
close(s.quit)
|
||||||
|
s.wg.Wait()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// WaitForShutdown blocks all goroutines have been stopped.
|
||||||
|
func (s *server) WaitForShutdown() {
|
||||||
|
s.wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
// peerManager handles any requests to modify the server's internal state of
|
||||||
|
// all active peers. Additionally, any queries directed at peers will be
|
||||||
|
// handled by this goroutine.
|
||||||
|
//
|
||||||
|
// NOTE: This MUST be run as a goroutine.
|
||||||
func (s *server) peerManager() {
|
func (s *server) peerManager() {
|
||||||
out:
|
out:
|
||||||
for {
|
for {
|
||||||
@ -108,20 +164,91 @@ out:
|
|||||||
s.wg.Done()
|
s.wg.Done()
|
||||||
}
|
}
|
||||||
|
|
||||||
// connectPeerMsg...
|
// addPeer adds the passed peer to the server's global state of all active
|
||||||
type connectPeerMsg struct {
|
// peers.
|
||||||
addr *lndc.LNAdr
|
func (s *server) addPeer(p *peer) {
|
||||||
reply chan error
|
if p == nil {
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// queryHandler...
|
// Ignore new peers if we're shutting down.
|
||||||
|
if atomic.LoadInt32(&s.shutdown) != 0 {
|
||||||
|
p.Stop()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
s.peers[p.id] = p
|
||||||
|
}
|
||||||
|
|
||||||
|
// removePeer removes the passed peer from the server's state of all active
|
||||||
|
// peers.
|
||||||
|
func (s *server) removePeer(p *peer) {
|
||||||
|
if p == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ignore deleting peers if we're shutting down.
|
||||||
|
if atomic.LoadInt32(&s.shutdown) != 0 {
|
||||||
|
p.Stop()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
delete(s.peers, p.id)
|
||||||
|
}
|
||||||
|
|
||||||
|
// connectPeerMsg is a message requesting the server to open a connection to a
|
||||||
|
// particular peer. This message also houses an error channel which will be
|
||||||
|
// used to report success/failure.
|
||||||
|
type connectPeerMsg struct {
|
||||||
|
addr *lndc.LNAdr
|
||||||
|
resp chan int32
|
||||||
|
err chan error
|
||||||
|
}
|
||||||
|
|
||||||
|
// listPeersMsg is a message sent to the server in order to obtain a listing
|
||||||
|
// of all currently active channels.
|
||||||
|
type listPeersMsg struct {
|
||||||
|
resp chan []*peer
|
||||||
|
}
|
||||||
|
// queryHandler is a a goroutine dedicated to handling an queries or requests
|
||||||
|
// to mutate the server's global state.
|
||||||
|
//
|
||||||
|
// NOTE: This MUST be run as a goroutine.
|
||||||
func (s *server) queryHandler() {
|
func (s *server) queryHandler() {
|
||||||
|
// TODO(roabeef): consolidate with peerManager
|
||||||
out:
|
out:
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case query := <-s.queries:
|
case query := <-s.queries:
|
||||||
|
// TODO(roasbeef): make all goroutines?
|
||||||
switch msg := query.(type) {
|
switch msg := query.(type) {
|
||||||
case *connectPeerMsg:
|
case *connectPeerMsg:
|
||||||
|
s.handleConnectPeer(msg)
|
||||||
|
case *listPeersMsg:
|
||||||
|
s.handleListPeers(msg)
|
||||||
|
}
|
||||||
|
case <-s.quit:
|
||||||
|
break out
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
s.wg.Done()
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleListPeers...
|
||||||
|
func (s *server) handleListPeers(msg *listPeersMsg) {
|
||||||
|
peers := make([]*peer, 0, len(s.peers))
|
||||||
|
for _, peer := range s.peers {
|
||||||
|
peers = append(peers, peer)
|
||||||
|
}
|
||||||
|
|
||||||
|
msg.resp <- peers
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleConnectPeer attempts to establish a connection to the address enclosed
|
||||||
|
// within the passed connectPeerMsg. This function is *async*, a goroutine will
|
||||||
|
// be spawned in order to finish the request, and respond to the caller.
|
||||||
|
func (s *server) handleConnectPeer(msg *connectPeerMsg) {
|
||||||
addr := msg.addr
|
addr := msg.addr
|
||||||
|
|
||||||
// Ensure we're not already connected to this
|
// Ensure we're not already connected to this
|
||||||
@ -129,15 +256,18 @@ out:
|
|||||||
for _, peer := range s.peers {
|
for _, peer := range s.peers {
|
||||||
if peer.lightningAddr.String() ==
|
if peer.lightningAddr.String() ==
|
||||||
addr.String() {
|
addr.String() {
|
||||||
msg.reply <- fmt.Errorf(
|
msg.err <- fmt.Errorf(
|
||||||
"already connected to peer: %v",
|
"already connected to peer: %v",
|
||||||
peer.lightningAddr,
|
peer.lightningAddr,
|
||||||
)
|
)
|
||||||
|
msg.resp <- -1
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Launch a goroutine to connect to the requested
|
// Launch a goroutine to connect to the requested
|
||||||
// peer so we can continue to handle queries.
|
// peer so we can continue to handle queries.
|
||||||
|
// TODO(roasbeef): semaphore to limit the number of goroutines for
|
||||||
|
// async requests.
|
||||||
go func() {
|
go func() {
|
||||||
// For the lndc crypto handshake, we
|
// For the lndc crypto handshake, we
|
||||||
// either need a compressed pubkey, or a
|
// either need a compressed pubkey, or a
|
||||||
@ -149,6 +279,7 @@ out:
|
|||||||
remoteId = addr.PubKey.SerializeCompressed()
|
remoteId = addr.PubKey.SerializeCompressed()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
srvrLog.Debugf("connecting to %v", hex.EncodeToString(remoteId))
|
||||||
// Attempt to connect to the remote
|
// Attempt to connect to the remote
|
||||||
// node. If the we can't make the
|
// node. If the we can't make the
|
||||||
// connection, or the crypto negotation
|
// connection, or the crypto negotation
|
||||||
@ -157,42 +288,60 @@ out:
|
|||||||
ipAddr := addr.NetAddr.String()
|
ipAddr := addr.NetAddr.String()
|
||||||
conn := lndc.NewConn(nil)
|
conn := lndc.NewConn(nil)
|
||||||
if err := conn.Dial(
|
if err := conn.Dial(
|
||||||
s.longTermPriv, ipAddr, remoteId); err != nil {
|
s.identityPriv, ipAddr, remoteId); err != nil {
|
||||||
msg.reply <- err
|
msg.err <- err
|
||||||
|
msg.resp <- -1
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Now that we've established a connection,
|
// Now that we've established a connection,
|
||||||
// create a peer, and it to the set of
|
// create a peer, and it to the set of
|
||||||
// currently active peers.
|
// currently active peers.
|
||||||
peer := newPeer(conn, s)
|
peer, err := newPeer(conn, s, activeNetParams.Net, false)
|
||||||
|
if err != nil {
|
||||||
|
srvrLog.Errorf("unable to create peer %v", err)
|
||||||
|
msg.resp <- -1
|
||||||
|
msg.err <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
peer.Start()
|
||||||
s.newPeers <- peer
|
s.newPeers <- peer
|
||||||
|
|
||||||
msg.reply <- nil
|
msg.resp <- peer.id
|
||||||
|
msg.err <- nil
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
case <-s.quit:
|
|
||||||
break out
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
s.wg.Done()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ConnectToPeer...
|
return
|
||||||
func (s *server) ConnectToPeer(addr *lndc.LNAdr) error {
|
|
||||||
reply := make(chan error, 1)
|
|
||||||
|
|
||||||
s.queries <- &connectPeerMsg{addr, reply}
|
|
||||||
|
|
||||||
return <-reply
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddPeer...
|
|
||||||
func (s *server) AddPeer(p *peer) {
|
|
||||||
s.newPeers <- p
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// listener...
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, listener := range s.listeners {
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// Peers returns a slice of all active peers.
|
||||||
|
func (s *server) Peers() []*peer {
|
||||||
|
resp := make(chan []*peer)
|
||||||
|
|
||||||
|
s.queries <- &listPeersMsg{resp}
|
||||||
|
|
||||||
|
return <-resp
|
||||||
|
}
|
||||||
|
|
||||||
|
// listener is a goroutine dedicated to accepting in coming peer connections
|
||||||
|
// from the passed listener.
|
||||||
|
//
|
||||||
|
// NOTE: This MUST be run as a goroutine.
|
||||||
func (s *server) listener(l net.Listener) {
|
func (s *server) listener(l net.Listener) {
|
||||||
srvrLog.Infof("Server listening on %s", l.Addr())
|
srvrLog.Infof("Server listening on %s", l.Addr())
|
||||||
for atomic.LoadInt32(&s.shutdown) == 0 {
|
for atomic.LoadInt32(&s.shutdown) == 0 {
|
||||||
@ -207,67 +356,41 @@ func (s *server) listener(l net.Listener) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
srvrLog.Tracef("New inbound connection from %v", conn.RemoteAddr())
|
srvrLog.Tracef("New inbound connection from %v", conn.RemoteAddr())
|
||||||
peer := newPeer(conn, s)
|
peer, err := newPeer(conn, s, activeNetParams.Net, true)
|
||||||
peer.Start()
|
if err != nil {
|
||||||
|
srvrLog.Errorf("unable to create peer: %v", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
peer.Start()
|
||||||
s.newPeers <- peer
|
s.newPeers <- peer
|
||||||
}
|
}
|
||||||
|
|
||||||
s.wg.Done()
|
s.wg.Done()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start...
|
|
||||||
func (s *server) Start() {
|
|
||||||
// Already running?
|
|
||||||
if atomic.AddInt32(&s.started, 1) != 1 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start all the listeners.
|
|
||||||
for _, l := range s.listeners {
|
|
||||||
s.wg.Add(1)
|
|
||||||
go s.listener(l)
|
|
||||||
}
|
|
||||||
|
|
||||||
s.wg.Add(2)
|
|
||||||
go s.peerManager()
|
|
||||||
go s.queryHandler()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Stop...
|
|
||||||
func (s *server) Stop() error {
|
|
||||||
// Bail if we're already shutting down.
|
|
||||||
if atomic.AddInt32(&s.shutdown, 1) != 1 {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Stop all the listeners.
|
|
||||||
for _, listener := range s.listeners {
|
|
||||||
if err := listener.Close(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
s.rpcServer.Stop()
|
|
||||||
s.lnwallet.Shutdown()
|
|
||||||
|
|
||||||
// Signal all the lingering goroutines to quit.
|
|
||||||
close(s.quit)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// getIdentityPrivKey gets the identity private key out of the wallet DB.
|
// getIdentityPrivKey gets the identity private key out of the wallet DB.
|
||||||
func getIdentityPrivKey(c *channeldb.DB, w *lnwallet.LightningWallet) (*btcec.PrivateKey, error) {
|
func getIdentityPrivKey(c *channeldb.DB,
|
||||||
|
w *lnwallet.LightningWallet) (*btcec.PrivateKey, error) {
|
||||||
|
|
||||||
|
// First retrieve the current identity address for this peer.
|
||||||
adr, err := c.GetIdAdr()
|
adr, err := c.GetIdAdr()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
ltndLog.Infof("got ID address: %s", adr.String())
|
|
||||||
|
// Using the ID address, request the private key coresponding to the
|
||||||
|
// address from the wallet's address manager.
|
||||||
adr2, err := w.Manager.Address(adr)
|
adr2, err := w.Manager.Address(adr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
ltndLog.Infof("pubkey: %v", hex.EncodeToString(adr2.(waddrmgr.ManagedPubKeyAddress).PubKey().SerializeCompressed()))
|
|
||||||
|
serializedKey := adr2.(waddrmgr.ManagedPubKeyAddress).PubKey().SerializeCompressed()
|
||||||
|
keyEncoded := hex.EncodeToString(serializedKey)
|
||||||
|
ltndLog.Infof("identity address: %v", adr)
|
||||||
|
ltndLog.Infof("identity pubkey retrieved: %v", keyEncoded)
|
||||||
|
|
||||||
priv, err := adr2.(waddrmgr.ManagedPubKeyAddress).PrivKey()
|
priv, err := adr2.(waddrmgr.ManagedPubKeyAddress).PrivKey()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -275,8 +398,3 @@ func getIdentityPrivKey(c *channeldb.DB, w *lnwallet.LightningWallet) (*btcec.Pr
|
|||||||
|
|
||||||
return priv, nil
|
return priv, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// WaitForShutdown blocks all goroutines have been stopped.
|
|
||||||
func (s *server) WaitForShutdown() {
|
|
||||||
s.wg.Wait()
|
|
||||||
}
|
|
||||||
|
Loading…
Reference in New Issue
Block a user