lnd: add async updates for [open|close]channel RPC's

This commit modifies the internal workflow for opening or closing a
channel in order to create a path in which RPC clients can receive
updates. Updates are now communicated via channels from the goroutines
spawned by the RPC server to process the request, and the sub-system
within the daemon that actually executes the request.

With this change clients can now receive updates that the request is
pending (final message has been sent to the target client), or that the
request has been completed. Confirmation related updates have not yet
been implemented as that will require some changes to the ChainNotifier
interface.
This commit is contained in:
Olaoluwa Osuntokun 2016-08-30 16:52:53 -07:00
parent 05ac8d3c47
commit 832fd248cd
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2
6 changed files with 160 additions and 115 deletions

View File

@ -4,6 +4,7 @@ import (
"sync"
"sync/atomic"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/roasbeef/btcd/btcec"
@ -32,8 +33,8 @@ type reservationWithCtx struct {
reservation *lnwallet.ChannelReservation
peer *peer
resp chan *wire.OutPoint
err chan error
updates chan *lnrpc.OpenStatusUpdate
err chan error
}
// initFundingMsg is sent by an outside sub-system to the funding manager in
@ -43,9 +44,6 @@ type reservationWithCtx struct {
// the workflow.
type initFundingMsg struct {
peer *peer
err chan error
resp chan *wire.OutPoint
*openChanReq
}
@ -397,6 +395,7 @@ func (f *fundingManager) handleFundingResponse(fmsg *fundingResponseMsg) {
_, addrs, _, err := txscript.ExtractPkScriptAddrs(msg.DeliveryPkScript, activeNetParams.Params)
if err != nil {
fndgLog.Errorf("Unable to extract addresses from script: %v", err)
resCtx.err <- err
return
}
contribution := &lnwallet.ChannelContribution{
@ -411,6 +410,7 @@ func (f *fundingManager) handleFundingResponse(fmsg *fundingResponseMsg) {
fndgLog.Errorf("Unable to process contribution from %v: %v",
sourcePeer, err)
fmsg.peer.Disconnect()
resCtx.err <- err
return
}
@ -422,6 +422,7 @@ func (f *fundingManager) handleFundingResponse(fmsg *fundingResponseMsg) {
commitSig, err := btcec.ParseSignature(sig, btcec.S256())
if err != nil {
fndgLog.Errorf("Unable to parse signature: %v", err)
resCtx.err <- err
return
}
@ -524,6 +525,7 @@ func (f *fundingManager) handleFundingSignComplete(fmsg *fundingSignCompleteMsg)
if err := resCtx.reservation.CompleteReservation(nil, commitSig); err != nil {
fndgLog.Errorf("unable to complete reservation sign complete: %v", err)
fmsg.peer.Disconnect()
resCtx.err <- err
return
}
@ -531,16 +533,27 @@ func (f *fundingManager) handleFundingSignComplete(fmsg *fundingSignCompleteMsg)
fndgLog.Infof("Finalizing pendingID(%v) over ChannelPoint(%v), "+
"waiting for channel open on-chain", chanID, fundingPoint)
// Send an update to the upstream client that the negotiation process
// is over.
// TODO(roasbeef): add abstraction over updates to accomdate
// long-polling, or SSE, etc.
resCtx.updates <- &lnrpc.OpenStatusUpdate{
Update: &lnrpc.OpenStatusUpdate_ChanPending{
ChanPending: &lnrpc.PendingUpdate{
Txid: fundingPoint.Hash[:],
},
},
}
// Spawn a goroutine which will send the newly open channel to the
// source peer once the channel is open. A channel is considered "open"
// once it reaches a sufficient number of confirmations.
// TODO(roasbeef): semaphore to limit active chan open goroutines
go func() {
// TODO(roasbeef): semaphore to limit active chan open goroutines
select {
// TODO(roasbeef): need to persist pending broadcast channels,
// send chan open proof during scan of blocks mined while down.
case openChan := <-resCtx.reservation.DispatchChan():
// This reservation is no longer pending as the funding
// transaction has been fully confirmed.
f.resMtx.Lock()
@ -567,9 +580,33 @@ func (f *fundingManager) handleFundingSignComplete(fmsg *fundingSignCompleteMsg)
fundingOpen := lnwire.NewSingleFundingOpenProof(chanID, spvProof)
fmsg.peer.queueMsg(fundingOpen, nil)
// Finally, respond to the original caller (if any).
resCtx.err <- nil
resCtx.resp <- resCtx.reservation.FundingOutpoint()
// Register the new link wtith the L3 routing manager
// so this new channel can be utilized during path
// finding.
chanInfo := openChan.StateSnapshot()
capacity := float64(chanInfo.Capacity)
fmsg.peer.server.routingMgr.AddChannel(
graph.NewID(fmsg.peer.server.lightningID),
graph.NewID(chanInfo.RemoteID),
graph.NewEdgeID(fundingPoint.Hash.String()),
&rt.ChannelInfo{
Cpt: capacity,
},
)
// Finally give the caller a final update notifying
// them that the channel is now open.
// TODO(roasbeef): helper funcs for proto construction
resCtx.updates <- &lnrpc.OpenStatusUpdate{
Update: &lnrpc.OpenStatusUpdate_ChanOpen{
ChanOpen: &lnrpc.ChannelOpenUpdate{
ChannelPoint: &lnrpc.ChannelPoint{
FundingTxid: fundingPoint.Hash[:],
OutputIndex: fundingPoint.Index,
},
},
},
}
return
case <-f.quit:
return
@ -617,6 +654,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
fndgLog.Infof("FundingOpen: ChannelPoint(%v) with peerID(%v) is now open",
resCtx.reservation.FundingOutpoint, fmsg.peer.id)
// Notify the L3 routing manager of the newly active channel link.
capacity := float64(resCtx.reservation.OurContribution().FundingAmount +
resCtx.reservation.TheirContribution().FundingAmount)
fmsg.peer.server.routingMgr.AddChannel(
@ -627,23 +665,19 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
Cpt: capacity,
},
)
// Finally, notify the target peer of the newly open channel.
fmsg.peer.newChannels <- openChan
}
// initFundingWorkflow sends a message to the funding manager instructing it
// to initiate a single funder workflow with the source peer.
// TODO(roasbeef): re-visit blocking nature..
func (f *fundingManager) initFundingWorkflow(targetPeer *peer, req *openChanReq) (*wire.OutPoint, error) {
errChan := make(chan error, 1)
respChan := make(chan *wire.OutPoint, 1)
func (f *fundingManager) initFundingWorkflow(targetPeer *peer, req *openChanReq) {
f.fundingRequests <- &initFundingMsg{
peer: targetPeer,
resp: respChan,
err: errChan,
openChanReq: req,
}
return <-respChan, <-errChan
}
// handleInitFundingMsg creates a channel reservation within the daemon's
@ -667,7 +701,6 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) {
reservation, err := f.wallet.InitChannelReservation(capacity, localAmt,
nodeID, uint16(numConfs), 4)
if err != nil {
msg.resp <- nil
msg.err <- err
return
}
@ -689,8 +722,8 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) {
f.activeReservations[msg.peer.id][chanID] = &reservationWithCtx{
reservation: reservation,
peer: msg.peer,
updates: msg.updates,
err: msg.err,
resp: msg.resp,
}
f.resMtx.Unlock()
@ -700,7 +733,6 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) {
deliveryScript, err := txscript.PayToAddrScript(contribution.DeliveryAddress)
if err != nil {
fndgLog.Errorf("Unable to convert address to pkscript: %v", err)
msg.resp <- nil
msg.err <- err
return
}

View File

@ -8,6 +8,7 @@ import (
"time"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/roasbeef/btcd/wire"
"github.com/roasbeef/btcutil"
@ -325,7 +326,6 @@ func (h *htlcSwitch) handleUnregisterLink(req *unregisterLinkMsg) {
func (h *htlcSwitch) handleCloseLink(req *closeLinkReq) {
targetLink, ok := h.chanIndex[*req.chanPoint]
if !ok {
req.resp <- nil
req.err <- fmt.Errorf("channel point %v not found", req.chanPoint)
return
}
@ -396,27 +396,20 @@ func (h *htlcSwitch) UnregisterLink(chanInterface [32]byte, chanPoint *wire.OutP
type closeLinkReq struct {
chanPoint *wire.OutPoint
resp chan *closeLinkResp
err chan error
}
// closeChanResp is the response to a closeChanReq is simply houses a boolean
// value indicating if the channel coopertive channel closure was succesful or not.
type closeLinkResp struct {
txid *wire.ShaHash
success bool
updates chan *lnrpc.CloseStatusUpdate
err chan error
}
// CloseLink closes an active link targetted by it's channel point. Closing the
// link initiates a cooperative channel closure.
// TODO(roabeef): bool flag for timeout/force
func (h *htlcSwitch) CloseLink(chanPoint *wire.OutPoint) (chan *closeLinkResp, chan error) {
respChan := make(chan *closeLinkResp, 1)
func (h *htlcSwitch) CloseLink(chanPoint *wire.OutPoint) (chan *lnrpc.CloseStatusUpdate, chan error) {
updateChan := make(chan *lnrpc.CloseStatusUpdate, 1)
errChan := make(chan error, 1)
h.linkControl <- &closeLinkReq{chanPoint, respChan, errChan}
h.linkControl <- &closeLinkReq{chanPoint, updateChan, errChan}
return respChan, errChan
return updateChan, errChan
}
// linkInfoUpdateMsg encapsulates a request for the htlc switch to update the

2
lnd.go
View File

@ -143,7 +143,7 @@ func lndMain() error {
lnrpc.RegisterLightningServer(grpcServer, server.rpcServer)
// Finally, start the grpc server listening for HTTP/2 connections.
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", loadedConfig.RPCPort))
lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", loadedConfig.RPCPort))
if err != nil {
fmt.Printf("failed to listen: %v", err)
return err

41
peer.go
View File

@ -12,6 +12,7 @@ import (
"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lndc"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/roasbeef/btcd/btcec"
@ -639,7 +640,6 @@ func (p *peer) handleLocalClose(req *closeLinkReq) {
// when the remote node broadcasts the fully signed closing transaction.
sig, txid, err := channel.InitCooperativeClose()
if err != nil {
req.resp <- nil
req.err <- err
return
}
@ -653,22 +653,33 @@ func (p *peer) handleLocalClose(req *closeLinkReq) {
// TODO(roasbeef): remove encoding redundancy
closeSig, err := btcec.ParseSignature(sig, btcec.S256())
if err != nil {
req.resp <- nil
req.err <- err
return
}
closeReq := lnwire.NewCloseRequest(chanPoint, closeSig)
p.queueMsg(closeReq, nil)
// Update the caller w.r.t the current pending state of this request.
req.updates <- &lnrpc.CloseStatusUpdate{
Update: &lnrpc.CloseStatusUpdate_ClosePending{
ClosePending: &lnrpc.PendingUpdate{
Txid: txid[:],
},
},
}
// Finally, launch a goroutine which will request to be notified by the
// ChainNotifier once the closure transaction obtains a single
// confirmation.
go func() {
// TODO(roasbeef): add param for num needed confs
notifier := p.server.lnwallet.ChainNotifier
confNtfn, _ := notifier.RegisterConfirmationsNtfn(txid, 1)
confNtfn, err := notifier.RegisterConfirmationsNtfn(txid, 1)
if err != nil {
req.err <- err
return
}
var success bool
select {
case height, ok := <-confNtfn.Confirmed:
// In the case that the ChainNotifier is shutting
@ -683,17 +694,24 @@ func (p *peer) handleLocalClose(req *closeLinkReq) {
// active indexes, and the database state.
peerLog.Infof("ChannelPoint(%v) is now "+
"closed at height %v", key, height)
wipeChannel(p, channel)
success = true
if err := wipeChannel(p, channel); err != nil {
req.err <- err
return
}
case <-p.quit:
return
}
// Respond to the local sub-system which requested the channel
// closure.
req.resp <- &closeLinkResp{txid, success}
req.err <- nil
req.updates <- &lnrpc.CloseStatusUpdate{
Update: &lnrpc.CloseStatusUpdate_ChanClose{
ChanClose: &lnrpc.ChannelCloseUpdate{
ClosingTxid: txid[:],
Success: true,
},
},
}
}()
}
@ -741,7 +759,7 @@ func (p *peer) handleRemoteClose(req *lnwire.CloseRequest) {
// wipeChannel removes the passed channel from all indexes associated with the
// peer, and deletes the channel from the database.
func wipeChannel(p *peer, channel *lnwallet.LightningChannel) {
func wipeChannel(p *peer, channel *lnwallet.LightningChannel) error {
chanID := channel.ChannelPoint()
delete(p.activeChannels, *chanID)
@ -756,7 +774,10 @@ func wipeChannel(p *peer, channel *lnwallet.LightningChannel) {
if err := channel.DeleteState(); err != nil {
peerLog.Errorf("Unable to delete ChannelPoint(%v) "+
"from db %v", chanID, err)
return err
}
return nil
}
// pendingPayment represents a pending HTLC which has yet to be settled by the

View File

@ -201,30 +201,43 @@ func (r *rpcServer) OpenChannel(in *lnrpc.OpenChannelRequest,
remoteFundingAmt := btcutil.Amount(in.RemoteFundingAmount)
target := in.TargetPeerId
numConfs := in.NumConfs
respChan, errChan := r.server.OpenChannel(target, localFundingAmt,
updateChan, errChan := r.server.OpenChannel(target, localFundingAmt,
remoteFundingAmt, numConfs)
if err := <-errChan; err != nil {
rpcsLog.Errorf("unable to open channel to peerid(%v): %v",
target, err)
return err
var outpoint wire.OutPoint
out:
for {
select {
case err := <-errChan:
rpcsLog.Errorf("unable to open channel to peerid(%v): %v",
target, err)
return err
case fundingUpdate := <-updateChan:
rpcsLog.Tracef("[openchannel] sending update: %v",
fundingUpdate)
if err := updateStream.Send(fundingUpdate); err != nil {
return err
}
// If a final channel open update is being sent, then
// we can break out of our recv loop as we no longer
// need to process any further updates.
switch update := fundingUpdate.Update.(type) {
case *lnrpc.OpenStatusUpdate_ChanOpen:
chanPoint := update.ChanOpen.ChannelPoint
h, _ := wire.NewShaHash(chanPoint.FundingTxid)
outpoint = wire.OutPoint{
Hash: *h,
Index: chanPoint.OutputIndex,
}
break out
}
case <-r.quit:
return nil
}
}
var outpoint *wire.OutPoint
select {
case resp := <-respChan:
outpoint = resp.chanPoint
openUpdate := &lnrpc.ChannelOpenUpdate{
&lnrpc.ChannelPoint{
FundingTxid: outpoint.Hash[:],
OutputIndex: outpoint.Index,
},
}
if err := updateStream.Send(openUpdate); err != nil {
return err
}
case <-r.quit:
return nil
}
rpcsLog.Tracef("[openchannel] success peerid(%v), ChannelPoint(%v)",
in.TargetPeerId, outpoint)
return nil
@ -247,24 +260,33 @@ func (r *rpcServer) CloseChannel(in *lnrpc.CloseChannelRequest,
rpcsLog.Tracef("[closechannel] request for ChannelPoint(%v)",
targetChannelPoint)
respChan, errChan := r.server.htlcSwitch.CloseLink(targetChannelPoint)
if err := <-errChan; err != nil {
rpcsLog.Errorf("Unable to close ChannelPoint(%v): %v",
targetChannelPoint, err)
return err
}
updateChan, errChan := r.server.htlcSwitch.CloseLink(targetChannelPoint)
select {
case resp := <-respChan:
closeUpdate := &lnrpc.ChannelCloseUpdate{
ClosingTxid: resp.txid[:],
Success: resp.success,
}
if err := updateStream.Send(closeUpdate); err != nil {
out:
for {
select {
case err := <-errChan:
rpcsLog.Errorf("[closechannel] unable to close "+
"ChannelPoint(%v): %v", targetChannelPoint, err)
return err
case closingUpdate := <-updateChan:
if err := updateStream.Send(closingUpdate); err != nil {
return err
}
// If a final channel closing updates is being sent,
// then we can break out of our dispatch loop as we no
// longer need to process any further updates.
switch closeUpdate := closingUpdate.Update.(type) {
case *lnrpc.CloseStatusUpdate_ChanClose:
h, _ := wire.NewShaHash(closeUpdate.ChanClose.ClosingTxid)
rpcsLog.Errorf("[closechannel] close completed: "+
"txid(%v)", h)
break out
}
case <-r.quit:
return nil
}
case <-r.quit:
return nil
}
return nil
@ -350,6 +372,7 @@ func (r *rpcServer) ListPeers(ctx context.Context,
// by the wallet. This method can be modified by having the request specify
// only witness outputs should be factored into the final output sum.
// TODO(roasbeef): split into total and confirmed/unconfirmed
// TODO(roasbeef): add async hooks into wallet balance changes
func (r *rpcServer) WalletBalance(ctx context.Context,
in *lnrpc.WalletBalanceRequest) (*lnrpc.WalletBalanceResponse, error) {

View File

@ -10,14 +10,13 @@ import (
"github.com/btcsuite/fastsha256"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lndc"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/roasbeef/btcd/btcec"
"github.com/roasbeef/btcd/wire"
"github.com/roasbeef/btcutil"
"github.com/BitfuryLightning/tools/routing"
"github.com/BitfuryLightning/tools/rt"
"github.com/BitfuryLightning/tools/rt/graph"
"github.com/roasbeef/btcwallet/waddrmgr"
)
@ -229,14 +228,8 @@ type openChanReq struct {
numConfs uint32
resp chan *openChanResp
err chan error
}
// openChanResp is the response to an openChanReq, it contains the channel
// point, or outpoint of the broadcast funding transaction.
type openChanResp struct {
chanPoint *wire.OutPoint
updates chan *lnrpc.OpenStatusUpdate
err chan error
}
// queryHandler handles any requests to modify the server's internal state of
@ -389,7 +382,6 @@ func (s *server) handleOpenChanReq(req *openChanReq) {
}
if targetPeer == nil {
req.resp <- nil
req.err <- fmt.Errorf("unable to find peer %v", target)
return
}
@ -398,23 +390,8 @@ func (s *server) handleOpenChanReq(req *openChanReq) {
// manager. This allows the server to continue handling queries instead of
// blocking on this request which is exporeted as a synchronous request to
// the outside world.
go func() {
// TODO(roasbeef): server semaphore to restrict num goroutines
fundingID, err := s.fundingMgr.initFundingWorkflow(targetPeer, req)
if err == nil {
capacity := float64(req.localFundingAmt + req.remoteFundingAmt)
s.routingMgr.AddChannel(
graph.NewID(s.lightningID),
graph.NewID([32]byte(targetPeer.lightningID)),
graph.NewEdgeID(fundingID.String()),
&rt.ChannelInfo{
Cpt: capacity,
},
)
}
req.resp <- &openChanResp{fundingID}
req.err <- err
}()
// TODO(roasbeef): server semaphore to restrict num goroutines
go s.fundingMgr.initFundingWorkflow(targetPeer, req)
}
// ConnectToPeer requests that the server connect to a Lightning Network peer
@ -432,10 +409,10 @@ func (s *server) ConnectToPeer(addr *lndc.LNAdr) (int32, error) {
// OpenChannel sends a request to the server to open a channel to the specified
// peer identified by ID with the passed channel funding paramters.
func (s *server) OpenChannel(nodeID int32, localAmt, remoteAmt btcutil.Amount,
numConfs uint32) (chan *openChanResp, chan error) {
numConfs uint32) (chan *lnrpc.OpenStatusUpdate, chan error) {
errChan := make(chan error, 1)
respChan := make(chan *openChanResp, 1)
updateChan := make(chan *lnrpc.OpenStatusUpdate, 1)
s.queries <- &openChanReq{
targetNodeID: nodeID,
@ -443,12 +420,11 @@ func (s *server) OpenChannel(nodeID int32, localAmt, remoteAmt btcutil.Amount,
remoteFundingAmt: remoteAmt,
numConfs: numConfs,
resp: respChan,
err: errChan,
updates: updateChan,
err: errChan,
}
// TODO(roasbeef): hook in "progress" channel
return respChan, errChan
return updateChan, errChan
}
// Peers returns a slice of all active peers.