lnd: use asynchronous streaming responses for [open|close]channel RPC's

This commit switches the implementation of the open/close channel RPC’s
from a fully blocking synchronous model to one that’s async by default,
allowing callers to add a sync wrapper.

The new proto specs also allow for “updates” for the pending channels
in the form of new confirmations which progress the pending status of
the channel. At this point, only the final open/close updates have been
implemented. Obtaining confirmation notifications requires a bit of
re-working within the current ChainNotifier interface, thus this has
been deferred to a later time.
This commit is contained in:
Olaoluwa Osuntokun 2016-07-07 15:30:55 -07:00
parent dd9acfdd4d
commit c0383679d2
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2
3 changed files with 51 additions and 33 deletions

@ -555,11 +555,12 @@ func (p *peer) handleLocalClose(req *closeChanReq) {
success = true success = true
case <-p.quit: case <-p.quit:
return
} }
// Respond to the local sub-system which requested the channel // Respond to the local sub-system which requested the channel
// closure. // closure.
req.resp <- &closeChanResp{success} req.resp <- &closeChanResp{txid, success}
req.err <- nil req.err <- nil
}() }()
} }

@ -188,8 +188,8 @@ func (r *rpcServer) ConnectPeer(ctx context.Context,
// OpenChannel attempts to open a singly funded channel specified in the // OpenChannel attempts to open a singly funded channel specified in the
// request to a remote peer. // request to a remote peer.
func (r *rpcServer) OpenChannel(ctx context.Context, func (r *rpcServer) OpenChannel(in *lnrpc.OpenChannelRequest,
in *lnrpc.OpenChannelRequest) (*lnrpc.OpenChannelResponse, error) { updateStream lnrpc.Lightning_OpenChannelServer) error {
rpcsLog.Tracef("[openchannel] request to peerid(%v) "+ rpcsLog.Tracef("[openchannel] request to peerid(%v) "+
"allocation(us=%v, them=%v) numconfs=%v", in.TargetPeerId, "allocation(us=%v, them=%v) numconfs=%v", in.TargetPeerId,
@ -199,50 +199,73 @@ func (r *rpcServer) OpenChannel(ctx context.Context,
remoteFundingAmt := btcutil.Amount(in.RemoteFundingAmount) remoteFundingAmt := btcutil.Amount(in.RemoteFundingAmount)
target := in.TargetPeerId target := in.TargetPeerId
numConfs := in.NumConfs numConfs := in.NumConfs
resp, err := r.server.OpenChannel(target, localFundingAmt, respChan, errChan := r.server.OpenChannel(target, localFundingAmt,
remoteFundingAmt, numConfs) remoteFundingAmt, numConfs)
if err != nil { if err := <-errChan; err != nil {
rpcsLog.Errorf("unable to open channel to peerid(%v): %v", rpcsLog.Errorf("unable to open channel to peerid(%v): %v",
target, err) target, err)
return nil, err return err
} }
rpcsLog.Tracef("[openchannel] success peerid(%v), ChannelPoint(%v)", var outpoint *wire.OutPoint
in.TargetPeerId, resp) select {
case resp := <-respChan:
return &lnrpc.OpenChannelResponse{ outpoint = resp.chanPoint
openUpdate := &lnrpc.ChannelOpenUpdate{
&lnrpc.ChannelPoint{ &lnrpc.ChannelPoint{
FundingTxid: resp.Hash[:], FundingTxid: outpoint.Hash[:],
OutputIndex: resp.Index, OutputIndex: outpoint.Index,
}, },
}, nil }
if err := updateStream.Send(openUpdate); err != nil {
return err
}
case <-r.quit:
return nil
}
rpcsLog.Tracef("[openchannel] success peerid(%v), ChannelPoint(%v)",
in.TargetPeerId, outpoint)
return nil
} }
// CloseChannel attempts to close an active channel identified by its channel // CloseChannel attempts to close an active channel identified by its channel
// point. The actions of this method can additionally be augmented to attempt // point. The actions of this method can additionally be augmented to attempt
// a force close after a timeout period in the case of an inactive peer. // a force close after a timeout period in the case of an inactive peer.
func (r *rpcServer) CloseChannel(ctx context.Context, func (r *rpcServer) CloseChannel(in *lnrpc.CloseChannelRequest,
in *lnrpc.CloseChannelRequest) (*lnrpc.CloseChannelResponse, error) { updateStream lnrpc.Lightning_CloseChannelServer) error {
index := in.ChannelPoint.OutputIndex index := in.ChannelPoint.OutputIndex
txid, err := wire.NewShaHash(in.ChannelPoint.FundingTxid) txid, err := wire.NewShaHash(in.ChannelPoint.FundingTxid)
if err != nil { if err != nil {
rpcsLog.Errorf("[closechannel] invalid txid: %v", err) rpcsLog.Errorf("[closechannel] invalid txid: %v", err)
return nil, err return err
} }
targetChannelPoint := wire.NewOutPoint(txid, index) targetChannelPoint := wire.NewOutPoint(txid, index)
rpcsLog.Tracef("[closechannel] request for ChannelPoint(%v)", rpcsLog.Tracef("[closechannel] request for ChannelPoint(%v)",
targetChannelPoint) targetChannelPoint)
resp, err := r.server.CloseChannel(targetChannelPoint) respChan, errChan := r.server.CloseChannel(targetChannelPoint)
if err != nil { if err := <-errChan; err != nil {
rpcsLog.Errorf("Unable to close ChannelPoint(%v): %v", rpcsLog.Errorf("Unable to close ChannelPoint(%v): %v",
targetChannelPoint, err) targetChannelPoint, err)
return nil, err return err
} }
return &lnrpc.CloseChannelResponse{resp}, nil select {
case resp := <-respChan:
closeUpdate := &lnrpc.ChannelCloseUpdate{
ClosingTxid: resp.txid[:],
Success: resp.success,
}
if err := updateStream.Send(closeUpdate); err != nil {
return err
}
case <-r.quit:
return nil
}
return nil
} }
// GetInfo serves a request to the "getinfo" RPC call. This call returns // GetInfo serves a request to the "getinfo" RPC call. This call returns

@ -256,6 +256,7 @@ type closeChanReq struct {
// closeChanResp is the response to a closeChanReq is simply houses a boolean // closeChanResp is the response to a closeChanReq is simply houses a boolean
// value indicating if the channel coopertive channel closure was succesful or not. // value indicating if the channel coopertive channel closure was succesful or not.
type closeChanResp struct { type closeChanResp struct {
txid *wire.ShaHash
success bool success bool
} }
@ -437,7 +438,7 @@ func (s *server) ConnectToPeer(addr *lndc.LNAdr) (int32, error) {
// OpenChannel sends a request to the server to open a channel to the specified // OpenChannel sends a request to the server to open a channel to the specified
// peer identified by ID with the passed channel funding paramters. // peer identified by ID with the passed channel funding paramters.
func (s *server) OpenChannel(nodeID int32, localAmt, remoteAmt btcutil.Amount, func (s *server) OpenChannel(nodeID int32, localAmt, remoteAmt btcutil.Amount,
numConfs uint32) (*wire.OutPoint, error) { numConfs uint32) (chan *openChanResp, chan error) {
errChan := make(chan error, 1) errChan := make(chan error, 1)
respChan := make(chan *openChanResp, 1) respChan := make(chan *openChanResp, 1)
@ -451,17 +452,14 @@ func (s *server) OpenChannel(nodeID int32, localAmt, remoteAmt btcutil.Amount,
resp: respChan, resp: respChan,
err: errChan, err: errChan,
} }
// TODO(roasbeef): hook in "progress" channel
if err := <-errChan; err != nil { return respChan, errChan
return nil, err
}
return (<-respChan).chanPoint, nil
} }
// CloseChannel attempts to close the channel identified by the specified // CloseChannel attempts to close the channel identified by the specified
// outpoint in a coopertaive manner. // outpoint in a coopertaive manner.
func (s *server) CloseChannel(channelPoint *wire.OutPoint) (bool, error) { func (s *server) CloseChannel(channelPoint *wire.OutPoint) (chan *closeChanResp, chan error) {
errChan := make(chan error, 1) errChan := make(chan error, 1)
respChan := make(chan *closeChanResp, 1) respChan := make(chan *closeChanResp, 1)
@ -472,11 +470,7 @@ func (s *server) CloseChannel(channelPoint *wire.OutPoint) (bool, error) {
err: errChan, err: errChan,
} }
if err := <-errChan; err != nil { return respChan, errChan
return false, err
}
return (<-respChan).success, nil
} }
// Peers returns a slice of all active peers. // Peers returns a slice of all active peers.