htlcswitch: re-introduce dynamic commitment log tick timer

This commit fixes a slight regression in the logic of the switch by
ensuring that the log commitment timer is only start _after_ we receive
a new commitment signature. Otherwise, the ticker will keep ticking and
possibly settle HTLC’s that’ve yet to be locked in, or waste a
signature causing us to be deprived of a revocation which is required
for us to initiate a new state transition.

Additionally, the commit performs a few minor post-merge clean ups.
This commit is contained in:
Olaoluwa Osuntokun 2017-05-31 16:43:37 -07:00
parent 048e4c0a39
commit 2ab03c57be
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2

@ -27,21 +27,22 @@ type ChannelLinkConfig struct {
// packets to other peer which should handle it. // packets to other peer which should handle it.
Switch *Switch Switch *Switch
// DecodeOnion function responsible for decoding htlc Sphinx onion blob, // DecodeOnion function responsible for decoding htlc Sphinx onion
// and creating hop iterator which will give us next destination of htlc. // blob, and creating hop iterator which will give us next destination
// of htlc.
DecodeOnion func(r io.Reader, meta []byte) (HopIterator, error) DecodeOnion func(r io.Reader, meta []byte) (HopIterator, error)
// Peer is a lightning network node with which we have the channel // Peer is a lightning network node with which we have the channel link
// link opened. // opened.
Peer Peer Peer Peer
// Registry is a sub-system which responsible for managing the // Registry is a sub-system which responsible for managing the invoices
// invoices in thread-safe manner. // in thread-safe manner.
Registry InvoiceDatabase Registry InvoiceDatabase
// SettledContracts is used to notify that a channel has peacefully been // SettledContracts is used to notify that a channel has peacefully
// closed. Once a channel has been closed the other subsystem no longer // been closed. Once a channel has been closed the other subsystem no
// needs to watch for breach closes. // longer needs to watch for breach closes.
SettledContracts chan *wire.OutPoint SettledContracts chan *wire.OutPoint
// DebugHTLC should be turned on if you want all HTLCs sent to a node // DebugHTLC should be turned on if you want all HTLCs sent to a node
@ -60,20 +61,23 @@ type channelLink struct {
// The index of the HTLC within the log is mapped to the cancellation // The index of the HTLC within the log is mapped to the cancellation
// reason. This value is used to thread the proper error through to the // reason. This value is used to thread the proper error through to the
// htlcSwitch, or subsystem that initiated the HTLC. // htlcSwitch, or subsystem that initiated the HTLC.
//
// TODO(andrew.shvv) remove after payment descriptor start store // TODO(andrew.shvv) remove after payment descriptor start store
// htlc cancel reasons. // htlc cancel reasons.
cancelReasons map[uint64]lnwire.OpaqueReason cancelReasons map[uint64]lnwire.OpaqueReason
// blobs tracks the remote log index of the incoming htlc's, // blobs tracks the remote log index of the incoming htlc's, mapped to
// mapped to the htlc onion blob which encapsulates the next hop. // the htlc onion blob which encapsulates the next hop.
//
// TODO(andrew.shvv) remove after payment descriptor start store // TODO(andrew.shvv) remove after payment descriptor start store
// htlc onion blobs. // htlc onion blobs.
blobs map[uint64][lnwire.OnionPacketSize]byte blobs map[uint64][lnwire.OnionPacketSize]byte
// batchCounter is the number of updates which we received from // batchCounter is the number of updates which we received from remote
// remote side, but not include in commitment transaciton yet and plus // side, but not include in commitment transaction yet and plus the
// the current number of settles that have been sent, but not yet // current number of settles that have been sent, but not yet committed
// committed to the commitment. // to the commitment.
//
// TODO(andrew.shvv) remove after we add additional // TODO(andrew.shvv) remove after we add additional
// BatchNumber() method in state machine. // BatchNumber() method in state machine.
batchCounter uint32 batchCounter uint32
@ -86,18 +90,18 @@ type channelLink struct {
// which may affect behaviour of the service. // which may affect behaviour of the service.
cfg *ChannelLinkConfig cfg *ChannelLinkConfig
// queue is used to store the htlc add updates which haven't been // overflowQueue is used to store the htlc add updates which haven't
// processed because of the commitment trancation overflow. // been processed because of the commitment transaction overflow.
queue *packetQueue overflowQueue *packetQueue
// upstream is a channel which responsible for propagating the // upstream is a channel which responsible for propagating the received
// received from remote peer messages, with which we have an opened // from remote peer messages, with which we have an opened channel, to
// channel, to handler function. // handler function.
upstream chan lnwire.Message upstream chan lnwire.Message
// downstream is a channel which responsible for propagating // downstream is a channel which responsible for propagating the
// the received htlc switch packet which are forwarded from anther // received htlc switch packet which are forwarded from anther channel
// channel to the handler function. // to the handler function.
downstream chan *htlcPacket downstream chan *htlcPacket
// control is used to propagate the commands to its handlers. This // control is used to propagate the commands to its handlers. This
@ -105,6 +109,14 @@ type channelLink struct {
// i.e in the main handler loop. // i.e in the main handler loop.
control chan interface{} control chan interface{}
// logCommitTimer is a timer which is sent upon if we go an interval
// without receiving/sending a commitment update. It's role is to
// ensure both chains converge to identical state in a timely manner.
//
// TODO(roasbeef): timer should be >> then RTT
logCommitTimer *time.Timer
logCommitTick <-chan time.Time
started int32 started int32
shutdown int32 shutdown int32
wg sync.WaitGroup wg sync.WaitGroup
@ -116,15 +128,16 @@ func NewChannelLink(cfg *ChannelLinkConfig,
channel *lnwallet.LightningChannel) ChannelLink { channel *lnwallet.LightningChannel) ChannelLink {
return &channelLink{ return &channelLink{
cfg: cfg, cfg: cfg,
channel: channel, channel: channel,
blobs: make(map[uint64][lnwire.OnionPacketSize]byte), blobs: make(map[uint64][lnwire.OnionPacketSize]byte),
upstream: make(chan lnwire.Message), upstream: make(chan lnwire.Message),
downstream: make(chan *htlcPacket), downstream: make(chan *htlcPacket),
control: make(chan interface{}), control: make(chan interface{}),
cancelReasons: make(map[uint64]lnwire.OpaqueReason), cancelReasons: make(map[uint64]lnwire.OpaqueReason),
queue: newWaitingQueue(), logCommitTimer: time.NewTimer(300 * time.Millisecond),
quit: make(chan struct{}), overflowQueue: newWaitingQueue(),
quit: make(chan struct{}),
} }
} }
@ -132,8 +145,9 @@ func NewChannelLink(cfg *ChannelLinkConfig,
// interface. // interface.
var _ ChannelLink = (*channelLink)(nil) var _ ChannelLink = (*channelLink)(nil)
// Start starts all helper goroutines required for the operation of the // Start starts all helper goroutines required for the operation of the channel
// channel link. // link.
//
// NOTE: Part of the ChannelLink interface. // NOTE: Part of the ChannelLink interface.
func (l *channelLink) Start() error { func (l *channelLink) Start() error {
if !atomic.CompareAndSwapInt32(&l.started, 0, 1) { if !atomic.CompareAndSwapInt32(&l.started, 0, 1) {
@ -151,6 +165,7 @@ func (l *channelLink) Start() error {
// Stop gracefully stops all active helper goroutines, then waits until they've // Stop gracefully stops all active helper goroutines, then waits until they've
// exited. // exited.
//
// NOTE: Part of the ChannelLink interface. // NOTE: Part of the ChannelLink interface.
func (l *channelLink) Stop() { func (l *channelLink) Stop() {
if !atomic.CompareAndSwapInt32(&l.shutdown, 0, 1) { if !atomic.CompareAndSwapInt32(&l.shutdown, 0, 1) {
@ -172,7 +187,8 @@ func (l *channelLink) Stop() {
// htlc packets to the switch. Additionally, the this goroutine handles acting // htlc packets to the switch. Additionally, the this goroutine handles acting
// upon all timeouts for any active HTLCs, manages the channel's revocation // upon all timeouts for any active HTLCs, manages the channel's revocation
// window, and also the htlc trickle queue+timer for this active channels. // window, and also the htlc trickle queue+timer for this active channels.
// NOTE: Should be started as goroutine. //
// NOTE: This MUST be run as a goroutine.
func (l *channelLink) htlcHandler() { func (l *channelLink) htlcHandler() {
defer l.wg.Done() defer l.wg.Done()
@ -198,8 +214,8 @@ func (l *channelLink) htlcHandler() {
batchTimer := time.NewTicker(50 * time.Millisecond) batchTimer := time.NewTicker(50 * time.Millisecond)
defer batchTimer.Stop() defer batchTimer.Stop()
logCommitTimer := time.NewTicker(100 * time.Millisecond) // TODO(roasbeef): fail chan in case of protocol violation
defer logCommitTimer.Stop()
out: out:
for { for {
select { select {
@ -222,7 +238,7 @@ out:
l.channel.ChannelPoint(), l.cfg.Peer.ID()) l.channel.ChannelPoint(), l.cfg.Peer.ID())
break out break out
case <-logCommitTimer.C: case <-l.logCommitTick:
// If we haven't sent or received a new commitment // If we haven't sent or received a new commitment
// update in some time, check to see if we have any // update in some time, check to see if we have any
// pending updates we need to commit due to our // pending updates we need to commit due to our
@ -257,29 +273,33 @@ out:
break out break out
} }
// Previously add update have been added to the reprocessing // A packet that previously overflowed the commitment
// queue because of the overflooding threat, and now we are // transaction is now eligible for processing once again. So
// trying to process it again. // we'll attempt to re-process the packet in order to allow it
case packet := <-l.queue.pending: // to continue propagating within the network.
case packet := <-l.overflowQueue.pending:
msg := packet.htlc.(*lnwire.UpdateAddHTLC) msg := packet.htlc.(*lnwire.UpdateAddHTLC)
log.Infof("Reprocess downstream add update "+ log.Tracef("Reprocessing downstream add update "+
"with payment hash(%v)", "with payment hash(%v)",
hex.EncodeToString(msg.PaymentHash[:])) hex.EncodeToString(msg.PaymentHash[:]))
l.handleDownStreamPkt(packet) l.handleDownStreamPkt(packet)
case pkt := <-l.downstream: case pkt := <-l.downstream:
// If we have non empty processing queue than in // If we have non empty processing queue then in order
// order to preserve the order of add updates // we'll add this to the overflow rather than
// consume it, and process it later. // processing it directly. Once an active HTLC is
// either settled or failed, then we'll free up a new
// slot.
htlc, ok := pkt.htlc.(*lnwire.UpdateAddHTLC) htlc, ok := pkt.htlc.(*lnwire.UpdateAddHTLC)
if ok && l.queue.length() != 0 { if ok && l.overflowQueue.length() != 0 {
log.Infof("Downstream htlc add update with "+ log.Infof("Downstream htlc add update with "+
"payment hash(%v) have been added to "+ "payment hash(%v) have been added to "+
"reprocessing queue, batch: %v", "reprocessing queue, batch: %v",
hex.EncodeToString(htlc.PaymentHash[:]), hex.EncodeToString(htlc.PaymentHash[:]),
l.batchCounter) l.batchCounter)
l.queue.consume(pkt) l.overflowQueue.consume(pkt)
continue continue
} }
l.handleDownStreamPkt(pkt) l.handleDownStreamPkt(pkt)
@ -309,10 +329,9 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket) {
var isSettle bool var isSettle bool
switch htlc := pkt.htlc.(type) { switch htlc := pkt.htlc.(type) {
case *lnwire.UpdateAddHTLC: case *lnwire.UpdateAddHTLC:
// A new payment has been initiated via the // A new payment has been initiated via the downstream channel,
// downstream channel, so we add the new HTLC // so we add the new HTLC to our local log, then update the
// to our local log, then update the commitment // commitment chains.
// chains.
htlc.ChanID = l.ChanID() htlc.ChanID = l.ChanID()
index, err := l.channel.AddHTLC(htlc) index, err := l.channel.AddHTLC(htlc)
if err == lnwallet.ErrMaxHTLCNumber { if err == lnwallet.ErrMaxHTLCNumber {
@ -321,12 +340,9 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket) {
"reprocessing queue, batch: %v", "reprocessing queue, batch: %v",
hex.EncodeToString(htlc.PaymentHash[:]), hex.EncodeToString(htlc.PaymentHash[:]),
l.batchCounter) l.batchCounter)
l.queue.consume(pkt) l.overflowQueue.consume(pkt)
return return
} else if err != nil { } else if err != nil {
// TODO: possibly perform fallback/retry logic
// depending on type of error
// The HTLC was unable to be added to the state // The HTLC was unable to be added to the state
// machine, as a result, we'll signal the switch to // machine, as a result, we'll signal the switch to
// cancel the pending payment. // cancel the pending payment.
@ -339,7 +355,7 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket) {
err) err)
return return
} }
log.Tracef("Receive downstream htlc with payment hash"+ log.Tracef("Received downstream htlc with payment hash"+
"(%v), assign the index: %v, batch: %v", "(%v), assign the index: %v, batch: %v",
hex.EncodeToString(htlc.PaymentHash[:]), hex.EncodeToString(htlc.PaymentHash[:]),
index, l.batchCounter+1) index, l.batchCounter+1)
@ -399,7 +415,6 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket) {
// If this newly added update exceeds the min batch size for adds, or // If this newly added update exceeds the min batch size for adds, or
// this is a settle request, then initiate an update. // this is a settle request, then initiate an update.
// TODO(roasbeef): enforce max HTLCs in flight limit
if l.batchCounter >= 10 || isSettle { if l.batchCounter >= 10 || isSettle {
if err := l.updateCommitTx(); err != nil { if err := l.updateCommitTx(); err != nil {
log.Errorf("unable to update "+ log.Errorf("unable to update "+
@ -420,7 +435,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
case *lnwire.UpdateAddHTLC: case *lnwire.UpdateAddHTLC:
// We just received an add request from an upstream peer, so we // We just received an add request from an upstream peer, so we
// add it to our state machine, then add the HTLC to our // add it to our state machine, then add the HTLC to our
// "settle" list in the event that we know the preimage // "settle" list in the event that we know the preimage.
index, err := l.channel.ReceiveHTLC(msg) index, err := l.channel.ReceiveHTLC(msg)
if err != nil { if err != nil {
log.Errorf("unable to handle upstream add HTLC: %v", log.Errorf("unable to handle upstream add HTLC: %v",
@ -436,8 +451,8 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
// * time-lock is sane, fee, chain, etc // * time-lock is sane, fee, chain, etc
// Store the onion blob which encapsulate the htlc route and // Store the onion blob which encapsulate the htlc route and
// use in on stage of htlc inclusion to retrieve the // use in on stage of htlc inclusion to retrieve the next hope
// next hope and propagate the htlc farther. // and propagate the htlc farther.
l.blobs[index] = msg.OnionBlob l.blobs[index] = msg.OnionBlob
case *lnwire.UpdateFufillHTLC: case *lnwire.UpdateFufillHTLC:
@ -467,6 +482,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
case *lnwire.CommitSig: case *lnwire.CommitSig:
// We just received a new update to our local commitment chain, // We just received a new update to our local commitment chain,
// validate this new commitment, closing the link if invalid. // validate this new commitment, closing the link if invalid.
//
// TODO(roasbeef): redundant re-serialization // TODO(roasbeef): redundant re-serialization
sig := msg.CommitSig.Serialize() sig := msg.CommitSig.Serialize()
if err := l.channel.ReceiveNewCommitment(sig); err != nil { if err := l.channel.ReceiveNewCommitment(sig); err != nil {
@ -485,6 +501,21 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
} }
l.cfg.Peer.SendMessage(nextRevocation) l.cfg.Peer.SendMessage(nextRevocation)
// As we've just received a commitment signature, we'll
// re-start the log commit timer to wake up the main processing
// loop to check if we need to send a commitment signature as
// we owe one.
//
// TODO(roasbeef): instead after revocation?
if !l.logCommitTimer.Stop() {
select {
case <-l.logCommitTimer.C:
default:
}
}
l.logCommitTimer.Reset(300 * time.Millisecond)
l.logCommitTick = l.logCommitTimer.C
// If both commitment chains are fully synced from our PoV, // If both commitment chains are fully synced from our PoV,
// then we don't need to reply with a signature as both sides // then we don't need to reply with a signature as both sides
// already have a commitment with the latest accepted l. // already have a commitment with the latest accepted l.
@ -512,10 +543,10 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
return return
} }
// After we treat HTLCs as included in both // After we treat HTLCs as included in both remote/local
// remote/local commitment transactions they might be // commitment transactions they might be safely propagated over
// safely propagated over htlc switch or settled if our node was // htlc switch or settled if our node was last node in htlc
// last node in htlc path. // path.
htlcsToForward := l.processLockedInHtlcs(htlcs) htlcsToForward := l.processLockedInHtlcs(htlcs)
go func() { go func() {
for _, packet := range htlcsToForward { for _, packet := range htlcsToForward {
@ -554,19 +585,34 @@ func (l *channelLink) updateCommitTx() error {
} }
l.cfg.Peer.SendMessage(commitSig) l.cfg.Peer.SendMessage(commitSig)
// We've just initiated a state transition, attempt to stop the
// logCommitTimer. If the timer already ticked, then we'll consume the
// value, dropping
if l.logCommitTimer != nil && !l.logCommitTimer.Stop() {
select {
case <-l.logCommitTimer.C:
default:
}
}
l.logCommitTick = nil
// Finally, clear our the current batch, so we can accurately make
// further batch flushing decisions.
l.batchCounter = 0 l.batchCounter = 0
return nil return nil
} }
// Peer returns the representation of remote peer with which we // Peer returns the representation of remote peer with which we have the
// have the channel link opened. // channel link opened.
//
// NOTE: Part of the ChannelLink interface. // NOTE: Part of the ChannelLink interface.
func (l *channelLink) Peer() Peer { func (l *channelLink) Peer() Peer {
return l.cfg.Peer return l.cfg.Peer
} }
// ChannelPoint returns the unique identificator of the channel link. // ChannelPoint returns the unique identificator of the channel link.
//
// NOTE: Part of the ChannelLink interface. // NOTE: Part of the ChannelLink interface.
func (l *channelLink) ChanID() lnwire.ChannelID { func (l *channelLink) ChanID() lnwire.ChannelID {
return lnwire.NewChanIDFromOutPoint(l.channel.ChannelPoint()) return lnwire.NewChanIDFromOutPoint(l.channel.ChannelPoint())
@ -577,9 +623,10 @@ type getBandwidthCmd struct {
done chan btcutil.Amount done chan btcutil.Amount
} }
// Bandwidth returns the amount which current link might pass // Bandwidth returns the amount which current link might pass through channel
// through channel link. Execution through control channel gives as // link. Execution through control channel gives as confidence that bandwidth
// confidence that bandwidth will not be changed during function execution. // will not be changed during function execution.
//
// NOTE: Part of the ChannelLink interface. // NOTE: Part of the ChannelLink interface.
func (l *channelLink) Bandwidth() btcutil.Amount { func (l *channelLink) Bandwidth() btcutil.Amount {
command := &getBandwidthCmd{ command := &getBandwidthCmd{
@ -594,15 +641,17 @@ func (l *channelLink) Bandwidth() btcutil.Amount {
} }
} }
// getBandwidth returns the amount which current link might pass // getBandwidth returns the amount which current link might pass through
// through channel link. // channel link.
// NOTE: Should be use inside main goroutine only, otherwise the result might //
// be accurate. // NOTE: Should be used inside main goroutine only, otherwise the result might
// not be accurate.
func (l *channelLink) getBandwidth() btcutil.Amount { func (l *channelLink) getBandwidth() btcutil.Amount {
return l.channel.LocalAvailableBalance() - l.queue.pendingAmount() return l.channel.LocalAvailableBalance() - l.overflowQueue.pendingAmount()
} }
// Stats return the statistics of channel link. // Stats returns the statistics of channel link.
//
// NOTE: Part of the ChannelLink interface. // NOTE: Part of the ChannelLink interface.
func (l *channelLink) Stats() (uint64, btcutil.Amount, btcutil.Amount) { func (l *channelLink) Stats() (uint64, btcutil.Amount, btcutil.Amount) {
snapshot := l.channel.StateSnapshot() snapshot := l.channel.StateSnapshot()
@ -612,14 +661,16 @@ func (l *channelLink) Stats() (uint64, btcutil.Amount, btcutil.Amount) {
} }
// String returns the string representation of channel link. // String returns the string representation of channel link.
//
// NOTE: Part of the ChannelLink interface. // NOTE: Part of the ChannelLink interface.
func (l *channelLink) String() string { func (l *channelLink) String() string {
return l.channel.ChannelPoint().String() return l.channel.ChannelPoint().String()
} }
// HandleSwitchPacket handles the switch packets. This packets which might // HandleSwitchPacket handles the switch packets. This packets which might be
// be forwarded to us from another channel link in case the htlc update came // forwarded to us from another channel link in case the htlc update came from
// from another peer or if the update was created by user // another peer or if the update was created by user
//
// NOTE: Part of the ChannelLink interface. // NOTE: Part of the ChannelLink interface.
func (l *channelLink) HandleSwitchPacket(packet *htlcPacket) { func (l *channelLink) HandleSwitchPacket(packet *htlcPacket) {
select { select {
@ -628,8 +679,9 @@ func (l *channelLink) HandleSwitchPacket(packet *htlcPacket) {
} }
} }
// HandleChannelUpdate handles the htlc requests as settle/add/fail which // HandleChannelUpdate handles the htlc requests as settle/add/fail which sent
// sent to us from remote peer we have a channel with. // to us from remote peer we have a channel with.
//
// NOTE: Part of the ChannelLink interface. // NOTE: Part of the ChannelLink interface.
func (l *channelLink) HandleChannelUpdate(message lnwire.Message) { func (l *channelLink) HandleChannelUpdate(message lnwire.Message) {
select { select {
@ -639,8 +691,8 @@ func (l *channelLink) HandleChannelUpdate(message lnwire.Message) {
} }
// processLockedInHtlcs function is used to proceed the HTLCs which was // processLockedInHtlcs function is used to proceed the HTLCs which was
// designated as eligible for forwarding. But not all htlc will be // designated as eligible for forwarding. But not all htlc will be forwarder,
// forwarder, if htlc reached its final destination that we should settle it. // if htlc reached its final destination that we should settle it.
func (l *channelLink) processLockedInHtlcs( func (l *channelLink) processLockedInHtlcs(
paymentDescriptors []*lnwallet.PaymentDescriptor) []*htlcPacket { paymentDescriptors []*lnwallet.PaymentDescriptor) []*htlcPacket {
@ -653,7 +705,7 @@ func (l *channelLink) processLockedInHtlcs(
switch pd.EntryType { switch pd.EntryType {
case lnwallet.Settle: case lnwallet.Settle:
// forward message to switch which will decide does // Forward message to switch which will decide does
// this peer is the final destination of htlc and we // this peer is the final destination of htlc and we
// should notify user about successful income or it // should notify user about successful income or it
// should be propagated back to the origin peer. // should be propagated back to the origin peer.
@ -662,22 +714,22 @@ func (l *channelLink) processLockedInHtlcs(
&lnwire.UpdateFufillHTLC{ &lnwire.UpdateFufillHTLC{
PaymentPreimage: pd.RPreimage, PaymentPreimage: pd.RPreimage,
}, pd.RHash, pd.Amount)) }, pd.RHash, pd.Amount))
l.queue.release() l.overflowQueue.release()
case lnwallet.Fail: case lnwallet.Fail:
opaqueReason := l.cancelReasons[pd.ParentIndex] opaqueReason := l.cancelReasons[pd.ParentIndex]
// forward message to switch which will decide does // Forward message to switch which will decide does
// this peer is the final destination of htlc and we // this peer is the final destination of htlc and we
// should notify user about fail income or it // should notify user about fail income or it should be
// should be propagated back to the origin peer. // propagated back to the origin peer.
packetsToForward = append(packetsToForward, packetsToForward = append(packetsToForward,
newFailPacket(l.ChanID(), newFailPacket(l.ChanID(),
&lnwire.UpdateFailHTLC{ &lnwire.UpdateFailHTLC{
Reason: opaqueReason, Reason: opaqueReason,
ChanID: l.ChanID(), ChanID: l.ChanID(),
}, pd.RHash, pd.Amount)) }, pd.RHash, pd.Amount))
l.queue.release() l.overflowQueue.release()
case lnwallet.Add: case lnwallet.Add:
blob := l.blobs[pd.Index] blob := l.blobs[pd.Index]
@ -685,9 +737,10 @@ func (l *channelLink) processLockedInHtlcs(
delete(l.blobs, pd.Index) delete(l.blobs, pd.Index)
// Before adding the new htlc to the state machine, // Before adding the new htlc to the state machine,
// parse the onion object in order to obtain the routing // parse the onion object in order to obtain the
// information with DecodeOnion function which process // routing information with DecodeOnion function which
// the Sphinx packet. // process the Sphinx packet.
//
// We include the payment hash of the htlc as it's // We include the payment hash of the htlc as it's
// authenticated within the Sphinx packet itself as // authenticated within the Sphinx packet itself as
// associated data in order to thwart attempts a replay // associated data in order to thwart attempts a replay
@ -706,10 +759,13 @@ func (l *channelLink) processLockedInHtlcs(
} }
if nextChan := chanIterator.Next(); nextChan != nil { if nextChan := chanIterator.Next(); nextChan != nil {
// There are additional channels left within this // There are additional channels left within
// route. // this route.
var b bytes.Buffer var (
var blob [lnwire.OnionPacketSize]byte b bytes.Buffer
blob [lnwire.OnionPacketSize]byte
)
err := chanIterator.Encode(&b) err := chanIterator.Encode(&b)
if err != nil { if err != nil {
log.Errorf("unable to encode the "+ log.Errorf("unable to encode the "+
@ -744,11 +800,12 @@ func (l *channelLink) processLockedInHtlcs(
continue continue
} }
// If we're not currently in debug mode, and the // If we're not currently in debug mode, and
// extended htlc doesn't meet the value requested, // the extended htlc doesn't meet the value
// then we'll fail the htlc. Otherwise, we settle // requested, then we'll fail the htlc.
// this htlc within our local state update log, // Otherwise, we settle this htlc within our
// then send the update entry to the remote party. // local state update log, then send the update
// entry to the remote party.
if !l.cfg.DebugHTLC && pd.Amount < invoice.Terms.Value { if !l.cfg.DebugHTLC && pd.Amount < invoice.Terms.Value {
log.Errorf("rejecting htlc due to incorrect "+ log.Errorf("rejecting htlc due to incorrect "+
"amount: expected %v, received %v", "amount: expected %v, received %v",
@ -767,8 +824,8 @@ func (l *channelLink) processLockedInHtlcs(
return nil return nil
} }
// Notify the invoiceRegistry of the invoices we // Notify the invoiceRegistry of the invoices
// just settled with this latest commitment // we just settled with this latest commitment
// update. // update.
err = l.cfg.Registry.SettleInvoice(invoiceHash) err = l.cfg.Registry.SettleInvoice(invoiceHash)
if err != nil { if err != nil {
@ -777,7 +834,7 @@ func (l *channelLink) processLockedInHtlcs(
return nil return nil
} }
// htlc was successfully settled locally send // HTLC was successfully settled locally send
// notification about it remote peer. // notification about it remote peer.
l.cfg.Peer.SendMessage(&lnwire.UpdateFufillHTLC{ l.cfg.Peer.SendMessage(&lnwire.UpdateFufillHTLC{
ChanID: l.ChanID(), ChanID: l.ChanID(),
@ -791,8 +848,8 @@ func (l *channelLink) processLockedInHtlcs(
if needUpdate { if needUpdate {
// With all the settle/cancel updates added to the local and // With all the settle/cancel updates added to the local and
// remote htlc logs, initiate a state transition by updating the // remote htlc logs, initiate a state transition by updating
// remote commitment chain. // the remote commitment chain.
if err := l.updateCommitTx(); err != nil { if err := l.updateCommitTx(); err != nil {
log.Errorf("unable to update commitment: %v", err) log.Errorf("unable to update commitment: %v", err)
l.cfg.Peer.Disconnect() l.cfg.Peer.Disconnect()