From 832fd248cd03a84c8767e50f62e0d7460e0f0120 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Tue, 30 Aug 2016 16:52:53 -0700 Subject: [PATCH] lnd: add async updates for [open|close]channel RPC's This commit modifies the internal workflow for opening or closing a channel in order to create a path in which RPC clients can receive updates. Updates are now communicated via channels from the goroutines spawned by the RPC server to process the request, and the sub-system within the daemon that actually executes the request. With this change clients can now receive updates that the request is pending (final message has been sent to the target client), or that the request has been completed. Confirmation related updates have not yet been implemented as that will require some changes to the ChainNotifier interface. --- fundingmanager.go | 72 +++++++++++++++++++++++++---------- htlcswitch.go | 21 ++++------- lnd.go | 2 +- peer.go | 41 +++++++++++++++----- rpcserver.go | 95 +++++++++++++++++++++++++++++------------------ server.go | 44 +++++----------------- 6 files changed, 160 insertions(+), 115 deletions(-) diff --git a/fundingmanager.go b/fundingmanager.go index 762ae560..7c2b43c4 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -4,6 +4,7 @@ import ( "sync" "sync/atomic" + "github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" "github.com/roasbeef/btcd/btcec" @@ -32,8 +33,8 @@ type reservationWithCtx struct { reservation *lnwallet.ChannelReservation peer *peer - resp chan *wire.OutPoint - err chan error + updates chan *lnrpc.OpenStatusUpdate + err chan error } // initFundingMsg is sent by an outside sub-system to the funding manager in @@ -43,9 +44,6 @@ type reservationWithCtx struct { // the workflow. type initFundingMsg struct { peer *peer - err chan error - resp chan *wire.OutPoint - *openChanReq } @@ -397,6 +395,7 @@ func (f *fundingManager) handleFundingResponse(fmsg *fundingResponseMsg) { _, addrs, _, err := txscript.ExtractPkScriptAddrs(msg.DeliveryPkScript, activeNetParams.Params) if err != nil { fndgLog.Errorf("Unable to extract addresses from script: %v", err) + resCtx.err <- err return } contribution := &lnwallet.ChannelContribution{ @@ -411,6 +410,7 @@ func (f *fundingManager) handleFundingResponse(fmsg *fundingResponseMsg) { fndgLog.Errorf("Unable to process contribution from %v: %v", sourcePeer, err) fmsg.peer.Disconnect() + resCtx.err <- err return } @@ -422,6 +422,7 @@ func (f *fundingManager) handleFundingResponse(fmsg *fundingResponseMsg) { commitSig, err := btcec.ParseSignature(sig, btcec.S256()) if err != nil { fndgLog.Errorf("Unable to parse signature: %v", err) + resCtx.err <- err return } @@ -524,6 +525,7 @@ func (f *fundingManager) handleFundingSignComplete(fmsg *fundingSignCompleteMsg) if err := resCtx.reservation.CompleteReservation(nil, commitSig); err != nil { fndgLog.Errorf("unable to complete reservation sign complete: %v", err) fmsg.peer.Disconnect() + resCtx.err <- err return } @@ -531,16 +533,27 @@ func (f *fundingManager) handleFundingSignComplete(fmsg *fundingSignCompleteMsg) fndgLog.Infof("Finalizing pendingID(%v) over ChannelPoint(%v), "+ "waiting for channel open on-chain", chanID, fundingPoint) + // Send an update to the upstream client that the negotiation process + // is over. + // TODO(roasbeef): add abstraction over updates to accomdate + // long-polling, or SSE, etc. + resCtx.updates <- &lnrpc.OpenStatusUpdate{ + Update: &lnrpc.OpenStatusUpdate_ChanPending{ + ChanPending: &lnrpc.PendingUpdate{ + Txid: fundingPoint.Hash[:], + }, + }, + } + // Spawn a goroutine which will send the newly open channel to the // source peer once the channel is open. A channel is considered "open" // once it reaches a sufficient number of confirmations. + // TODO(roasbeef): semaphore to limit active chan open goroutines go func() { - // TODO(roasbeef): semaphore to limit active chan open goroutines select { // TODO(roasbeef): need to persist pending broadcast channels, // send chan open proof during scan of blocks mined while down. case openChan := <-resCtx.reservation.DispatchChan(): - // This reservation is no longer pending as the funding // transaction has been fully confirmed. f.resMtx.Lock() @@ -567,9 +580,33 @@ func (f *fundingManager) handleFundingSignComplete(fmsg *fundingSignCompleteMsg) fundingOpen := lnwire.NewSingleFundingOpenProof(chanID, spvProof) fmsg.peer.queueMsg(fundingOpen, nil) - // Finally, respond to the original caller (if any). - resCtx.err <- nil - resCtx.resp <- resCtx.reservation.FundingOutpoint() + // Register the new link wtith the L3 routing manager + // so this new channel can be utilized during path + // finding. + chanInfo := openChan.StateSnapshot() + capacity := float64(chanInfo.Capacity) + fmsg.peer.server.routingMgr.AddChannel( + graph.NewID(fmsg.peer.server.lightningID), + graph.NewID(chanInfo.RemoteID), + graph.NewEdgeID(fundingPoint.Hash.String()), + &rt.ChannelInfo{ + Cpt: capacity, + }, + ) + + // Finally give the caller a final update notifying + // them that the channel is now open. + // TODO(roasbeef): helper funcs for proto construction + resCtx.updates <- &lnrpc.OpenStatusUpdate{ + Update: &lnrpc.OpenStatusUpdate_ChanOpen{ + ChanOpen: &lnrpc.ChannelOpenUpdate{ + ChannelPoint: &lnrpc.ChannelPoint{ + FundingTxid: fundingPoint.Hash[:], + OutputIndex: fundingPoint.Index, + }, + }, + }, + } return case <-f.quit: return @@ -617,6 +654,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { fndgLog.Infof("FundingOpen: ChannelPoint(%v) with peerID(%v) is now open", resCtx.reservation.FundingOutpoint, fmsg.peer.id) + // Notify the L3 routing manager of the newly active channel link. capacity := float64(resCtx.reservation.OurContribution().FundingAmount + resCtx.reservation.TheirContribution().FundingAmount) fmsg.peer.server.routingMgr.AddChannel( @@ -627,23 +665,19 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { Cpt: capacity, }, ) + + // Finally, notify the target peer of the newly open channel. fmsg.peer.newChannels <- openChan } // initFundingWorkflow sends a message to the funding manager instructing it // to initiate a single funder workflow with the source peer. // TODO(roasbeef): re-visit blocking nature.. -func (f *fundingManager) initFundingWorkflow(targetPeer *peer, req *openChanReq) (*wire.OutPoint, error) { - errChan := make(chan error, 1) - respChan := make(chan *wire.OutPoint, 1) +func (f *fundingManager) initFundingWorkflow(targetPeer *peer, req *openChanReq) { f.fundingRequests <- &initFundingMsg{ peer: targetPeer, - resp: respChan, - err: errChan, openChanReq: req, } - - return <-respChan, <-errChan } // handleInitFundingMsg creates a channel reservation within the daemon's @@ -667,7 +701,6 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) { reservation, err := f.wallet.InitChannelReservation(capacity, localAmt, nodeID, uint16(numConfs), 4) if err != nil { - msg.resp <- nil msg.err <- err return } @@ -689,8 +722,8 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) { f.activeReservations[msg.peer.id][chanID] = &reservationWithCtx{ reservation: reservation, peer: msg.peer, + updates: msg.updates, err: msg.err, - resp: msg.resp, } f.resMtx.Unlock() @@ -700,7 +733,6 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) { deliveryScript, err := txscript.PayToAddrScript(contribution.DeliveryAddress) if err != nil { fndgLog.Errorf("Unable to convert address to pkscript: %v", err) - msg.resp <- nil msg.err <- err return } diff --git a/htlcswitch.go b/htlcswitch.go index 05624790..f4af64a8 100644 --- a/htlcswitch.go +++ b/htlcswitch.go @@ -8,6 +8,7 @@ import ( "time" "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnwire" "github.com/roasbeef/btcd/wire" "github.com/roasbeef/btcutil" @@ -325,7 +326,6 @@ func (h *htlcSwitch) handleUnregisterLink(req *unregisterLinkMsg) { func (h *htlcSwitch) handleCloseLink(req *closeLinkReq) { targetLink, ok := h.chanIndex[*req.chanPoint] if !ok { - req.resp <- nil req.err <- fmt.Errorf("channel point %v not found", req.chanPoint) return } @@ -396,27 +396,20 @@ func (h *htlcSwitch) UnregisterLink(chanInterface [32]byte, chanPoint *wire.OutP type closeLinkReq struct { chanPoint *wire.OutPoint - resp chan *closeLinkResp - err chan error -} - -// closeChanResp is the response to a closeChanReq is simply houses a boolean -// value indicating if the channel coopertive channel closure was succesful or not. -type closeLinkResp struct { - txid *wire.ShaHash - success bool + updates chan *lnrpc.CloseStatusUpdate + err chan error } // CloseLink closes an active link targetted by it's channel point. Closing the // link initiates a cooperative channel closure. // TODO(roabeef): bool flag for timeout/force -func (h *htlcSwitch) CloseLink(chanPoint *wire.OutPoint) (chan *closeLinkResp, chan error) { - respChan := make(chan *closeLinkResp, 1) +func (h *htlcSwitch) CloseLink(chanPoint *wire.OutPoint) (chan *lnrpc.CloseStatusUpdate, chan error) { + updateChan := make(chan *lnrpc.CloseStatusUpdate, 1) errChan := make(chan error, 1) - h.linkControl <- &closeLinkReq{chanPoint, respChan, errChan} + h.linkControl <- &closeLinkReq{chanPoint, updateChan, errChan} - return respChan, errChan + return updateChan, errChan } // linkInfoUpdateMsg encapsulates a request for the htlc switch to update the diff --git a/lnd.go b/lnd.go index bb8cb928..c478fdba 100644 --- a/lnd.go +++ b/lnd.go @@ -143,7 +143,7 @@ func lndMain() error { lnrpc.RegisterLightningServer(grpcServer, server.rpcServer) // Finally, start the grpc server listening for HTTP/2 connections. - lis, err := net.Listen("tcp", fmt.Sprintf(":%d", loadedConfig.RPCPort)) + lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", loadedConfig.RPCPort)) if err != nil { fmt.Printf("failed to listen: %v", err) return err diff --git a/peer.go b/peer.go index 2abaefd6..f70fee94 100644 --- a/peer.go +++ b/peer.go @@ -12,6 +12,7 @@ import ( "github.com/davecgh/go-spew/spew" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/lndc" + "github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" "github.com/roasbeef/btcd/btcec" @@ -639,7 +640,6 @@ func (p *peer) handleLocalClose(req *closeLinkReq) { // when the remote node broadcasts the fully signed closing transaction. sig, txid, err := channel.InitCooperativeClose() if err != nil { - req.resp <- nil req.err <- err return } @@ -653,22 +653,33 @@ func (p *peer) handleLocalClose(req *closeLinkReq) { // TODO(roasbeef): remove encoding redundancy closeSig, err := btcec.ParseSignature(sig, btcec.S256()) if err != nil { - req.resp <- nil req.err <- err return } closeReq := lnwire.NewCloseRequest(chanPoint, closeSig) p.queueMsg(closeReq, nil) + // Update the caller w.r.t the current pending state of this request. + req.updates <- &lnrpc.CloseStatusUpdate{ + Update: &lnrpc.CloseStatusUpdate_ClosePending{ + ClosePending: &lnrpc.PendingUpdate{ + Txid: txid[:], + }, + }, + } + // Finally, launch a goroutine which will request to be notified by the // ChainNotifier once the closure transaction obtains a single // confirmation. go func() { // TODO(roasbeef): add param for num needed confs notifier := p.server.lnwallet.ChainNotifier - confNtfn, _ := notifier.RegisterConfirmationsNtfn(txid, 1) + confNtfn, err := notifier.RegisterConfirmationsNtfn(txid, 1) + if err != nil { + req.err <- err + return + } - var success bool select { case height, ok := <-confNtfn.Confirmed: // In the case that the ChainNotifier is shutting @@ -683,17 +694,24 @@ func (p *peer) handleLocalClose(req *closeLinkReq) { // active indexes, and the database state. peerLog.Infof("ChannelPoint(%v) is now "+ "closed at height %v", key, height) - wipeChannel(p, channel) - - success = true + if err := wipeChannel(p, channel); err != nil { + req.err <- err + return + } case <-p.quit: return } // Respond to the local sub-system which requested the channel // closure. - req.resp <- &closeLinkResp{txid, success} - req.err <- nil + req.updates <- &lnrpc.CloseStatusUpdate{ + Update: &lnrpc.CloseStatusUpdate_ChanClose{ + ChanClose: &lnrpc.ChannelCloseUpdate{ + ClosingTxid: txid[:], + Success: true, + }, + }, + } }() } @@ -741,7 +759,7 @@ func (p *peer) handleRemoteClose(req *lnwire.CloseRequest) { // wipeChannel removes the passed channel from all indexes associated with the // peer, and deletes the channel from the database. -func wipeChannel(p *peer, channel *lnwallet.LightningChannel) { +func wipeChannel(p *peer, channel *lnwallet.LightningChannel) error { chanID := channel.ChannelPoint() delete(p.activeChannels, *chanID) @@ -756,7 +774,10 @@ func wipeChannel(p *peer, channel *lnwallet.LightningChannel) { if err := channel.DeleteState(); err != nil { peerLog.Errorf("Unable to delete ChannelPoint(%v) "+ "from db %v", chanID, err) + return err } + + return nil } // pendingPayment represents a pending HTLC which has yet to be settled by the diff --git a/rpcserver.go b/rpcserver.go index b4d98b03..67a97b0c 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -201,30 +201,43 @@ func (r *rpcServer) OpenChannel(in *lnrpc.OpenChannelRequest, remoteFundingAmt := btcutil.Amount(in.RemoteFundingAmount) target := in.TargetPeerId numConfs := in.NumConfs - respChan, errChan := r.server.OpenChannel(target, localFundingAmt, + updateChan, errChan := r.server.OpenChannel(target, localFundingAmt, remoteFundingAmt, numConfs) - if err := <-errChan; err != nil { - rpcsLog.Errorf("unable to open channel to peerid(%v): %v", - target, err) - return err + + var outpoint wire.OutPoint +out: + for { + select { + case err := <-errChan: + rpcsLog.Errorf("unable to open channel to peerid(%v): %v", + target, err) + return err + case fundingUpdate := <-updateChan: + rpcsLog.Tracef("[openchannel] sending update: %v", + fundingUpdate) + if err := updateStream.Send(fundingUpdate); err != nil { + return err + } + + // If a final channel open update is being sent, then + // we can break out of our recv loop as we no longer + // need to process any further updates. + switch update := fundingUpdate.Update.(type) { + case *lnrpc.OpenStatusUpdate_ChanOpen: + chanPoint := update.ChanOpen.ChannelPoint + h, _ := wire.NewShaHash(chanPoint.FundingTxid) + outpoint = wire.OutPoint{ + Hash: *h, + Index: chanPoint.OutputIndex, + } + + break out + } + case <-r.quit: + return nil + } } - var outpoint *wire.OutPoint - select { - case resp := <-respChan: - outpoint = resp.chanPoint - openUpdate := &lnrpc.ChannelOpenUpdate{ - &lnrpc.ChannelPoint{ - FundingTxid: outpoint.Hash[:], - OutputIndex: outpoint.Index, - }, - } - if err := updateStream.Send(openUpdate); err != nil { - return err - } - case <-r.quit: - return nil - } rpcsLog.Tracef("[openchannel] success peerid(%v), ChannelPoint(%v)", in.TargetPeerId, outpoint) return nil @@ -247,24 +260,33 @@ func (r *rpcServer) CloseChannel(in *lnrpc.CloseChannelRequest, rpcsLog.Tracef("[closechannel] request for ChannelPoint(%v)", targetChannelPoint) - respChan, errChan := r.server.htlcSwitch.CloseLink(targetChannelPoint) - if err := <-errChan; err != nil { - rpcsLog.Errorf("Unable to close ChannelPoint(%v): %v", - targetChannelPoint, err) - return err - } + updateChan, errChan := r.server.htlcSwitch.CloseLink(targetChannelPoint) - select { - case resp := <-respChan: - closeUpdate := &lnrpc.ChannelCloseUpdate{ - ClosingTxid: resp.txid[:], - Success: resp.success, - } - if err := updateStream.Send(closeUpdate); err != nil { +out: + for { + select { + case err := <-errChan: + rpcsLog.Errorf("[closechannel] unable to close "+ + "ChannelPoint(%v): %v", targetChannelPoint, err) return err + case closingUpdate := <-updateChan: + if err := updateStream.Send(closingUpdate); err != nil { + return err + } + + // If a final channel closing updates is being sent, + // then we can break out of our dispatch loop as we no + // longer need to process any further updates. + switch closeUpdate := closingUpdate.Update.(type) { + case *lnrpc.CloseStatusUpdate_ChanClose: + h, _ := wire.NewShaHash(closeUpdate.ChanClose.ClosingTxid) + rpcsLog.Errorf("[closechannel] close completed: "+ + "txid(%v)", h) + break out + } + case <-r.quit: + return nil } - case <-r.quit: - return nil } return nil @@ -350,6 +372,7 @@ func (r *rpcServer) ListPeers(ctx context.Context, // by the wallet. This method can be modified by having the request specify // only witness outputs should be factored into the final output sum. // TODO(roasbeef): split into total and confirmed/unconfirmed +// TODO(roasbeef): add async hooks into wallet balance changes func (r *rpcServer) WalletBalance(ctx context.Context, in *lnrpc.WalletBalanceRequest) (*lnrpc.WalletBalanceResponse, error) { diff --git a/server.go b/server.go index a051c712..29b9869d 100644 --- a/server.go +++ b/server.go @@ -10,14 +10,13 @@ import ( "github.com/btcsuite/fastsha256" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/lndc" + "github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" "github.com/roasbeef/btcd/btcec" - "github.com/roasbeef/btcd/wire" "github.com/roasbeef/btcutil" "github.com/BitfuryLightning/tools/routing" - "github.com/BitfuryLightning/tools/rt" "github.com/BitfuryLightning/tools/rt/graph" "github.com/roasbeef/btcwallet/waddrmgr" ) @@ -229,14 +228,8 @@ type openChanReq struct { numConfs uint32 - resp chan *openChanResp - err chan error -} - -// openChanResp is the response to an openChanReq, it contains the channel -// point, or outpoint of the broadcast funding transaction. -type openChanResp struct { - chanPoint *wire.OutPoint + updates chan *lnrpc.OpenStatusUpdate + err chan error } // queryHandler handles any requests to modify the server's internal state of @@ -389,7 +382,6 @@ func (s *server) handleOpenChanReq(req *openChanReq) { } if targetPeer == nil { - req.resp <- nil req.err <- fmt.Errorf("unable to find peer %v", target) return } @@ -398,23 +390,8 @@ func (s *server) handleOpenChanReq(req *openChanReq) { // manager. This allows the server to continue handling queries instead of // blocking on this request which is exporeted as a synchronous request to // the outside world. - go func() { - // TODO(roasbeef): server semaphore to restrict num goroutines - fundingID, err := s.fundingMgr.initFundingWorkflow(targetPeer, req) - if err == nil { - capacity := float64(req.localFundingAmt + req.remoteFundingAmt) - s.routingMgr.AddChannel( - graph.NewID(s.lightningID), - graph.NewID([32]byte(targetPeer.lightningID)), - graph.NewEdgeID(fundingID.String()), - &rt.ChannelInfo{ - Cpt: capacity, - }, - ) - } - req.resp <- &openChanResp{fundingID} - req.err <- err - }() + // TODO(roasbeef): server semaphore to restrict num goroutines + go s.fundingMgr.initFundingWorkflow(targetPeer, req) } // ConnectToPeer requests that the server connect to a Lightning Network peer @@ -432,10 +409,10 @@ func (s *server) ConnectToPeer(addr *lndc.LNAdr) (int32, error) { // OpenChannel sends a request to the server to open a channel to the specified // peer identified by ID with the passed channel funding paramters. func (s *server) OpenChannel(nodeID int32, localAmt, remoteAmt btcutil.Amount, - numConfs uint32) (chan *openChanResp, chan error) { + numConfs uint32) (chan *lnrpc.OpenStatusUpdate, chan error) { errChan := make(chan error, 1) - respChan := make(chan *openChanResp, 1) + updateChan := make(chan *lnrpc.OpenStatusUpdate, 1) s.queries <- &openChanReq{ targetNodeID: nodeID, @@ -443,12 +420,11 @@ func (s *server) OpenChannel(nodeID int32, localAmt, remoteAmt btcutil.Amount, remoteFundingAmt: remoteAmt, numConfs: numConfs, - resp: respChan, - err: errChan, + updates: updateChan, + err: errChan, } - // TODO(roasbeef): hook in "progress" channel - return respChan, errChan + return updateChan, errChan } // Peers returns a slice of all active peers.