multi: exporting PendingUpdate, ChannelCloseUpdate, ErrorBufferSize

This commit is contained in:
nsa 2020-06-26 21:04:15 -04:00
parent ac3d416b04
commit 0dda9e06ce
3 changed files with 26 additions and 26 deletions

32
peer.go
View File

@@ -58,8 +58,8 @@ const (
// this struct. // this struct.
outgoingQueueLen = 50 outgoingQueueLen = 50
// errorBufferSize is the number of historic peer errors that we store. // ErrorBufferSize is the number of historic peer errors that we store.
errorBufferSize = 10 ErrorBufferSize = 10
) )
// outgoingMsg packages an lnwire.Message to be sent out on the wire, along with // outgoingMsg packages an lnwire.Message to be sent out on the wire, along with
@@ -87,23 +87,23 @@ type closeMsg struct {
msg lnwire.Message msg lnwire.Message
} }
// pendingUpdate describes the pending state of a closing channel. // PendingUpdate describes the pending state of a closing channel.
type pendingUpdate struct { type PendingUpdate struct {
Txid []byte Txid []byte
OutputIndex uint32 OutputIndex uint32
} }
// channelCloseUpdate contains the outcome of the close channel operation. // ChannelCloseUpdate contains the outcome of the close channel operation.
type channelCloseUpdate struct { type ChannelCloseUpdate struct {
ClosingTxid []byte ClosingTxid []byte
Success bool Success bool
} }
// timestampedError is a timestamped error that is used to store the most recent // TimestampedError is a timestamped error that is used to store the most recent
// errors we have experienced with our peers. // errors we have experienced with our peers.
type timestampedError struct { type TimestampedError struct {
error error Error error
timestamp time.Time Timestamp time.Time
} }
// peer is an active peer on the Lightning Network. This struct is responsible // peer is an active peer on the Lightning Network. This struct is responsible
@@ -1347,7 +1347,7 @@ func (p *peer) storeError(err error) {
} }
p.errorBuffer.Add( p.errorBuffer.Add(
&timestampedError{timestamp: time.Now(), error: err}, &TimestampedError{Timestamp: time.Now(), Error: err},
) )
} }
@@ -2562,18 +2562,18 @@ func (p *peer) finalizeChanClosure(chanCloser *chancloser.ChanCloser) {
// If this is a locally requested shutdown, update the caller with a // If this is a locally requested shutdown, update the caller with a
// new event detailing the current pending state of this request. // new event detailing the current pending state of this request.
if closeReq != nil { if closeReq != nil {
closeReq.Updates <- &pendingUpdate{ closeReq.Updates <- &PendingUpdate{
Txid: closingTxid[:], Txid: closingTxid[:],
} }
} }
go waitForChanToClose(chanCloser.NegotiationHeight(), notifier, errChan, go WaitForChanToClose(chanCloser.NegotiationHeight(), notifier, errChan,
chanPoint, &closingTxid, closingTx.TxOut[0].PkScript, func() { chanPoint, &closingTxid, closingTx.TxOut[0].PkScript, func() {
// Respond to the local subsystem which requested the // Respond to the local subsystem which requested the
// channel closure. // channel closure.
if closeReq != nil { if closeReq != nil {
closeReq.Updates <- &channelCloseUpdate{ closeReq.Updates <- &ChannelCloseUpdate{
ClosingTxid: closingTxid[:], ClosingTxid: closingTxid[:],
Success: true, Success: true,
} }
@@ -2581,12 +2581,12 @@ func (p *peer) finalizeChanClosure(chanCloser *chancloser.ChanCloser) {
}) })
} }
// waitForChanToClose uses the passed notifier to wait until the channel has // WaitForChanToClose uses the passed notifier to wait until the channel has
// been detected as closed on chain and then concludes by executing the // been detected as closed on chain and then concludes by executing the
// following actions: the channel point will be sent over the settleChan, and // following actions: the channel point will be sent over the settleChan, and
// finally the callback will be executed. If any error is encountered within // finally the callback will be executed. If any error is encountered within
// the function, then it will be sent over the errChan. // the function, then it will be sent over the errChan.
func waitForChanToClose(bestHeight uint32, notifier chainntnfs.ChainNotifier, func WaitForChanToClose(bestHeight uint32, notifier chainntnfs.ChainNotifier,
errChan chan error, chanPoint *wire.OutPoint, errChan chan error, chanPoint *wire.OutPoint,
closingTxID *chainhash.Hash, closeScript []byte, cb func()) { closingTxID *chainhash.Hash, closeScript []byte, cb func()) {

View File

@@ -2105,17 +2105,17 @@ func (r *rpcServer) CloseChannel(in *lnrpc.CloseChannelRequest,
// With the transaction broadcast, we send our first update to // With the transaction broadcast, we send our first update to
// the client. // the client.
updateChan = make(chan interface{}, 2) updateChan = make(chan interface{}, 2)
updateChan <- &pendingUpdate{ updateChan <- &PendingUpdate{
Txid: closingTxid[:], Txid: closingTxid[:],
} }
errChan = make(chan error, 1) errChan = make(chan error, 1)
notifier := r.server.cc.chainNotifier notifier := r.server.cc.chainNotifier
go waitForChanToClose(uint32(bestHeight), notifier, errChan, chanPoint, go WaitForChanToClose(uint32(bestHeight), notifier, errChan, chanPoint,
&closingTxid, closingTx.TxOut[0].PkScript, func() { &closingTxid, closingTx.TxOut[0].PkScript, func() {
// Respond to the local subsystem which // Respond to the local subsystem which
// requested the channel closure. // requested the channel closure.
updateChan <- &channelCloseUpdate{ updateChan <- &ChannelCloseUpdate{
ClosingTxid: closingTxid[:], ClosingTxid: closingTxid[:],
Success: true, Success: true,
} }
@@ -2228,7 +2228,7 @@ out:
// then we can break out of our dispatch loop as we no // then we can break out of our dispatch loop as we no
// longer need to process any further updates. // longer need to process any further updates.
switch closeUpdate := closingUpdate.(type) { switch closeUpdate := closingUpdate.(type) {
case *channelCloseUpdate: case *ChannelCloseUpdate:
h, _ := chainhash.NewHash(closeUpdate.ClosingTxid) h, _ := chainhash.NewHash(closeUpdate.ClosingTxid)
rpcsLog.Infof("[closechannel] close completed: "+ rpcsLog.Infof("[closechannel] close completed: "+
"txid(%v)", h) "txid(%v)", h)
@@ -2246,7 +2246,7 @@ func createRPCCloseUpdate(update interface{}) (
*lnrpc.CloseStatusUpdate, error) { *lnrpc.CloseStatusUpdate, error) {
switch u := update.(type) { switch u := update.(type) {
case *channelCloseUpdate: case *ChannelCloseUpdate:
return &lnrpc.CloseStatusUpdate{ return &lnrpc.CloseStatusUpdate{
Update: &lnrpc.CloseStatusUpdate_ChanClose{ Update: &lnrpc.CloseStatusUpdate_ChanClose{
ChanClose: &lnrpc.ChannelCloseUpdate{ ChanClose: &lnrpc.ChannelCloseUpdate{
@@ -2254,7 +2254,7 @@ func createRPCCloseUpdate(update interface{}) (
}, },
}, },
}, nil }, nil
case *pendingUpdate: case *PendingUpdate:
return &lnrpc.CloseStatusUpdate{ return &lnrpc.CloseStatusUpdate{
Update: &lnrpc.CloseStatusUpdate_ClosePending{ Update: &lnrpc.CloseStatusUpdate_ClosePending{
ClosePending: &lnrpc.PendingUpdate{ ClosePending: &lnrpc.PendingUpdate{
@@ -2601,11 +2601,11 @@ func (r *rpcServer) ListPeers(ctx context.Context,
// Add the relevant peer errors to our response. // Add the relevant peer errors to our response.
for _, error := range peerErrors { for _, error := range peerErrors {
tsError := error.(*timestampedError) tsError := error.(*TimestampedError)
rpcErr := &lnrpc.TimestampedError{ rpcErr := &lnrpc.TimestampedError{
Timestamp: uint64(tsError.timestamp.Unix()), Timestamp: uint64(tsError.Timestamp.Unix()),
Error: tsError.error.Error(), Error: tsError.Error.Error(),
} }
peer.Errors = append(peer.Errors, rpcErr) peer.Errors = append(peer.Errors, rpcErr)

View File

@@ -2792,7 +2792,7 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq,
errBuffer, ok := s.peerErrors[pkStr] errBuffer, ok := s.peerErrors[pkStr]
if !ok { if !ok {
var err error var err error
errBuffer, err = queue.NewCircularBuffer(errorBufferSize) errBuffer, err = queue.NewCircularBuffer(ErrorBufferSize)
if err != nil { if err != nil {
srvrLog.Errorf("unable to create peer %v", err) srvrLog.Errorf("unable to create peer %v", err)
return return