lnd: implement pendingchannels RPC

This commit implements the “pendingchannels” RPC within the rpcserver.
This RPC allows callers to receive details concerning the current
pending channels associated with the daemon. Pending channels are those
waiting for additional confirmations before they can be considered
opened/closed.

At the time of this commit, only open channels are shown. A future
commit will also add the confirmation updates, along with information
for close channels.
This commit is contained in:
Olaoluwa Osuntokun 2016-07-07 15:33:52 -07:00
parent c0383679d2
commit 07166fe88b
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2
2 changed files with 113 additions and 12 deletions

@ -9,6 +9,7 @@ import (
"github.com/roasbeef/btcd/btcec"
"github.com/roasbeef/btcd/txscript"
"github.com/roasbeef/btcd/wire"
"github.com/roasbeef/btcutil"
)
const (
@ -26,6 +27,7 @@ const (
// * deadlines, etc.
type reservationWithCtx struct {
reservation *lnwallet.ChannelReservation
peer *peer
resp chan *wire.OutPoint
err chan error
@ -184,6 +186,30 @@ func (f *fundingManager) NumPendingChannels() uint32 {
return <-resp
}
type pendingChannel struct {
peerId int32
lightningID [32]byte
channelPoint *wire.OutPoint
capacity btcutil.Amount
localBalance btcutil.Amount
remoteBalance btcutil.Amount
}
type pendingChansReq struct {
resp chan []*pendingChannel
}
// PendingChannels returns a slice describing all the channels which are
// currently pending at the last state of the funding workflow.
func (f *fundingManager) PendingChannels() []*pendingChannel {
resp := make(chan []*pendingChannel, 1)
req := &pendingChansReq{resp}
f.queries <- req
return <-resp
}
// reservationCoordinator is the primary goroutine tasked with progressing the
// funding workflow between the wallet, and any outside peers or local callers.
//
@ -210,11 +236,9 @@ out:
case req := <-f.queries:
switch msg := req.(type) {
case *numPendingReq:
var numPending uint32
for _, peerChannels := range f.activeReservations {
numPending += uint32(len(peerChannels))
}
msg.resp <- numPending
f.handleNumPending(msg)
case *pendingChansReq:
f.handlePendingChannels(msg)
}
case <-f.quit:
break out
@ -224,6 +248,41 @@ out:
f.wg.Done()
}
// handleNumPending handles a request for the total number of pending channels.
func (f *fundingManager) handleNumPending(msg *numPendingReq) {
var numPending uint32
for _, peerChannels := range f.activeReservations {
numPending += uint32(len(peerChannels))
}
msg.resp <- numPending
}
// handlePendingChannels responds to a request for details concerning all
// currently pending channels waiting for the final phase of the funding
// workflow (funding txn confirmation).
func (f *fundingManager) handlePendingChannels(msg *pendingChansReq) {
var pendingChannels []*pendingChannel
for peerID, peerChannels := range f.activeReservations {
for _, pendingChan := range peerChannels {
peer := pendingChan.peer
res := pendingChan.reservation
localFund := res.OurContribution().FundingAmount
remoteFund := res.TheirContribution().FundingAmount
pendingChan := &pendingChannel{
peerId: peerID,
lightningID: peer.lightningID,
channelPoint: res.FundingOutpoint(),
capacity: localFund + remoteFund,
localBalance: localFund,
remoteBalance: remoteFund,
}
pendingChannels = append(pendingChannels, pendingChan)
}
}
msg.resp <- pendingChannels
}
// processFundingRequest sends a message to the fundingManager allowing it to
// intiate the new funding workflow with the source peer.
func (f *fundingManager) processFundingRequest(msg *lnwire.SingleFundingRequest, peer *peer) {
@ -233,7 +292,7 @@ func (f *fundingManager) processFundingRequest(msg *lnwire.SingleFundingRequest,
// handleSingleFundingRequest creates an initial 'ChannelReservation' within
// the wallet, then responds to the source peer with a single funder response
// message progressing the funding workflow.
// TODO(roasbeef): add erorr chan to all, let channelManager handle
// TODO(roasbeef): add error chan to all, let channelManager handle
// error+propagate
func (f *fundingManager) handleFundingRequest(fmsg *fundingRequestMsg) {
msg := fmsg.msg
@ -264,6 +323,7 @@ func (f *fundingManager) handleFundingRequest(fmsg *fundingRequestMsg) {
}
f.activeReservations[fmsg.peer.id][msg.ChannelID] = &reservationWithCtx{
reservation: reservation,
peer: fmsg.peer,
}
f.resMtx.Unlock()
@ -452,12 +512,6 @@ func (f *fundingManager) handleFundingSignComplete(fmsg *fundingSignCompleteMsg)
return
}
// This reservation is no longer pending as the funding transaction has been
// broadcast, so we can now delete it.
f.resMtx.Lock()
delete(f.activeReservations[fmsg.peer.id], chanID)
f.resMtx.Unlock()
fundingPoint := resCtx.reservation.FundingOutpoint()
fndgLog.Infof("Finalizing pendingID(%v) over ChannelPoint(%v), "+
"waiting for channel open on-chain", chanID, fundingPoint)
@ -468,7 +522,16 @@ func (f *fundingManager) handleFundingSignComplete(fmsg *fundingSignCompleteMsg)
go func() {
// TODO(roasbeef): semaphore to limit active chan open goroutines
select {
// TODO(roasbeef): need to persist pending broadcast channels,
// send chan open proof during scan of blocks mined while down.
case openChan := <-resCtx.reservation.DispatchChan():
// This reservation is no longer pending as the funding
// transaction has been fully confirmed.
f.resMtx.Lock()
delete(f.activeReservations[fmsg.peer.id], chanID)
f.resMtx.Unlock()
fndgLog.Infof("ChannelPoint(%v) with peerID(%v) is now active",
fundingPoint, fmsg.peer.id)
@ -599,6 +662,7 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) {
}
f.activeReservations[msg.peer.id][chanID] = &reservationWithCtx{
reservation: reservation,
peer: msg.peer,
err: msg.err,
resp: msg.resp,
}

@ -381,3 +381,40 @@ func (r *rpcServer) WalletBalance(ctx context.Context,
return &lnrpc.WalletBalanceResponse{balance}, nil
}
// PendingChannels returns a list of all the channels that are currently
// considered "pending". A channel is pending if it has finished the funding
// workflow and is waiting for confirmations for the funding txn, or is in the
// process of closure, either initiated cooperatively or non-coopertively.
func (r *rpcServer) PendingChannels(ctx context.Context,
in *lnrpc.PendingChannelRequest) (*lnrpc.PendingChannelResponse, error) {
both := in.Status == lnrpc.ChannelStatus_ALL
includeOpen := (in.Status == lnrpc.ChannelStatus_OPENING) || both
includeClose := (in.Status == lnrpc.ChannelStatus_CLOSING) || both
rpcsLog.Debugf("[pendingchannels] %v", in.Status)
var pendingChannels []*lnrpc.PendingChannelResponse_PendingChannel
if includeOpen {
pendingOpenChans := r.server.fundingMgr.PendingChannels()
for _, pendingOpen := range pendingOpenChans {
// TODO(roasbeef): add confirmation progress
pendingChan := &lnrpc.PendingChannelResponse_PendingChannel{
PeerId: pendingOpen.peerId,
LightningId: hex.EncodeToString(pendingOpen.lightningID[:]),
ChannelPoint: pendingOpen.channelPoint.String(),
Capacity: int64(pendingOpen.capacity),
LocalBalance: int64(pendingOpen.localBalance),
RemoteBalance: int64(pendingOpen.remoteBalance),
Status: lnrpc.ChannelStatus_OPENING,
}
pendingChannels = append(pendingChannels, pendingChan)
}
}
if includeClose {
}
return &lnrpc.PendingChannelResponse{
PendingChannels: pendingChannels,
}, nil
}