htlcswitch/link: upgrade to use lnpeer.Peer.SendMessage

This commit is contained in:
Conner Fromknecht 2018-06-07 20:17:15 -07:00
parent 3046ea6c5b
commit 4380c67124
No known key found for this signature in database
GPG Key ID: 39DE78FBE6ACB0EF

@ -15,6 +15,7 @@ import (
"github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/contractcourt" "github.com/lightningnetwork/lnd/contractcourt"
"github.com/lightningnetwork/lnd/htlcswitch/hodl" "github.com/lightningnetwork/lnd/htlcswitch/hodl"
"github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
"github.com/roasbeef/btcd/chaincfg/chainhash" "github.com/roasbeef/btcd/chaincfg/chainhash"
@ -163,7 +164,7 @@ type ChannelLinkConfig struct {
// Peer is a lightning network node with which we have the channel link // Peer is a lightning network node with which we have the channel link
// opened. // opened.
Peer Peer Peer lnpeer.Peer
// Registry is a sub-system which responsible for managing the invoices // Registry is a sub-system which responsible for managing the invoices
// in thread-safe manner. // in thread-safe manner.
@ -534,7 +535,7 @@ func (l *channelLink) syncChanStates() error {
return fmt.Errorf("unable to generate chan sync message for "+ return fmt.Errorf("unable to generate chan sync message for "+
"ChannelPoint(%v)", l.channel.ChannelPoint()) "ChannelPoint(%v)", l.channel.ChannelPoint())
} }
if err := l.cfg.Peer.SendMessage(localChanSyncMsg, false); err != nil { if err := l.cfg.Peer.SendMessage(false, localChanSyncMsg); err != nil {
return fmt.Errorf("Unable to send chan sync message for "+ return fmt.Errorf("Unable to send chan sync message for "+
"ChannelPoint(%v)", l.channel.ChannelPoint()) "ChannelPoint(%v)", l.channel.ChannelPoint())
} }
@ -576,7 +577,7 @@ func (l *channelLink) syncChanStates() error {
fundingLockedMsg := lnwire.NewFundingLocked( fundingLockedMsg := lnwire.NewFundingLocked(
l.ChanID(), nextRevocation, l.ChanID(), nextRevocation,
) )
err = l.cfg.Peer.SendMessage(fundingLockedMsg, false) err = l.cfg.Peer.SendMessage(false, fundingLockedMsg)
if err != nil { if err != nil {
return fmt.Errorf("unable to re-send "+ return fmt.Errorf("unable to re-send "+
"FundingLocked: %v", err) "FundingLocked: %v", err)
@ -626,7 +627,7 @@ func (l *channelLink) syncChanStates() error {
// immediately so we return to a synchronized state as soon as // immediately so we return to a synchronized state as soon as
// possible. // possible.
for _, msg := range msgsToReSend { for _, msg := range msgsToReSend {
l.cfg.Peer.SendMessage(msg, false) l.cfg.Peer.SendMessage(false, msg)
} }
case <-l.quit: case <-l.quit:
@ -1107,7 +1108,7 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) {
l.openedCircuits = append(l.openedCircuits, pkt.inKey()) l.openedCircuits = append(l.openedCircuits, pkt.inKey())
l.keystoneBatch = append(l.keystoneBatch, pkt.keystone()) l.keystoneBatch = append(l.keystoneBatch, pkt.keystone())
l.cfg.Peer.SendMessage(htlc, false) l.cfg.Peer.SendMessage(false, htlc)
case *lnwire.UpdateFulfillHTLC: case *lnwire.UpdateFulfillHTLC:
// If hodl.SettleOutgoing mode is active, we exit early to // If hodl.SettleOutgoing mode is active, we exit early to
@ -1148,7 +1149,7 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) {
// Then we send the HTLC settle message to the connected peer // Then we send the HTLC settle message to the connected peer
// so we can continue the propagation of the settle message. // so we can continue the propagation of the settle message.
l.cfg.Peer.SendMessage(htlc, false) l.cfg.Peer.SendMessage(false, htlc)
isSettle = true isSettle = true
case *lnwire.UpdateFailHTLC: case *lnwire.UpdateFailHTLC:
@ -1189,7 +1190,7 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) {
// Finally, we send the HTLC message to the peer which // Finally, we send the HTLC message to the peer which
// initially created the HTLC. // initially created the HTLC.
l.cfg.Peer.SendMessage(htlc, false) l.cfg.Peer.SendMessage(false, htlc)
isSettle = true isSettle = true
} }
@ -1342,7 +1343,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
log.Errorf("unable to revoke commitment: %v", err) log.Errorf("unable to revoke commitment: %v", err)
return return
} }
l.cfg.Peer.SendMessage(nextRevocation, false) l.cfg.Peer.SendMessage(false, nextRevocation)
// Since we just revoked our commitment, we may have a new set // Since we just revoked our commitment, we may have a new set
// of HTLC's on our commitment, so we'll send them over our // of HTLC's on our commitment, so we'll send them over our
@ -1561,7 +1562,7 @@ func (l *channelLink) updateCommitTx() error {
CommitSig: theirCommitSig, CommitSig: theirCommitSig,
HtlcSigs: htlcSigs, HtlcSigs: htlcSigs,
} }
l.cfg.Peer.SendMessage(commitSig, false) l.cfg.Peer.SendMessage(false, commitSig)
// We've just initiated a state transition, attempt to stop the // We've just initiated a state transition, attempt to stop the
// logCommitTimer. If the timer already ticked, then we'll consume the // logCommitTimer. If the timer already ticked, then we'll consume the
@ -1585,7 +1586,7 @@ func (l *channelLink) updateCommitTx() error {
// channel link opened. // channel link opened.
// //
// NOTE: Part of the ChannelLink interface. // NOTE: Part of the ChannelLink interface.
func (l *channelLink) Peer() Peer { func (l *channelLink) Peer() lnpeer.Peer {
return l.cfg.Peer return l.cfg.Peer
} }
@ -1852,7 +1853,7 @@ func (l *channelLink) updateChannelFee(feePerKw lnwallet.SatPerKWeight) error {
// We'll then attempt to send a new UpdateFee message, and also lock it // We'll then attempt to send a new UpdateFee message, and also lock it
// in immediately by triggering a commitment update. // in immediately by triggering a commitment update.
msg := lnwire.NewUpdateFee(l.ChanID(), uint32(feePerKw)) msg := lnwire.NewUpdateFee(l.ChanID(), uint32(feePerKw))
if err := l.cfg.Peer.SendMessage(msg, false); err != nil { if err := l.cfg.Peer.SendMessage(false, msg); err != nil {
return err return err
} }
return l.updateCommitTx() return l.updateCommitTx()
@ -2260,11 +2261,11 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg,
// HTLC was successfully settled locally send // HTLC was successfully settled locally send
// notification about it remote peer. // notification about it remote peer.
l.cfg.Peer.SendMessage(&lnwire.UpdateFulfillHTLC{ l.cfg.Peer.SendMessage(false, &lnwire.UpdateFulfillHTLC{
ChanID: l.ChanID(), ChanID: l.ChanID(),
ID: pd.HtlcIndex, ID: pd.HtlcIndex,
PaymentPreimage: preimage, PaymentPreimage: preimage,
}, false) })
needUpdate = true needUpdate = true
// There are additional channels left within this route. So // There are additional channels left within this route. So
@ -2550,11 +2551,11 @@ func (l *channelLink) sendHTLCError(htlcIndex uint64, failure lnwire.FailureMess
return return
} }
l.cfg.Peer.SendMessage(&lnwire.UpdateFailHTLC{ l.cfg.Peer.SendMessage(false, &lnwire.UpdateFailHTLC{
ChanID: l.ChanID(), ChanID: l.ChanID(),
ID: htlcIndex, ID: htlcIndex,
Reason: reason, Reason: reason,
}, false) })
} }
// sendMalformedHTLCError helper function which sends the malformed HTLC update // sendMalformedHTLCError helper function which sends the malformed HTLC update
@ -2569,12 +2570,12 @@ func (l *channelLink) sendMalformedHTLCError(htlcIndex uint64,
return return
} }
l.cfg.Peer.SendMessage(&lnwire.UpdateFailMalformedHTLC{ l.cfg.Peer.SendMessage(false, &lnwire.UpdateFailMalformedHTLC{
ChanID: l.ChanID(), ChanID: l.ChanID(),
ID: htlcIndex, ID: htlcIndex,
ShaOnionBlob: shaOnionBlob, ShaOnionBlob: shaOnionBlob,
FailureCode: code, FailureCode: code,
}, false) })
} }
// fail is a function which is used to encapsulate the action necessary for // fail is a function which is used to encapsulate the action necessary for