lnd.xprv/rpcserver.go
BitfuryLightning f8c851769f multi: initial integration of routing module
This commit integrates BitFury's current routing functionality into lnd. The
primary ochestration point for the routing sub-system in the routingMgr. The
routingMgr manages all persistent and volatile state related to routing within
the network.

Newly opened channels, either when the initiator or responder are inserted into
the routing table once the channel is fully open. Once new links are inserted
the routingMgr can then perform path selection in order to locate an "optimal"
path to a target destination.
2016-08-11 11:20:27 -07:00

494 lines
15 KiB
Go

package main
import (
"encoding/hex"
"fmt"
"io"
"sync"
"sync/atomic"
"github.com/lightningnetwork/lnd/lndc"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/roasbeef/btcd/txscript"
"github.com/roasbeef/btcd/wire"
"github.com/roasbeef/btcutil"
"github.com/roasbeef/btcwallet/waddrmgr"
"golang.org/x/net/context"
)
var (
defaultAccount uint32 = waddrmgr.DefaultAccountNum
)
// rpcServer is a gRPC, RPC front end to the lnd daemon.
type rpcServer struct {
started int32 // To be used atomically.
shutdown int32 // To be used atomically.
server *server
wg sync.WaitGroup
quit chan struct{}
}
// A compile time check to ensure that rpcServer fully implements the
// LightningServer gRPC service.
var _ lnrpc.LightningServer = (*rpcServer)(nil)
// newRpcServer creates and returns a new instance of the rpcServer.
func newRpcServer(s *server) *rpcServer {
return &rpcServer{server: s, quit: make(chan struct{}, 1)}
}
// Start launches any helper goroutines required for the rpcServer
// to function.
func (r *rpcServer) Start() error {
if atomic.AddInt32(&r.started, 1) != 1 {
return nil
}
return nil
}
// Stop signals any active goroutines for a graceful closure.
func (r *rpcServer) Stop() error {
if atomic.AddInt32(&r.shutdown, 1) != 1 {
return nil
}
close(r.quit)
return nil
}
// addrPairsToOutputs converts a map describing a set of outputs to be created,
// the outputs themselves. The passed map pairs up an address, to a desired
// output value amount. Each address is converted to its corresponding pkScript
// to be used within the constructed output(s).
func addrPairsToOutputs(addrPairs map[string]int64) ([]*wire.TxOut, error) {
outputs := make([]*wire.TxOut, 0, len(addrPairs))
for addr, amt := range addrPairs {
addr, err := btcutil.DecodeAddress(addr, activeNetParams.Params)
if err != nil {
return nil, err
}
pkscript, err := txscript.PayToAddrScript(addr)
if err != nil {
return nil, err
}
outputs = append(outputs, wire.NewTxOut(amt, pkscript))
}
return outputs, nil
}
// sendCoinsOnChain makes an on-chain transaction in or to send coins to one or
// more addresses specified in the passed payment map. The payment map maps an
// address to a specified output value to be sent to that address.
func (r *rpcServer) sendCoinsOnChain(paymentMap map[string]int64) (*wire.ShaHash, error) {
outputs, err := addrPairsToOutputs(paymentMap)
if err != nil {
return nil, err
}
return r.server.lnwallet.SendOutputs(outputs, defaultAccount, 1)
}
// SendCoins executes a request to send coins to a particular address. Unlike
// SendMany, this RPC call only allows creating a single output at a time.
func (r *rpcServer) SendCoins(ctx context.Context,
in *lnrpc.SendCoinsRequest) (*lnrpc.SendCoinsResponse, error) {
rpcsLog.Infof("[sendcoins] addr=%v, amt=%v", in.Addr, btcutil.Amount(in.Amount))
paymentMap := map[string]int64{in.Addr: in.Amount}
txid, err := r.sendCoinsOnChain(paymentMap)
if err != nil {
return nil, err
}
rpcsLog.Infof("[sendcoins] spend generated txid: %v", txid.String())
return &lnrpc.SendCoinsResponse{Txid: txid.String()}, nil
}
// SendMany handles a request for a transaction create multiple specified
// outputs in parallel.
func (r *rpcServer) SendMany(ctx context.Context,
in *lnrpc.SendManyRequest) (*lnrpc.SendManyResponse, error) {
txid, err := r.sendCoinsOnChain(in.AddrToAmount)
if err != nil {
return nil, err
}
rpcsLog.Infof("[sendmany] spend generated txid: %v", txid.String())
return &lnrpc.SendManyResponse{Txid: txid.String()}, nil
}
// NewAddress creates a new address under control of the local wallet.
func (r *rpcServer) NewAddress(ctx context.Context,
in *lnrpc.NewAddressRequest) (*lnrpc.NewAddressResponse, error) {
r.server.lnwallet.KeyGenMtx.Lock()
defer r.server.lnwallet.KeyGenMtx.Unlock()
// Translate the gRPC proto address type to the wallet controller's
// available address types.
var addrType waddrmgr.AddressType
switch in.Type {
case lnrpc.NewAddressRequest_WITNESS_PUBKEY_HASH:
addrType = waddrmgr.WitnessPubKey
case lnrpc.NewAddressRequest_NESTED_PUBKEY_HASH:
addrType = waddrmgr.NestedWitnessPubKey
case lnrpc.NewAddressRequest_PUBKEY_HASH:
addrType = waddrmgr.PubKeyHash
}
addr, err := r.server.lnwallet.NewAddress(defaultAccount,
addrType)
if err != nil {
return nil, err
}
rpcsLog.Infof("[newaddress] addr=%v", addr.String())
return &lnrpc.NewAddressResponse{Address: addr.String()}, nil
}
// ConnectPeer attempts to establish a connection to a remote peer.
func (r *rpcServer) ConnectPeer(ctx context.Context,
in *lnrpc.ConnectPeerRequest) (*lnrpc.ConnectPeerResponse, error) {
if in.Addr == nil {
return nil, fmt.Errorf("need: lnc pubkeyhash@hostname")
}
idAtHost := fmt.Sprintf("%v@%v", in.Addr.PubKeyHash, in.Addr.Host)
rpcsLog.Debugf("[connectpeer] peer=%v", idAtHost)
peerAddr, err := lndc.LnAddrFromString(idAtHost, activeNetParams.Params)
if err != nil {
rpcsLog.Errorf("(connectpeer): error parsing ln addr: %v", err)
return nil, err
}
peerID, err := r.server.ConnectToPeer(peerAddr)
if err != nil {
rpcsLog.Errorf("(connectpeer): error connecting to peer: %v", err)
return nil, err
}
rpcsLog.Debugf("Connected to peer: %v", peerAddr.String())
return &lnrpc.ConnectPeerResponse{peerID}, nil
}
// OpenChannel attempts to open a singly funded channel specified in the
// request to a remote peer.
func (r *rpcServer) OpenChannel(in *lnrpc.OpenChannelRequest,
updateStream lnrpc.Lightning_OpenChannelServer) error {
rpcsLog.Tracef("[openchannel] request to peerid(%v) "+
"allocation(us=%v, them=%v) numconfs=%v", in.TargetPeerId,
in.LocalFundingAmount, in.RemoteFundingAmount, in.NumConfs)
localFundingAmt := btcutil.Amount(in.LocalFundingAmount)
remoteFundingAmt := btcutil.Amount(in.RemoteFundingAmount)
target := in.TargetPeerId
numConfs := in.NumConfs
respChan, errChan := r.server.OpenChannel(target, localFundingAmt,
remoteFundingAmt, numConfs)
if err := <-errChan; err != nil {
rpcsLog.Errorf("unable to open channel to peerid(%v): %v",
target, err)
return err
}
var outpoint *wire.OutPoint
select {
case resp := <-respChan:
outpoint = resp.chanPoint
openUpdate := &lnrpc.ChannelOpenUpdate{
&lnrpc.ChannelPoint{
FundingTxid: outpoint.Hash[:],
OutputIndex: outpoint.Index,
},
}
if err := updateStream.Send(openUpdate); err != nil {
return err
}
case <-r.quit:
return nil
}
rpcsLog.Tracef("[openchannel] success peerid(%v), ChannelPoint(%v)",
in.TargetPeerId, outpoint)
return nil
}
// CloseChannel attempts to close an active channel identified by its channel
// point. The actions of this method can additionally be augmented to attempt
// a force close after a timeout period in the case of an inactive peer.
func (r *rpcServer) CloseChannel(in *lnrpc.CloseChannelRequest,
updateStream lnrpc.Lightning_CloseChannelServer) error {
index := in.ChannelPoint.OutputIndex
txid, err := wire.NewShaHash(in.ChannelPoint.FundingTxid)
if err != nil {
rpcsLog.Errorf("[closechannel] invalid txid: %v", err)
return err
}
targetChannelPoint := wire.NewOutPoint(txid, index)
rpcsLog.Tracef("[closechannel] request for ChannelPoint(%v)",
targetChannelPoint)
respChan, errChan := r.server.htlcSwitch.CloseLink(targetChannelPoint)
if err := <-errChan; err != nil {
rpcsLog.Errorf("Unable to close ChannelPoint(%v): %v",
targetChannelPoint, err)
return err
}
select {
case resp := <-respChan:
closeUpdate := &lnrpc.ChannelCloseUpdate{
ClosingTxid: resp.txid[:],
Success: resp.success,
}
if err := updateStream.Send(closeUpdate); err != nil {
return err
}
case <-r.quit:
return nil
}
return nil
}
// GetInfo serves a request to the "getinfo" RPC call. This call returns
// general information concerning the lightning node including it's LN ID,
// identity address, and information concerning the number of open+pending
// channels.
func (r *rpcServer) GetInfo(ctx context.Context,
in *lnrpc.GetInfoRequest) (*lnrpc.GetInfoResponse, error) {
var activeChannels uint32
serverPeers := r.server.Peers()
for _, serverPeer := range serverPeers {
activeChannels += uint32(len(serverPeer.ChannelSnapshots()))
}
pendingChannels := r.server.fundingMgr.NumPendingChannels()
idPub := r.server.identityPriv.PubKey().SerializeCompressed()
idAddr, err := btcutil.NewAddressPubKeyHash(btcutil.Hash160(idPub), activeNetParams.Params)
if err != nil {
return nil, err
}
return &lnrpc.GetInfoResponse{
LightningId: hex.EncodeToString(r.server.lightningID[:]),
IdentityAddress: idAddr.String(),
NumPendingChannels: pendingChannels,
NumActiveChannels: activeChannels,
NumPeers: uint32(len(serverPeers)),
}, nil
}
// ListPeers returns a verbose listing of all currently active peers.
func (r *rpcServer) ListPeers(ctx context.Context,
in *lnrpc.ListPeersRequest) (*lnrpc.ListPeersResponse, error) {
rpcsLog.Tracef("[listpeers] request")
serverPeers := r.server.Peers()
resp := &lnrpc.ListPeersResponse{
Peers: make([]*lnrpc.Peer, 0, len(serverPeers)),
}
for _, serverPeer := range serverPeers {
// TODO(roasbeef): add a snapshot method which grabs peer read mtx
lnID := hex.EncodeToString(serverPeer.lightningID[:])
peer := &lnrpc.Peer{
LightningId: lnID,
PeerId: serverPeer.id,
Address: serverPeer.conn.RemoteAddr().String(),
Inbound: serverPeer.inbound,
BytesRecv: atomic.LoadUint64(&serverPeer.bytesReceived),
BytesSent: atomic.LoadUint64(&serverPeer.bytesSent),
}
chanSnapshots := serverPeer.ChannelSnapshots()
peer.Channels = make([]*lnrpc.ActiveChannel, 0, len(chanSnapshots))
for _, chanSnapshot := range chanSnapshots {
channel := &lnrpc.ActiveChannel{
RemoteId: lnID,
ChannelPoint: chanSnapshot.ChannelPoint.String(),
Capacity: int64(chanSnapshot.Capacity),
LocalBalance: int64(chanSnapshot.LocalBalance),
RemoteBalance: int64(chanSnapshot.RemoteBalance),
NumUpdates: chanSnapshot.NumUpdates,
}
peer.Channels = append(peer.Channels, channel)
}
resp.Peers = append(resp.Peers, peer)
}
rpcsLog.Debugf("[listpeers] yielded %v peers", serverPeers)
return resp, nil
}
// WalletBalance returns the sum of all confirmed unspent outputs under control
// by the wallet. This method can be modified by having the request specify
// only witness outputs should be factored into the final output sum.
// TODO(roasbeef): split into total and confirmed/unconfirmed
func (r *rpcServer) WalletBalance(ctx context.Context,
in *lnrpc.WalletBalanceRequest) (*lnrpc.WalletBalanceResponse, error) {
var balance float64
if in.WitnessOnly {
witnessOutputs, err := r.server.lnwallet.ListUnspentWitness(1)
if err != nil {
return nil, err
}
// We need to convert from BTC to satoshi here otherwise, and
// incorrect sum will be returned.
var outputSum btcutil.Amount
for _, witnessOutput := range witnessOutputs {
outputSum += btcutil.Amount(witnessOutput.Amount * 1e8)
}
balance = outputSum.ToBTC()
} else {
// TODO(roasbeef): make num confs a param
outputSum, err := r.server.lnwallet.CalculateBalance(1)
if err != nil {
return nil, err
}
balance = outputSum.ToBTC()
}
rpcsLog.Debugf("[walletbalance] balance=%v", balance)
return &lnrpc.WalletBalanceResponse{balance}, nil
}
// PendingChannels returns a list of all the channels that are currently
// considered "pending". A channel is pending if it has finished the funding
// workflow and is waiting for confirmations for the funding txn, or is in the
// process of closure, either initiated cooperatively or non-coopertively.
func (r *rpcServer) PendingChannels(ctx context.Context,
in *lnrpc.PendingChannelRequest) (*lnrpc.PendingChannelResponse, error) {
both := in.Status == lnrpc.ChannelStatus_ALL
includeOpen := (in.Status == lnrpc.ChannelStatus_OPENING) || both
includeClose := (in.Status == lnrpc.ChannelStatus_CLOSING) || both
rpcsLog.Debugf("[pendingchannels] %v", in.Status)
var pendingChannels []*lnrpc.PendingChannelResponse_PendingChannel
if includeOpen {
pendingOpenChans := r.server.fundingMgr.PendingChannels()
for _, pendingOpen := range pendingOpenChans {
// TODO(roasbeef): add confirmation progress
pendingChan := &lnrpc.PendingChannelResponse_PendingChannel{
PeerId: pendingOpen.peerId,
LightningId: hex.EncodeToString(pendingOpen.lightningID[:]),
ChannelPoint: pendingOpen.channelPoint.String(),
Capacity: int64(pendingOpen.capacity),
LocalBalance: int64(pendingOpen.localBalance),
RemoteBalance: int64(pendingOpen.remoteBalance),
Status: lnrpc.ChannelStatus_OPENING,
}
pendingChannels = append(pendingChannels, pendingChan)
}
}
if includeClose {
}
return &lnrpc.PendingChannelResponse{
PendingChannels: pendingChannels,
}, nil
}
// SendPayment dispatches a bi-directional streaming RPC for sending payments
// through the Lightning Network. A single RPC invocation creates a persistent
// bi-directional stream allowing clients to rapidly send payments through the
// Lightning Network with a single persistent connection.
func (r *rpcServer) SendPayment(paymentStream lnrpc.Lightning_SendPaymentServer) error {
errChan := make(chan error, 1)
for {
select {
case err := <-errChan:
return err
default:
// Receive the next pending payment within the stream sent by
// the client. If we read the EOF sentinel, then the client has
// closed the stream, and we can exit normally.
nextPayment, err := paymentStream.Recv()
if err == io.EOF {
return nil
} else if err != nil {
return err
}
// Craft an HTLC packet to send to the routing sub-system. The
// meta-data within this packet will be used to route the
// payment through the network.
htlcAdd := &lnwire.HTLCAddRequest{
Amount: lnwire.CreditsAmount(nextPayment.Amt),
RedemptionHashes: [][32]byte{debugHash},
}
destAddr, err := wire.NewShaHash(nextPayment.Dest)
if err != nil {
return err
}
htlcPkt := &htlcPacket{
dest: *destAddr,
msg: htlcAdd,
}
// TODO(roasbeef): semaphore to limit num outstanding
// goroutines.
go func() {
// Finally, send this next packet to the routing layer in order
// to complete the next payment.
// TODO(roasbeef): this should go through the L3 router once
// multi-hop is in place.
if err := r.server.htlcSwitch.SendHTLC(htlcPkt); err != nil {
errChan <- err
return
}
// TODO(roasbeef): proper responses
resp := &lnrpc.SendResponse{}
if err := paymentStream.Send(resp); err != nil {
errChan <- err
return
}
}()
}
}
return nil
}
func (r *rpcServer) ShowRoutingTable(ctx context.Context,
in *lnrpc.ShowRoutingTableRequest) (*lnrpc.ShowRoutingTableResponse, error) {
rpcsLog.Debugf("[ShowRoutingTable]")
rtCopy := r.server.routingMgr.GetRTCopy()
return &lnrpc.ShowRoutingTableResponse{
Rt: rtCopy.String(),
}, nil
}