Merge pull request #2927 from joostjager/hodl-drop-fix-better
htlcswitch: fix empty commit sig and no commit sig
This commit is contained in:
commit
86bed393f9
@ -291,15 +291,6 @@ type channelLink struct {
|
|||||||
// sure we don't process any more updates.
|
// sure we don't process any more updates.
|
||||||
failed bool
|
failed bool
|
||||||
|
|
||||||
// batchCounter is the number of updates which we received from remote
|
|
||||||
// side, but not include in commitment transaction yet and plus the
|
|
||||||
// current number of settles that have been sent, but not yet committed
|
|
||||||
// to the commitment.
|
|
||||||
//
|
|
||||||
// TODO(andrew.shvv) remove after we add additional BatchNumber()
|
|
||||||
// method in state machine.
|
|
||||||
batchCounter uint32
|
|
||||||
|
|
||||||
// keystoneBatch represents a volatile list of keystones that must be
|
// keystoneBatch represents a volatile list of keystones that must be
|
||||||
// written before attempting to sign the next commitment txn. These
|
// written before attempting to sign the next commitment txn. These
|
||||||
// represent all the HTLC's forwarded to the link from the switch. Once
|
// represent all the HTLC's forwarded to the link from the switch. Once
|
||||||
@ -353,14 +344,6 @@ type channelLink struct {
|
|||||||
// sub-systems with the latest set of active HTLC's on our channel.
|
// sub-systems with the latest set of active HTLC's on our channel.
|
||||||
htlcUpdates chan *contractcourt.ContractUpdate
|
htlcUpdates chan *contractcourt.ContractUpdate
|
||||||
|
|
||||||
// logCommitTimer is a timer which is sent upon if we go an interval
|
|
||||||
// without receiving/sending a commitment update. It's role is to
|
|
||||||
// ensure both chains converge to identical state in a timely manner.
|
|
||||||
//
|
|
||||||
// TODO(roasbeef): timer should be >> then RTT
|
|
||||||
logCommitTimer *time.Timer
|
|
||||||
logCommitTick <-chan time.Time
|
|
||||||
|
|
||||||
// updateFeeTimer is the timer responsible for updating the link's
|
// updateFeeTimer is the timer responsible for updating the link's
|
||||||
// commitment fee every time it fires.
|
// commitment fee every time it fires.
|
||||||
updateFeeTimer *time.Timer
|
updateFeeTimer *time.Timer
|
||||||
@ -406,13 +389,12 @@ func NewChannelLink(cfg ChannelLinkConfig,
|
|||||||
channel: channel,
|
channel: channel,
|
||||||
shortChanID: channel.ShortChanID(),
|
shortChanID: channel.ShortChanID(),
|
||||||
// TODO(roasbeef): just do reserve here?
|
// TODO(roasbeef): just do reserve here?
|
||||||
logCommitTimer: time.NewTimer(300 * time.Millisecond),
|
overflowQueue: newPacketQueue(input.MaxHTLCNumber / 2),
|
||||||
overflowQueue: newPacketQueue(input.MaxHTLCNumber / 2),
|
htlcUpdates: make(chan *contractcourt.ContractUpdate),
|
||||||
htlcUpdates: make(chan *contractcourt.ContractUpdate),
|
hodlMap: make(map[channeldb.CircuitKey]hodlHtlc),
|
||||||
hodlMap: make(map[channeldb.CircuitKey]hodlHtlc),
|
hodlQueue: queue.NewConcurrentQueue(10),
|
||||||
hodlQueue: queue.NewConcurrentQueue(10),
|
log: build.NewPrefixLog(logPrefix, log),
|
||||||
log: build.NewPrefixLog(logPrefix, log),
|
quit: make(chan struct{}),
|
||||||
quit: make(chan struct{}),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -742,19 +724,15 @@ func (l *channelLink) resolveFwdPkgs() error {
|
|||||||
|
|
||||||
l.log.Debugf("loaded %d fwd pks", len(fwdPkgs))
|
l.log.Debugf("loaded %d fwd pks", len(fwdPkgs))
|
||||||
|
|
||||||
var needUpdate bool
|
|
||||||
for _, fwdPkg := range fwdPkgs {
|
for _, fwdPkg := range fwdPkgs {
|
||||||
hasUpdate, err := l.resolveFwdPkg(fwdPkg)
|
if err := l.resolveFwdPkg(fwdPkg); err != nil {
|
||||||
if err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
needUpdate = needUpdate || hasUpdate
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// If any of our reprocessing steps require an update to the commitment
|
// If any of our reprocessing steps require an update to the commitment
|
||||||
// txn, we initiate a state transition to capture all relevant changes.
|
// txn, we initiate a state transition to capture all relevant changes.
|
||||||
if needUpdate {
|
if l.channel.PendingLocalUpdateCount() > 0 {
|
||||||
return l.updateCommitTx()
|
return l.updateCommitTx()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -764,7 +742,7 @@ func (l *channelLink) resolveFwdPkgs() error {
|
|||||||
// resolveFwdPkg interprets the FwdState of the provided package, either
|
// resolveFwdPkg interprets the FwdState of the provided package, either
|
||||||
// reprocesses any outstanding htlcs in the package, or performs garbage
|
// reprocesses any outstanding htlcs in the package, or performs garbage
|
||||||
// collection on the package.
|
// collection on the package.
|
||||||
func (l *channelLink) resolveFwdPkg(fwdPkg *channeldb.FwdPkg) (bool, error) {
|
func (l *channelLink) resolveFwdPkg(fwdPkg *channeldb.FwdPkg) error {
|
||||||
// Remove any completed packages to clear up space.
|
// Remove any completed packages to clear up space.
|
||||||
if fwdPkg.State == channeldb.FwdStateCompleted {
|
if fwdPkg.State == channeldb.FwdStateCompleted {
|
||||||
l.log.Debugf("removing completed fwd pkg for height=%d",
|
l.log.Debugf("removing completed fwd pkg for height=%d",
|
||||||
@ -774,7 +752,7 @@ func (l *channelLink) resolveFwdPkg(fwdPkg *channeldb.FwdPkg) (bool, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
l.log.Errorf("unable to remove fwd pkg for height=%d: "+
|
l.log.Errorf("unable to remove fwd pkg for height=%d: "+
|
||||||
"%v", fwdPkg.Height, err)
|
"%v", fwdPkg.Height, err)
|
||||||
return false, err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -793,7 +771,7 @@ func (l *channelLink) resolveFwdPkg(fwdPkg *channeldb.FwdPkg) (bool, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
l.log.Errorf("unable to process remote log updates: %v",
|
l.log.Errorf("unable to process remote log updates: %v",
|
||||||
err)
|
err)
|
||||||
return false, err
|
return err
|
||||||
}
|
}
|
||||||
l.processRemoteSettleFails(fwdPkg, settleFails)
|
l.processRemoteSettleFails(fwdPkg, settleFails)
|
||||||
}
|
}
|
||||||
@ -802,7 +780,6 @@ func (l *channelLink) resolveFwdPkg(fwdPkg *channeldb.FwdPkg) (bool, error) {
|
|||||||
// downstream logic is able to filter out any duplicates, but we must
|
// downstream logic is able to filter out any duplicates, but we must
|
||||||
// shove the entire, original set of adds down the pipeline so that the
|
// shove the entire, original set of adds down the pipeline so that the
|
||||||
// batch of adds presented to the sphinx router does not ever change.
|
// batch of adds presented to the sphinx router does not ever change.
|
||||||
var needUpdate bool
|
|
||||||
if !fwdPkg.AckFilter.IsFull() {
|
if !fwdPkg.AckFilter.IsFull() {
|
||||||
adds, err := lnwallet.PayDescsFromRemoteLogUpdates(
|
adds, err := lnwallet.PayDescsFromRemoteLogUpdates(
|
||||||
fwdPkg.Source, fwdPkg.Height, fwdPkg.Adds,
|
fwdPkg.Source, fwdPkg.Height, fwdPkg.Adds,
|
||||||
@ -810,20 +787,20 @@ func (l *channelLink) resolveFwdPkg(fwdPkg *channeldb.FwdPkg) (bool, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
l.log.Errorf("unable to process remote log updates: %v",
|
l.log.Errorf("unable to process remote log updates: %v",
|
||||||
err)
|
err)
|
||||||
return false, err
|
return err
|
||||||
}
|
}
|
||||||
needUpdate = l.processRemoteAdds(fwdPkg, adds)
|
l.processRemoteAdds(fwdPkg, adds)
|
||||||
|
|
||||||
// If the link failed during processing the adds, we must
|
// If the link failed during processing the adds, we must
|
||||||
// return to ensure we won't attempted to update the state
|
// return to ensure we won't attempted to update the state
|
||||||
// further.
|
// further.
|
||||||
if l.failed {
|
if l.failed {
|
||||||
return false, fmt.Errorf("link failed while " +
|
return fmt.Errorf("link failed while " +
|
||||||
"processing remote adds")
|
"processing remote adds")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return needUpdate, nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// fwdPkgGarbager periodically reads all forwarding packages from disk and
|
// fwdPkgGarbager periodically reads all forwarding packages from disk and
|
||||||
@ -1028,11 +1005,14 @@ out:
|
|||||||
break out
|
break out
|
||||||
}
|
}
|
||||||
|
|
||||||
// If the previous event resulted in a non-empty
|
// If the previous event resulted in a non-empty batch, resume
|
||||||
// batch, reinstate the batch ticker so that it can be
|
// the batch ticker so that it can be cleared. Otherwise pause
|
||||||
// cleared.
|
// the ticker to prevent waking up the htlcManager while the
|
||||||
if l.batchCounter > 0 {
|
// batch is empty.
|
||||||
|
if l.channel.PendingLocalUpdateCount() > 0 {
|
||||||
l.cfg.BatchTicker.Resume()
|
l.cfg.BatchTicker.Resume()
|
||||||
|
} else {
|
||||||
|
l.cfg.BatchTicker.Pause()
|
||||||
}
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
@ -1098,35 +1078,11 @@ out:
|
|||||||
|
|
||||||
break out
|
break out
|
||||||
|
|
||||||
case <-l.logCommitTick:
|
|
||||||
// If we haven't sent or received a new commitment
|
|
||||||
// update in some time, check to see if we have any
|
|
||||||
// pending updates we need to commit due to our
|
|
||||||
// commitment chains being desynchronized.
|
|
||||||
if l.channel.FullySynced() {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := l.updateCommitTx(); err != nil {
|
|
||||||
l.fail(LinkFailureError{code: ErrInternalError},
|
|
||||||
"unable to update commitment: %v", err)
|
|
||||||
break out
|
|
||||||
}
|
|
||||||
|
|
||||||
case <-l.cfg.BatchTicker.Ticks():
|
case <-l.cfg.BatchTicker.Ticks():
|
||||||
// If the current batch is empty, then we have no work
|
// Attempt to extend the remote commitment chain
|
||||||
// here. We also disable the batch ticker from waking up
|
// including all the currently pending entries. If the
|
||||||
// the htlcManager while the batch is empty.
|
// send was unsuccessful, then abandon the update,
|
||||||
if l.batchCounter == 0 {
|
// waiting for the revocation window to open up.
|
||||||
l.cfg.BatchTicker.Pause()
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// Otherwise, attempt to extend the remote commitment
|
|
||||||
// chain including all the currently pending entries.
|
|
||||||
// If the send was unsuccessful, then abandon the
|
|
||||||
// update, waiting for the revocation window to open
|
|
||||||
// up.
|
|
||||||
if err := l.updateCommitTx(); err != nil {
|
if err := l.updateCommitTx(); err != nil {
|
||||||
l.fail(LinkFailureError{code: ErrInternalError},
|
l.fail(LinkFailureError{code: ErrInternalError},
|
||||||
"unable to update commitment: %v", err)
|
"unable to update commitment: %v", err)
|
||||||
@ -1156,9 +1112,9 @@ out:
|
|||||||
if ok && l.overflowQueue.Length() != 0 {
|
if ok && l.overflowQueue.Length() != 0 {
|
||||||
l.log.Infof("downstream htlc add update with "+
|
l.log.Infof("downstream htlc add update with "+
|
||||||
"payment hash(%x) have been added to "+
|
"payment hash(%x) have been added to "+
|
||||||
"reprocessing queue, batch_size=%v",
|
"reprocessing queue, pend_updates=%v",
|
||||||
htlc.PaymentHash[:],
|
htlc.PaymentHash[:],
|
||||||
l.batchCounter)
|
l.channel.PendingLocalUpdateCount())
|
||||||
|
|
||||||
l.overflowQueue.AddPkt(pkt)
|
l.overflowQueue.AddPkt(pkt)
|
||||||
continue
|
continue
|
||||||
@ -1236,8 +1192,6 @@ loop:
|
|||||||
func (l *channelLink) processHodlEvent(hodlEvent invoices.HodlEvent,
|
func (l *channelLink) processHodlEvent(hodlEvent invoices.HodlEvent,
|
||||||
htlc hodlHtlc) error {
|
htlc hodlHtlc) error {
|
||||||
|
|
||||||
l.batchCounter++
|
|
||||||
|
|
||||||
circuitKey := hodlEvent.CircuitKey
|
circuitKey := hodlEvent.CircuitKey
|
||||||
|
|
||||||
// Determine required action for the resolution.
|
// Determine required action for the resolution.
|
||||||
@ -1307,9 +1261,9 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) {
|
|||||||
case lnwallet.ErrMaxHTLCNumber:
|
case lnwallet.ErrMaxHTLCNumber:
|
||||||
l.log.Infof("downstream htlc add update with "+
|
l.log.Infof("downstream htlc add update with "+
|
||||||
"payment hash(%x) have been added to "+
|
"payment hash(%x) have been added to "+
|
||||||
"reprocessing queue, batch: %v",
|
"reprocessing queue, pend_updates: %v",
|
||||||
htlc.PaymentHash[:],
|
htlc.PaymentHash[:],
|
||||||
l.batchCounter)
|
l.channel.PendingLocalUpdateCount())
|
||||||
|
|
||||||
l.overflowQueue.AddPkt(pkt)
|
l.overflowQueue.AddPkt(pkt)
|
||||||
return
|
return
|
||||||
@ -1386,8 +1340,9 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
l.log.Tracef("received downstream htlc: payment_hash=%x, "+
|
l.log.Tracef("received downstream htlc: payment_hash=%x, "+
|
||||||
"local_log_index=%v, batch_size=%v",
|
"local_log_index=%v, pend_updates=%v",
|
||||||
htlc.PaymentHash[:], index, l.batchCounter+1)
|
htlc.PaymentHash[:], index,
|
||||||
|
l.channel.PendingLocalUpdateCount())
|
||||||
|
|
||||||
pkt.outgoingChanID = l.ShortChanID()
|
pkt.outgoingChanID = l.ShortChanID()
|
||||||
pkt.outgoingHTLCID = index
|
pkt.outgoingHTLCID = index
|
||||||
@ -1518,11 +1473,11 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) {
|
|||||||
isSettle = true
|
isSettle = true
|
||||||
}
|
}
|
||||||
|
|
||||||
l.batchCounter++
|
|
||||||
|
|
||||||
// If this newly added update exceeds the min batch size for adds, or
|
// If this newly added update exceeds the min batch size for adds, or
|
||||||
// this is a settle request, then initiate an update.
|
// this is a settle request, then initiate an update.
|
||||||
if l.batchCounter >= l.cfg.BatchSize || isSettle {
|
if l.channel.PendingLocalUpdateCount() >= uint64(l.cfg.BatchSize) ||
|
||||||
|
isSettle {
|
||||||
|
|
||||||
if err := l.updateCommitTx(); err != nil {
|
if err := l.updateCommitTx(); err != nil {
|
||||||
l.fail(LinkFailureError{code: ErrInternalError},
|
l.fail(LinkFailureError{code: ErrInternalError},
|
||||||
"unable to update commitment: %v", err)
|
"unable to update commitment: %v", err)
|
||||||
@ -1790,25 +1745,10 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// As we've just received a commitment signature, we'll
|
|
||||||
// re-start the log commit timer to wake up the main processing
|
|
||||||
// loop to check if we need to send a commitment signature as
|
|
||||||
// we owe one.
|
|
||||||
//
|
|
||||||
// TODO(roasbeef): instead after revocation?
|
|
||||||
if !l.logCommitTimer.Stop() {
|
|
||||||
select {
|
|
||||||
case <-l.logCommitTimer.C:
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
}
|
|
||||||
l.logCommitTimer.Reset(300 * time.Millisecond)
|
|
||||||
l.logCommitTick = l.logCommitTimer.C
|
|
||||||
|
|
||||||
// If both commitment chains are fully synced from our PoV,
|
// If both commitment chains are fully synced from our PoV,
|
||||||
// then we don't need to reply with a signature as both sides
|
// then we don't need to reply with a signature as both sides
|
||||||
// already have a commitment with the latest accepted.
|
// already have a commitment with the latest accepted.
|
||||||
if l.channel.FullySynced() {
|
if !l.channel.OweCommitment(true) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1873,7 +1813,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
l.processRemoteSettleFails(fwdPkg, settleFails)
|
l.processRemoteSettleFails(fwdPkg, settleFails)
|
||||||
needUpdate := l.processRemoteAdds(fwdPkg, adds)
|
l.processRemoteAdds(fwdPkg, adds)
|
||||||
|
|
||||||
// If the link failed during processing the adds, we must
|
// If the link failed during processing the adds, we must
|
||||||
// return to ensure we won't attempted to update the state
|
// return to ensure we won't attempted to update the state
|
||||||
@ -1882,7 +1822,14 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if needUpdate {
|
// The revocation window opened up. If there are pending local
|
||||||
|
// updates, try to update the commit tx. Pending updates could
|
||||||
|
// already have been present because of a previously failed
|
||||||
|
// update to the commit tx or freshly added in by
|
||||||
|
// processRemoteAdds. Also in case there are no local updates,
|
||||||
|
// but there are still remote updates that are not in the remote
|
||||||
|
// commit tx yet, send out an update.
|
||||||
|
if l.channel.OweCommitment(true) {
|
||||||
if err := l.updateCommitTx(); err != nil {
|
if err := l.updateCommitTx(); err != nil {
|
||||||
l.fail(LinkFailureError{code: ErrInternalError},
|
l.fail(LinkFailureError{code: ErrInternalError},
|
||||||
"unable to update commitment: %v", err)
|
"unable to update commitment: %v", err)
|
||||||
@ -1996,8 +1943,9 @@ func (l *channelLink) updateCommitTx() error {
|
|||||||
theirCommitSig, htlcSigs, pendingHTLCs, err := l.channel.SignNextCommitment()
|
theirCommitSig, htlcSigs, pendingHTLCs, err := l.channel.SignNextCommitment()
|
||||||
if err == lnwallet.ErrNoWindow {
|
if err == lnwallet.ErrNoWindow {
|
||||||
l.log.Tracef("revocation window exhausted, unable to send: "+
|
l.log.Tracef("revocation window exhausted, unable to send: "+
|
||||||
"%v, dangling_opens=%v, dangling_closes%v",
|
"%v, pend_updates=%v, dangling_closes%v",
|
||||||
l.batchCounter, newLogClosure(func() string {
|
l.channel.PendingLocalUpdateCount(),
|
||||||
|
newLogClosure(func() string {
|
||||||
return spew.Sdump(l.openedCircuits)
|
return spew.Sdump(l.openedCircuits)
|
||||||
}),
|
}),
|
||||||
newLogClosure(func() string {
|
newLogClosure(func() string {
|
||||||
@ -2032,21 +1980,6 @@ func (l *channelLink) updateCommitTx() error {
|
|||||||
}
|
}
|
||||||
l.cfg.Peer.SendMessage(false, commitSig)
|
l.cfg.Peer.SendMessage(false, commitSig)
|
||||||
|
|
||||||
// We've just initiated a state transition, attempt to stop the
|
|
||||||
// logCommitTimer. If the timer already ticked, then we'll consume the
|
|
||||||
// value, dropping
|
|
||||||
if l.logCommitTimer != nil && !l.logCommitTimer.Stop() {
|
|
||||||
select {
|
|
||||||
case <-l.logCommitTimer.C:
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
}
|
|
||||||
l.logCommitTick = nil
|
|
||||||
|
|
||||||
// Finally, clear our the current batch, so we can accurately make
|
|
||||||
// further batch flushing decisions.
|
|
||||||
l.batchCounter = 0
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2532,7 +2465,7 @@ func (l *channelLink) processRemoteSettleFails(fwdPkg *channeldb.FwdPkg,
|
|||||||
// whether we are reprocessing as a result of a failure or restart. Adds that
|
// whether we are reprocessing as a result of a failure or restart. Adds that
|
||||||
// have already been acknowledged in the forwarding package will be ignored.
|
// have already been acknowledged in the forwarding package will be ignored.
|
||||||
func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg,
|
func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg,
|
||||||
lockedInHtlcs []*lnwallet.PaymentDescriptor) bool {
|
lockedInHtlcs []*lnwallet.PaymentDescriptor) {
|
||||||
|
|
||||||
l.log.Tracef("processing %d remote adds for height %d",
|
l.log.Tracef("processing %d remote adds for height %d",
|
||||||
len(lockedInHtlcs), fwdPkg.Height)
|
len(lockedInHtlcs), fwdPkg.Height)
|
||||||
@ -2571,13 +2504,10 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg,
|
|||||||
if sphinxErr != nil {
|
if sphinxErr != nil {
|
||||||
l.fail(LinkFailureError{code: ErrInternalError},
|
l.fail(LinkFailureError{code: ErrInternalError},
|
||||||
"unable to decode hop iterators: %v", sphinxErr)
|
"unable to decode hop iterators: %v", sphinxErr)
|
||||||
return false
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var switchPackets []*htlcPacket
|
||||||
needUpdate bool
|
|
||||||
switchPackets []*htlcPacket
|
|
||||||
)
|
|
||||||
|
|
||||||
for i, pd := range lockedInHtlcs {
|
for i, pd := range lockedInHtlcs {
|
||||||
idx := uint16(i)
|
idx := uint16(i)
|
||||||
@ -2614,7 +2544,6 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg,
|
|||||||
// sender.
|
// sender.
|
||||||
l.sendMalformedHTLCError(pd.HtlcIndex, failureCode,
|
l.sendMalformedHTLCError(pd.HtlcIndex, failureCode,
|
||||||
onionBlob[:], pd.SourceRef)
|
onionBlob[:], pd.SourceRef)
|
||||||
needUpdate = true
|
|
||||||
|
|
||||||
l.log.Errorf("unable to decode onion hop "+
|
l.log.Errorf("unable to decode onion hop "+
|
||||||
"iterator: %v", failureCode)
|
"iterator: %v", failureCode)
|
||||||
@ -2633,7 +2562,6 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg,
|
|||||||
l.sendMalformedHTLCError(
|
l.sendMalformedHTLCError(
|
||||||
pd.HtlcIndex, failureCode, onionBlob[:], pd.SourceRef,
|
pd.HtlcIndex, failureCode, onionBlob[:], pd.SourceRef,
|
||||||
)
|
)
|
||||||
needUpdate = true
|
|
||||||
|
|
||||||
l.log.Errorf("unable to decode onion "+
|
l.log.Errorf("unable to decode onion "+
|
||||||
"obfuscator: %v", failureCode)
|
"obfuscator: %v", failureCode)
|
||||||
@ -2664,7 +2592,6 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg,
|
|||||||
lnwire.NewInvalidOnionPayload(failedType, 0),
|
lnwire.NewInvalidOnionPayload(failedType, 0),
|
||||||
obfuscator, pd.SourceRef,
|
obfuscator, pd.SourceRef,
|
||||||
)
|
)
|
||||||
needUpdate = true
|
|
||||||
|
|
||||||
l.log.Errorf("unable to decode forwarding "+
|
l.log.Errorf("unable to decode forwarding "+
|
||||||
"instructions: %v", err)
|
"instructions: %v", err)
|
||||||
@ -2675,7 +2602,7 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg,
|
|||||||
|
|
||||||
switch fwdInfo.NextHop {
|
switch fwdInfo.NextHop {
|
||||||
case hop.Exit:
|
case hop.Exit:
|
||||||
updated, err := l.processExitHop(
|
err := l.processExitHop(
|
||||||
pd, obfuscator, fwdInfo, heightNow, pld,
|
pd, obfuscator, fwdInfo, heightNow, pld,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -2683,10 +2610,7 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg,
|
|||||||
err.Error(),
|
err.Error(),
|
||||||
)
|
)
|
||||||
|
|
||||||
return false
|
return
|
||||||
}
|
|
||||||
if updated {
|
|
||||||
needUpdate = true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// There are additional channels left within this route. So
|
// There are additional channels left within this route. So
|
||||||
@ -2781,7 +2705,6 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg,
|
|||||||
l.sendHTLCError(
|
l.sendHTLCError(
|
||||||
pd.HtlcIndex, failure, obfuscator, pd.SourceRef,
|
pd.HtlcIndex, failure, obfuscator, pd.SourceRef,
|
||||||
)
|
)
|
||||||
needUpdate = true
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2821,12 +2744,12 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg,
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
l.fail(LinkFailureError{code: ErrInternalError},
|
l.fail(LinkFailureError{code: ErrInternalError},
|
||||||
"unable to set fwd filter: %v", err)
|
"unable to set fwd filter: %v", err)
|
||||||
return false
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(switchPackets) == 0 {
|
if len(switchPackets) == 0 {
|
||||||
return needUpdate
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
l.log.Debugf("forwarding %d packets to switch", len(switchPackets))
|
l.log.Debugf("forwarding %d packets to switch", len(switchPackets))
|
||||||
@ -2837,15 +2760,13 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg,
|
|||||||
// opened circuits, which violates assumptions made by the circuit
|
// opened circuits, which violates assumptions made by the circuit
|
||||||
// trimming.
|
// trimming.
|
||||||
l.forwardBatch(switchPackets...)
|
l.forwardBatch(switchPackets...)
|
||||||
|
|
||||||
return needUpdate
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// processExitHop handles an htlc for which this link is the exit hop. It
|
// processExitHop handles an htlc for which this link is the exit hop. It
|
||||||
// returns a boolean indicating whether the commitment tx needs an update.
|
// returns a boolean indicating whether the commitment tx needs an update.
|
||||||
func (l *channelLink) processExitHop(pd *lnwallet.PaymentDescriptor,
|
func (l *channelLink) processExitHop(pd *lnwallet.PaymentDescriptor,
|
||||||
obfuscator hop.ErrorEncrypter, fwdInfo hop.ForwardingInfo,
|
obfuscator hop.ErrorEncrypter, fwdInfo hop.ForwardingInfo,
|
||||||
heightNow uint32, payload invoices.Payload) (bool, error) {
|
heightNow uint32, payload invoices.Payload) error {
|
||||||
|
|
||||||
// If hodl.ExitSettle is requested, we will not validate the final hop's
|
// If hodl.ExitSettle is requested, we will not validate the final hop's
|
||||||
// ADD, nor will we settle the corresponding invoice or respond with the
|
// ADD, nor will we settle the corresponding invoice or respond with the
|
||||||
@ -2853,7 +2774,7 @@ func (l *channelLink) processExitHop(pd *lnwallet.PaymentDescriptor,
|
|||||||
if l.cfg.HodlMask.Active(hodl.ExitSettle) {
|
if l.cfg.HodlMask.Active(hodl.ExitSettle) {
|
||||||
l.log.Warnf(hodl.ExitSettle.Warning())
|
l.log.Warnf(hodl.ExitSettle.Warning())
|
||||||
|
|
||||||
return false, nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// As we're the exit hop, we'll double check the hop-payload included in
|
// As we're the exit hop, we'll double check the hop-payload included in
|
||||||
@ -2868,7 +2789,7 @@ func (l *channelLink) processExitHop(pd *lnwallet.PaymentDescriptor,
|
|||||||
failure := lnwire.NewFinalIncorrectHtlcAmount(pd.Amount)
|
failure := lnwire.NewFinalIncorrectHtlcAmount(pd.Amount)
|
||||||
l.sendHTLCError(pd.HtlcIndex, failure, obfuscator, pd.SourceRef)
|
l.sendHTLCError(pd.HtlcIndex, failure, obfuscator, pd.SourceRef)
|
||||||
|
|
||||||
return true, nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// We'll also ensure that our time-lock value has been computed
|
// We'll also ensure that our time-lock value has been computed
|
||||||
@ -2881,7 +2802,7 @@ func (l *channelLink) processExitHop(pd *lnwallet.PaymentDescriptor,
|
|||||||
failure := lnwire.NewFinalIncorrectCltvExpiry(pd.Timeout)
|
failure := lnwire.NewFinalIncorrectCltvExpiry(pd.Timeout)
|
||||||
l.sendHTLCError(pd.HtlcIndex, failure, obfuscator, pd.SourceRef)
|
l.sendHTLCError(pd.HtlcIndex, failure, obfuscator, pd.SourceRef)
|
||||||
|
|
||||||
return true, nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Notify the invoiceRegistry of the exit hop htlc. If we crash right
|
// Notify the invoiceRegistry of the exit hop htlc. If we crash right
|
||||||
@ -2906,14 +2827,14 @@ func (l *channelLink) processExitHop(pd *lnwallet.PaymentDescriptor,
|
|||||||
failure := lnwire.NewFailIncorrectDetails(pd.Amount, heightNow)
|
failure := lnwire.NewFailIncorrectDetails(pd.Amount, heightNow)
|
||||||
l.sendHTLCError(pd.HtlcIndex, failure, obfuscator, pd.SourceRef)
|
l.sendHTLCError(pd.HtlcIndex, failure, obfuscator, pd.SourceRef)
|
||||||
|
|
||||||
return true, nil
|
return nil
|
||||||
|
|
||||||
// No error.
|
// No error.
|
||||||
case nil:
|
case nil:
|
||||||
|
|
||||||
// Pass error to caller.
|
// Pass error to caller.
|
||||||
default:
|
default:
|
||||||
return false, err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create a hodlHtlc struct and decide either resolved now or later.
|
// Create a hodlHtlc struct and decide either resolved now or later.
|
||||||
@ -2926,15 +2847,11 @@ func (l *channelLink) processExitHop(pd *lnwallet.PaymentDescriptor,
|
|||||||
// Save payment descriptor for future reference.
|
// Save payment descriptor for future reference.
|
||||||
l.hodlMap[circuitKey] = htlc
|
l.hodlMap[circuitKey] = htlc
|
||||||
|
|
||||||
return false, nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Process the received resolution.
|
// Process the received resolution.
|
||||||
err = l.processHodlEvent(*event, htlc)
|
return l.processHodlEvent(*event, htlc)
|
||||||
if err != nil {
|
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
return true, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// settleHTLC settles the HTLC on the channel.
|
// settleHTLC settles the HTLC on the channel.
|
||||||
|
@ -139,6 +139,21 @@ func (l *linkTestContext) receiveRevAndAckAliceToBob() {
|
|||||||
func (l *linkTestContext) receiveCommitSigAliceToBob(expHtlcs int) {
|
func (l *linkTestContext) receiveCommitSigAliceToBob(expHtlcs int) {
|
||||||
l.t.Helper()
|
l.t.Helper()
|
||||||
|
|
||||||
|
comSig := l.receiveCommitSigAlice(expHtlcs)
|
||||||
|
|
||||||
|
err := l.bobChannel.ReceiveNewCommitment(
|
||||||
|
comSig.CommitSig, comSig.HtlcSigs,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
l.t.Fatalf("bob failed receiving commitment: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// receiveCommitSigAlice waits for Alice to send a CommitSig, signing expHtlcs
|
||||||
|
// numbers of HTLCs.
|
||||||
|
func (l *linkTestContext) receiveCommitSigAlice(expHtlcs int) *lnwire.CommitSig {
|
||||||
|
l.t.Helper()
|
||||||
|
|
||||||
var msg lnwire.Message
|
var msg lnwire.Message
|
||||||
select {
|
select {
|
||||||
case msg = <-l.aliceMsgs:
|
case msg = <-l.aliceMsgs:
|
||||||
@ -155,11 +170,8 @@ func (l *linkTestContext) receiveCommitSigAliceToBob(expHtlcs int) {
|
|||||||
l.t.Fatalf("expected %d htlc sigs, got %d", expHtlcs,
|
l.t.Fatalf("expected %d htlc sigs, got %d", expHtlcs,
|
||||||
len(comSig.HtlcSigs))
|
len(comSig.HtlcSigs))
|
||||||
}
|
}
|
||||||
err := l.bobChannel.ReceiveNewCommitment(comSig.CommitSig,
|
|
||||||
comSig.HtlcSigs)
|
return comSig
|
||||||
if err != nil {
|
|
||||||
l.t.Fatalf("bob failed receiving commitment: %v", err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// sendRevAndAckBobToAlice make Bob revoke his current commitment, then hand
|
// sendRevAndAckBobToAlice make Bob revoke his current commitment, then hand
|
||||||
@ -242,3 +254,15 @@ func (l *linkTestContext) receiveFailAliceToBob() {
|
|||||||
l.t.Fatalf("unable to apply received fail htlc: %v", err)
|
l.t.Fatalf("unable to apply received fail htlc: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// assertNoMsgFromAlice asserts that Alice hasn't sent a message. Before
|
||||||
|
// calling, make sure that Alice has had the opportunity to send the message.
|
||||||
|
func (l *linkTestContext) assertNoMsgFromAlice(timeout time.Duration) {
|
||||||
|
l.t.Helper()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case msg := <-l.aliceMsgs:
|
||||||
|
l.t.Fatalf("unexpected message from Alice: %v", msg)
|
||||||
|
case <-time.After(timeout):
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -4620,6 +4620,91 @@ func TestChannelLinkWaitForRevocation(t *testing.T) {
|
|||||||
assertNoMsgFromAlice()
|
assertNoMsgFromAlice()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestChannelLinkNoEmptySig asserts that no empty commit sig message is sent
|
||||||
|
// when the commitment txes are out of sync.
|
||||||
|
func TestChannelLinkNoEmptySig(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
const chanAmt = btcutil.SatoshiPerBitcoin * 5
|
||||||
|
const chanReserve = btcutil.SatoshiPerBitcoin * 1
|
||||||
|
aliceLink, bobChannel, batchTicker, start, cleanUp, _, err :=
|
||||||
|
newSingleLinkTestHarness(chanAmt, chanReserve)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to create link: %v", err)
|
||||||
|
}
|
||||||
|
defer cleanUp()
|
||||||
|
|
||||||
|
if err := start(); err != nil {
|
||||||
|
t.Fatalf("unable to start test harness: %v", err)
|
||||||
|
}
|
||||||
|
defer aliceLink.Stop()
|
||||||
|
|
||||||
|
var (
|
||||||
|
coreLink = aliceLink.(*channelLink)
|
||||||
|
aliceMsgs = coreLink.cfg.Peer.(*mockPeer).sentMsgs
|
||||||
|
)
|
||||||
|
|
||||||
|
ctx := linkTestContext{
|
||||||
|
t: t,
|
||||||
|
aliceLink: aliceLink,
|
||||||
|
aliceMsgs: aliceMsgs,
|
||||||
|
bobChannel: bobChannel,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send htlc 1 from Alice to Bob.
|
||||||
|
htlc1, _ := generateHtlcAndInvoice(t, 0)
|
||||||
|
ctx.sendHtlcAliceToBob(0, htlc1)
|
||||||
|
ctx.receiveHtlcAliceToBob()
|
||||||
|
|
||||||
|
// Tick the batch ticker to trigger a commitsig from Alice->Bob.
|
||||||
|
select {
|
||||||
|
case batchTicker <- time.Now():
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
t.Fatalf("could not force commit sig")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Receive a CommitSig from Alice covering the Add from above.
|
||||||
|
ctx.receiveCommitSigAliceToBob(1)
|
||||||
|
|
||||||
|
// Bob revokes previous commitment tx.
|
||||||
|
ctx.sendRevAndAckBobToAlice()
|
||||||
|
|
||||||
|
// Alice sends htlc 2 to Bob.
|
||||||
|
htlc2, _ := generateHtlcAndInvoice(t, 0)
|
||||||
|
ctx.sendHtlcAliceToBob(1, htlc2)
|
||||||
|
ctx.receiveHtlcAliceToBob()
|
||||||
|
|
||||||
|
// Tick the batch ticker to trigger a commitsig from Alice->Bob.
|
||||||
|
select {
|
||||||
|
case batchTicker <- time.Now():
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
t.Fatalf("could not force commit sig")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get the commit sig from Alice, but don't send it to Bob yet.
|
||||||
|
commitSigAlice := ctx.receiveCommitSigAlice(2)
|
||||||
|
|
||||||
|
// Bob adds htlc 1 to its remote commit tx.
|
||||||
|
ctx.sendCommitSigBobToAlice(1)
|
||||||
|
|
||||||
|
// Now send Bob the signature from Alice covering both htlcs.
|
||||||
|
err = bobChannel.ReceiveNewCommitment(
|
||||||
|
commitSigAlice.CommitSig, commitSigAlice.HtlcSigs,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("bob failed receiving commitment: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Both Alice and Bob revoke their previous commitment txes.
|
||||||
|
ctx.receiveRevAndAckAliceToBob()
|
||||||
|
ctx.sendRevAndAckBobToAlice()
|
||||||
|
|
||||||
|
// The commit txes are not in sync, but it is Bob's turn to send a new
|
||||||
|
// signature. We don't expect Alice to send out any message. This check
|
||||||
|
// allows some time for the log commit ticker to trigger for Alice.
|
||||||
|
ctx.assertNoMsgFromAlice(time.Second)
|
||||||
|
}
|
||||||
|
|
||||||
// TestChannelLinkBatchPreimageWrite asserts that a link will batch preimage
|
// TestChannelLinkBatchPreimageWrite asserts that a link will batch preimage
|
||||||
// writes when just as it receives a CommitSig to lock in any Settles, and also
|
// writes when just as it receives a CommitSig to lock in any Settles, and also
|
||||||
// if the link is aware of any uncommitted preimages if the link is stopped,
|
// if the link is aware of any uncommitted preimages if the link is stopped,
|
||||||
@ -5817,6 +5902,95 @@ func TestChannelLinkHoldInvoiceRestart(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestChannelLinkRevocationWindowRegular asserts that htlcs paying to a regular
|
||||||
|
// invoice are settled even if the revocation window gets exhausted.
|
||||||
|
func TestChannelLinkRevocationWindowRegular(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
const (
|
||||||
|
chanAmt = btcutil.SatoshiPerBitcoin * 5
|
||||||
|
)
|
||||||
|
|
||||||
|
// We'll start by creating a new link with our chanAmt (5 BTC). We will
|
||||||
|
// only be testing Alice's behavior, so the reference to Bob's channel
|
||||||
|
// state is unnecessary.
|
||||||
|
aliceLink, bobChannel, _, start, cleanUp, _, err :=
|
||||||
|
newSingleLinkTestHarness(chanAmt, 0)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to create link: %v", err)
|
||||||
|
}
|
||||||
|
defer cleanUp()
|
||||||
|
|
||||||
|
if err := start(); err != nil {
|
||||||
|
t.Fatalf("unable to start test harness: %v", err)
|
||||||
|
}
|
||||||
|
defer aliceLink.Stop()
|
||||||
|
|
||||||
|
var (
|
||||||
|
coreLink = aliceLink.(*channelLink)
|
||||||
|
registry = coreLink.cfg.Registry.(*mockInvoiceRegistry)
|
||||||
|
aliceMsgs = coreLink.cfg.Peer.(*mockPeer).sentMsgs
|
||||||
|
)
|
||||||
|
|
||||||
|
ctx := linkTestContext{
|
||||||
|
t: t,
|
||||||
|
aliceLink: aliceLink,
|
||||||
|
aliceMsgs: aliceMsgs,
|
||||||
|
bobChannel: bobChannel,
|
||||||
|
}
|
||||||
|
|
||||||
|
registry.settleChan = make(chan lntypes.Hash)
|
||||||
|
|
||||||
|
htlc1, invoice1 := generateHtlcAndInvoice(t, 0)
|
||||||
|
htlc2, invoice2 := generateHtlcAndInvoice(t, 1)
|
||||||
|
|
||||||
|
// We must add the invoice to the registry, such that Alice
|
||||||
|
// expects this payment.
|
||||||
|
err = registry.AddInvoice(*invoice1, htlc1.PaymentHash)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to add invoice to registry: %v", err)
|
||||||
|
}
|
||||||
|
err = registry.AddInvoice(*invoice2, htlc2.PaymentHash)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to add invoice to registry: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Lock in htlc 1 on both sides.
|
||||||
|
ctx.sendHtlcBobToAlice(htlc1)
|
||||||
|
ctx.sendCommitSigBobToAlice(1)
|
||||||
|
ctx.receiveRevAndAckAliceToBob()
|
||||||
|
ctx.receiveCommitSigAliceToBob(1)
|
||||||
|
ctx.sendRevAndAckBobToAlice()
|
||||||
|
|
||||||
|
// We expect a call to the invoice registry to notify the arrival of the
|
||||||
|
// htlc.
|
||||||
|
select {
|
||||||
|
case <-registry.settleChan:
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
t.Fatal("expected invoice to be settled")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Expect alice to send a settle and commitsig message to bob. Bob does
|
||||||
|
// not yet send the revocation.
|
||||||
|
ctx.receiveSettleAliceToBob()
|
||||||
|
ctx.receiveCommitSigAliceToBob(0)
|
||||||
|
|
||||||
|
// Pay invoice 2.
|
||||||
|
ctx.sendHtlcBobToAlice(htlc2)
|
||||||
|
ctx.sendCommitSigBobToAlice(2)
|
||||||
|
ctx.receiveRevAndAckAliceToBob()
|
||||||
|
|
||||||
|
// At this point, Alice cannot send a new commit sig to bob because the
|
||||||
|
// revocation window is exhausted.
|
||||||
|
|
||||||
|
// Bob sends revocation and signs commit with htlc1 settled.
|
||||||
|
ctx.sendRevAndAckBobToAlice()
|
||||||
|
|
||||||
|
// After the revocation, it is again possible for Alice to send a commit
|
||||||
|
// sig with htlc2.
|
||||||
|
ctx.receiveCommitSigAliceToBob(1)
|
||||||
|
}
|
||||||
|
|
||||||
// TestChannelLinkRevocationWindowHodl asserts that htlcs paying to a hodl
|
// TestChannelLinkRevocationWindowHodl asserts that htlcs paying to a hodl
|
||||||
// invoice are settled even if the revocation window gets exhausted.
|
// invoice are settled even if the revocation window gets exhausted.
|
||||||
func TestChannelLinkRevocationWindowHodl(t *testing.T) {
|
func TestChannelLinkRevocationWindowHodl(t *testing.T) {
|
||||||
@ -5962,6 +6136,77 @@ func TestChannelLinkRevocationWindowHodl(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestChannelLinkReceiveEmptySig tests the response of the link to receiving an
|
||||||
|
// empty commit sig. This should be tolerated, but we shouldn't send out an
|
||||||
|
// empty sig ourselves.
|
||||||
|
func TestChannelLinkReceiveEmptySig(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
const chanAmt = btcutil.SatoshiPerBitcoin * 5
|
||||||
|
const chanReserve = btcutil.SatoshiPerBitcoin * 1
|
||||||
|
aliceLink, bobChannel, batchTicker, start, cleanUp, _, err :=
|
||||||
|
newSingleLinkTestHarness(chanAmt, chanReserve)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to create link: %v", err)
|
||||||
|
}
|
||||||
|
defer cleanUp()
|
||||||
|
|
||||||
|
if err := start(); err != nil {
|
||||||
|
t.Fatalf("unable to start test harness: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
coreLink = aliceLink.(*channelLink)
|
||||||
|
aliceMsgs = coreLink.cfg.Peer.(*mockPeer).sentMsgs
|
||||||
|
)
|
||||||
|
|
||||||
|
ctx := linkTestContext{
|
||||||
|
t: t,
|
||||||
|
aliceLink: aliceLink,
|
||||||
|
aliceMsgs: aliceMsgs,
|
||||||
|
bobChannel: bobChannel,
|
||||||
|
}
|
||||||
|
|
||||||
|
htlc, _ := generateHtlcAndInvoice(t, 0)
|
||||||
|
|
||||||
|
// First, send an Add from Alice to Bob.
|
||||||
|
ctx.sendHtlcAliceToBob(0, htlc)
|
||||||
|
ctx.receiveHtlcAliceToBob()
|
||||||
|
|
||||||
|
// Tick the batch ticker to trigger a commitsig from Alice->Bob.
|
||||||
|
select {
|
||||||
|
case batchTicker <- time.Now():
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
t.Fatalf("could not force commit sig")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make Bob send a CommitSig. Since Bob hasn't received Alice's sig, he
|
||||||
|
// cannot add the htlc to his remote tx yet. The commit sig that we
|
||||||
|
// force Bob to send will be empty. Note that this normally does not
|
||||||
|
// happen, because the link (which is not present for Bob in this test)
|
||||||
|
// check whether Bob actually owes a sig first.
|
||||||
|
ctx.sendCommitSigBobToAlice(0)
|
||||||
|
|
||||||
|
// Receive a CommitSig from Alice covering the htlc from above.
|
||||||
|
ctx.receiveCommitSigAliceToBob(1)
|
||||||
|
|
||||||
|
// Wait for RevokeAndAck Alice->Bob. Even though Bob sent an empty
|
||||||
|
// commit sig, Alice still needs to revoke the previous commitment tx.
|
||||||
|
ctx.receiveRevAndAckAliceToBob()
|
||||||
|
|
||||||
|
// Send RevokeAndAck Bob->Alice to ack the added htlc.
|
||||||
|
ctx.sendRevAndAckBobToAlice()
|
||||||
|
|
||||||
|
// We received an empty commit sig, we accepted it, but there is nothing
|
||||||
|
// new to sign for us.
|
||||||
|
|
||||||
|
// No other messages are expected.
|
||||||
|
ctx.assertNoMsgFromAlice(time.Second)
|
||||||
|
|
||||||
|
// Stop the link
|
||||||
|
aliceLink.Stop()
|
||||||
|
}
|
||||||
|
|
||||||
// assertFailureCode asserts that an error is of type ForwardingError and that
|
// assertFailureCode asserts that an error is of type ForwardingError and that
|
||||||
// the failure code is as expected.
|
// the failure code is as expected.
|
||||||
func assertFailureCode(t *testing.T, err error, code lnwire.FailCode) {
|
func assertFailureCode(t *testing.T, err error, code lnwire.FailCode) {
|
||||||
|
@ -3233,6 +3233,14 @@ func (lc *LightningChannel) SignNextCommitment() (lnwire.Sig, []lnwire.Sig, []ch
|
|||||||
lc.Lock()
|
lc.Lock()
|
||||||
defer lc.Unlock()
|
defer lc.Unlock()
|
||||||
|
|
||||||
|
// Check for empty commit sig. This should never happen, but we don't
|
||||||
|
// dare to fail hard here. We assume peers can deal with the empty sig
|
||||||
|
// and continue channel operation. We log an error so that the bug
|
||||||
|
// causing this can be tracked down.
|
||||||
|
if !lc.oweCommitment(true) {
|
||||||
|
lc.log.Errorf("sending empty commit sig")
|
||||||
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
sig lnwire.Sig
|
sig lnwire.Sig
|
||||||
htlcSigs []lnwire.Sig
|
htlcSigs []lnwire.Sig
|
||||||
@ -3527,7 +3535,7 @@ func (lc *LightningChannel) ProcessChanSyncMsg(
|
|||||||
// but died before the signature was sent. We re-transmit our
|
// but died before the signature was sent. We re-transmit our
|
||||||
// revocation, but also initiate a state transition to re-sync
|
// revocation, but also initiate a state transition to re-sync
|
||||||
// them.
|
// them.
|
||||||
if !lc.FullySynced() {
|
if lc.OweCommitment(true) {
|
||||||
commitSig, htlcSigs, _, err := lc.SignNextCommitment()
|
commitSig, htlcSigs, _, err := lc.SignNextCommitment()
|
||||||
switch {
|
switch {
|
||||||
|
|
||||||
@ -3987,6 +3995,18 @@ func (lc *LightningChannel) ReceiveNewCommitment(commitSig lnwire.Sig,
|
|||||||
lc.Lock()
|
lc.Lock()
|
||||||
defer lc.Unlock()
|
defer lc.Unlock()
|
||||||
|
|
||||||
|
// Check for empty commit sig. Because of a previously existing bug, it
|
||||||
|
// is possible that we receive an empty commit sig from nodes running an
|
||||||
|
// older version. This is a relaxation of the spec, but it is still
|
||||||
|
// possible to handle it. To not break any channels with those older
|
||||||
|
// nodes, we just log the event. This check is also not totally
|
||||||
|
// reliable, because it could be that we've sent out a new sig, but the
|
||||||
|
// remote hasn't received it yet. We could then falsely assume that they
|
||||||
|
// should add our updates to their remote commitment tx.
|
||||||
|
if !lc.oweCommitment(false) {
|
||||||
|
lc.log.Warnf("empty commit sig message received")
|
||||||
|
}
|
||||||
|
|
||||||
// Determine the last update on the local log that has been locked in.
|
// Determine the last update on the local log that has been locked in.
|
||||||
localACKedIndex := lc.remoteCommitChain.tail().ourMessageIndex
|
localACKedIndex := lc.remoteCommitChain.tail().ourMessageIndex
|
||||||
localHtlcIndex := lc.remoteCommitChain.tail().ourHtlcIndex
|
localHtlcIndex := lc.remoteCommitChain.tail().ourHtlcIndex
|
||||||
@ -4140,24 +4160,80 @@ func (lc *LightningChannel) ReceiveNewCommitment(commitSig lnwire.Sig,
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// FullySynced returns a boolean value reflecting if both commitment chains
|
// OweCommitment returns a boolean value reflecting whether we need to send
|
||||||
// (remote+local) are fully in sync. Both commitment chains are fully in sync
|
// out a commitment signature because there are outstanding local updates and/or
|
||||||
// if the tip of each chain includes the latest committed changes from both
|
// updates in the local commit tx that aren't reflected in the remote commit tx
|
||||||
// sides.
|
// yet.
|
||||||
func (lc *LightningChannel) FullySynced() bool {
|
func (lc *LightningChannel) OweCommitment(local bool) bool {
|
||||||
|
lc.RLock()
|
||||||
|
defer lc.RUnlock()
|
||||||
|
|
||||||
|
return lc.oweCommitment(local)
|
||||||
|
}
|
||||||
|
|
||||||
|
// oweCommitment is the internal version of OweCommitment. This function expects
|
||||||
|
// to be executed with a lock held.
|
||||||
|
func (lc *LightningChannel) oweCommitment(local bool) bool {
|
||||||
|
var (
|
||||||
|
remoteUpdatesPending, localUpdatesPending bool
|
||||||
|
|
||||||
|
lastLocalCommit = lc.localCommitChain.tip()
|
||||||
|
lastRemoteCommit = lc.remoteCommitChain.tip()
|
||||||
|
|
||||||
|
perspective string
|
||||||
|
)
|
||||||
|
|
||||||
|
if local {
|
||||||
|
perspective = "local"
|
||||||
|
|
||||||
|
// There are local updates pending if our local update log is
|
||||||
|
// not in sync with our remote commitment tx.
|
||||||
|
localUpdatesPending = lc.localUpdateLog.logIndex !=
|
||||||
|
lastRemoteCommit.ourMessageIndex
|
||||||
|
|
||||||
|
// There are remote updates pending if their remote commitment
|
||||||
|
// tx (our local commitment tx) contains updates that we don't
|
||||||
|
// have added to our remote commitment tx yet.
|
||||||
|
remoteUpdatesPending = lastLocalCommit.theirMessageIndex !=
|
||||||
|
lastRemoteCommit.theirMessageIndex
|
||||||
|
|
||||||
|
} else {
|
||||||
|
perspective = "remote"
|
||||||
|
|
||||||
|
// There are local updates pending (local updates from the
|
||||||
|
// perspective of the remote party) if the remote party has
|
||||||
|
// updates to their remote tx pending for which they haven't
|
||||||
|
// signed yet.
|
||||||
|
localUpdatesPending = lc.remoteUpdateLog.logIndex !=
|
||||||
|
lastLocalCommit.theirMessageIndex
|
||||||
|
|
||||||
|
// There are remote updates pending (remote updates from the
|
||||||
|
// perspective of the remote party) if we have updates on our
|
||||||
|
// remote commitment tx that they haven't added to theirs yet.
|
||||||
|
remoteUpdatesPending = lastRemoteCommit.ourMessageIndex !=
|
||||||
|
lastLocalCommit.ourMessageIndex
|
||||||
|
}
|
||||||
|
|
||||||
|
// If any of the conditions above is true, we owe a commitment
|
||||||
|
// signature.
|
||||||
|
oweCommitment := localUpdatesPending || remoteUpdatesPending
|
||||||
|
|
||||||
|
lc.log.Tracef("%v owes commit: %v (local updates: %v, "+
|
||||||
|
"remote updates %v)", perspective, oweCommitment,
|
||||||
|
localUpdatesPending, remoteUpdatesPending)
|
||||||
|
|
||||||
|
return oweCommitment
|
||||||
|
}
|
||||||
|
|
||||||
|
// PendingLocalUpdateCount returns the number of local updates that still need
|
||||||
|
// to be applied to the remote commitment tx.
|
||||||
|
func (lc *LightningChannel) PendingLocalUpdateCount() uint64 {
|
||||||
lc.RLock()
|
lc.RLock()
|
||||||
defer lc.RUnlock()
|
defer lc.RUnlock()
|
||||||
|
|
||||||
lastLocalCommit := lc.localCommitChain.tip()
|
|
||||||
lastRemoteCommit := lc.remoteCommitChain.tip()
|
lastRemoteCommit := lc.remoteCommitChain.tip()
|
||||||
|
|
||||||
localUpdatesSynced := (lastLocalCommit.ourMessageIndex ==
|
return lc.localUpdateLog.logIndex - lastRemoteCommit.ourMessageIndex
|
||||||
lastRemoteCommit.ourMessageIndex)
|
|
||||||
|
|
||||||
remoteUpdatesSynced := (lastLocalCommit.theirMessageIndex ==
|
|
||||||
lastRemoteCommit.theirMessageIndex)
|
|
||||||
|
|
||||||
return localUpdatesSynced && remoteUpdatesSynced
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// RevokeCurrentCommitment revokes the next lowest unrevoked commitment
|
// RevokeCurrentCommitment revokes the next lowest unrevoked commitment
|
||||||
|
Loading…
Reference in New Issue
Block a user