lnwallet: partition state update logs within channel state machine

This commit patrons the state update logs properly within the channel
state machine. This change fixes a number of bugs caused by treating a
central log as two logically distinct logs. Rather than having a bit
indicating if the entry is incoming/outgoing, an entry is added to a
remote or local log depending on which modification method is used.

As a result the code is much easier to follow due to separation of
concerts.

Finally, when attempting to sign a new update with an exhausted
renovation window a distinct error is returned in order to allow higher
level callers to properly back-off and handle the protocol event.
This commit is contained in:
Olaoluwa Osuntokun 2016-07-21 16:50:20 -07:00
parent 35bca369e7
commit 4063171918
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2
3 changed files with 337 additions and 215 deletions

@ -23,6 +23,8 @@ var zeroHash wire.ShaHash
var ( var (
ErrChanClosing = fmt.Errorf("channel is being closed, operation disallowed") ErrChanClosing = fmt.Errorf("channel is being closed, operation disallowed")
ErrNoWindow = fmt.Errorf("unable to sign new commitment, the current" +
" revocation window is exhausted")
) )
const ( const (
@ -105,9 +107,6 @@ type PaymentDescriptor struct {
// Amount is the HTLC amount in satoshis. // Amount is the HTLC amount in satoshis.
Amount btcutil.Amount Amount btcutil.Amount
// IsIncoming denotes if this is an incoming HTLC add/settle/timeout.
IsIncoming bool
// Index is the log entry number that his HTLC update has within the // Index is the log entry number that his HTLC update has within the
// log. Depending on if IsIncoming is true, this is either an entry the // log. Depending on if IsIncoming is true, this is either an entry the
// remote party added, or one that we added locally. // remote party added, or one that we added locally.
@ -125,14 +124,7 @@ type PaymentDescriptor struct {
// Type denotes the exact type of the PaymentDescriptor. In the case of // Type denotes the exact type of the PaymentDescriptor. In the case of
// a Timeout, or Settle type, then the Parent field will point into the // a Timeout, or Settle type, then the Parent field will point into the
// log to the HTLC being modified. // log to the HTLC being modified.
entryType updateType EntryType updateType
// Parent is a pointer to the parent HTLC being modified
// (timedout/settled) if the Type of this PaymentDescriptor isn't Add.
// This pointer is used when determining which HTLC should be included
// within a new commitment, and during log compaction to remove the
// modified HTLC along with the Timeout/Settle entry.
parent *list.Element
// addCommitHeight[Remote|Local] encodes the height of the commitment // addCommitHeight[Remote|Local] encodes the height of the commitment
// which included this HTLC on either the remote or local commitment // which included this HTLC on either the remote or local commitment
@ -286,8 +278,8 @@ type LightningChannel struct {
sync.RWMutex sync.RWMutex
ourLogIndex uint32 ourLogCounter uint32
theirLogIndex uint32 theirLogCounter uint32
status channelState status channelState
@ -337,11 +329,13 @@ type LightningChannel struct {
// updates to this channel. The log is walked backwards as HTLC updates // updates to this channel. The log is walked backwards as HTLC updates
// are applied in order to re-construct a commitment transaction from a // are applied in order to re-construct a commitment transaction from a
// commitment. The log is compacted once a revocation is received. // commitment. The log is compacted once a revocation is received.
stateUpdateLog *list.List ourUpdateLog *list.List
theirUpdateLog *list.List
// logIndex is an index into the above log. This index is used to // logIndex is an index into the above log. This index is used to
// remove Add state updates, once a timeout/settle is received. // remove Add state updates, once a timeout/settle is received.
logIndex map[uint32]*list.Element ourLogIndex map[uint32]*list.Element
theirLogIndex map[uint32]*list.Element
fundingTxIn *wire.TxIn fundingTxIn *wire.TxIn
fundingP2WSH []byte fundingP2WSH []byte
@ -372,8 +366,10 @@ func NewLightningChannel(wallet *LightningWallet, events chainntnfs.ChainNotifie
localCommitChain: newCommitmentChain(state.NumUpdates), localCommitChain: newCommitmentChain(state.NumUpdates),
channelState: state, channelState: state,
revocationWindowEdge: state.NumUpdates, revocationWindowEdge: state.NumUpdates,
stateUpdateLog: list.New(), ourUpdateLog: list.New(),
logIndex: make(map[uint32]*list.Element), theirUpdateLog: list.New(),
ourLogIndex: make(map[uint32]*list.Element),
theirLogIndex: make(map[uint32]*list.Element),
channelDB: chanDB, channelDB: chanDB,
} }
@ -402,62 +398,43 @@ func NewLightningChannel(wallet *LightningWallet, events chainntnfs.ChainNotifie
return lc, nil return lc, nil
} }
// getCommitedHTLCs returns all HTLCs which are currently fully committed, type htlcView struct {
// meaning they are present at the commitment which is at the tip of the ourUpdates []*PaymentDescriptor
// local+remote commitment chains. theirUpdates []*PaymentDescriptor
func (lc *LightningChannel) getCommitedHTLCs() []*PaymentDescriptor {
var activeHtlcs []*PaymentDescriptor
remoteChainTail := lc.remoteCommitChain.tail().height
localChainTail := lc.localCommitChain.tail().height
for e := lc.stateUpdateLog.Front(); e != nil; e = e.Next() {
htlc := e.Value.(*PaymentDescriptor)
// TODO(roasbeef): should only look at the updates in our log?
// If the state update isn't a timeout, or settle, then it may
// not be considered an active HTLC.
if htlc.entryType != Add {
continue
}
// If the height of the tail of both the local, and
// remote commitment chains are above the height we
// both committed to the HTLC in our chains, then the
// HTLC is considered fully active/locked.
if remoteChainTail >= htlc.addCommitHeightRemote &&
localChainTail >= htlc.addCommitHeightLocal {
activeHtlcs = append(activeHtlcs, htlc)
}
}
return activeHtlcs
} }
// fetchHTLCView returns all the candidate HTLC updates which should be // fetchHTLCView returns all the candidate HTLC updates which should be
// considered for inclusion within a commitment based on the passed HTLC log // considered for inclusion within a commitment based on the passed HTLC log
// indexes. // indexes.
func (lc *LightningChannel) fetchHTLCView(theirLogIndex, ourLogIndex uint32) []*PaymentDescriptor { func (lc *LightningChannel) fetchHTLCView(theirLogIndex, ourLogIndex uint32) *htlcView {
var activeHtlcs []*PaymentDescriptor var ourHTLCs []*PaymentDescriptor
for e := lc.ourUpdateLog.Front(); e != nil; e = e.Next() {
for e := lc.stateUpdateLog.Front(); e != nil; e = e.Next() { htlc := e.Value.(*PaymentDescriptor)
// This HTLC is active from this point-of-view iff the log
// index of the state update is below the specified index in
// our update log.
if htlc.Index < ourLogIndex {
ourHTLCs = append(ourHTLCs, htlc)
}
}
var theirHTLCs []*PaymentDescriptor
for e := lc.theirUpdateLog.Front(); e != nil; e = e.Next() {
htlc := e.Value.(*PaymentDescriptor) htlc := e.Value.(*PaymentDescriptor)
if htlc.IsIncoming && htlc.Index <= theirLogIndex {
// If this is an incoming HTLC, then it is only active from // If this is an incoming HTLC, then it is only active from
// this point-of-view if the index of the HTLC addition in // this point-of-view if the index of the HTLC addition in
// their log is below the specified view index. // their log is below the specified view index.
activeHtlcs = append(activeHtlcs, htlc) if htlc.Index < theirLogIndex {
} else if htlc.Index <= ourLogIndex { theirHTLCs = append(theirHTLCs, htlc)
// Otherwise, this HTLC is active from this
// point-of-view iff the log index of the state
// update is below the specified index in our
// update log.
activeHtlcs = append(activeHtlcs, htlc)
} }
} }
return activeHtlcs return &htlcView{
ourUpdates: ourHTLCs,
theirUpdates: theirHTLCs,
}
} }
// fetchCommitmentView returns a populated commitment which expresses the state // fetchCommitmentView returns a populated commitment which expresses the state
@ -487,25 +464,15 @@ func (lc *LightningChannel) fetchCommitmentView(remoteChain bool,
theirBalance = commitChain.tip().theirBalance theirBalance = commitChain.tip().theirBalance
} }
nextHeight := commitChain.tip().height + 1
// Run through all the HTLC's that will be covered by this transaction // Run through all the HTLC's that will be covered by this transaction
// in order to update their commitment addition height, and to adjust // in order to update their commitment addition height, and to adjust
// the balances on the commitment transaction accordingly. // the balances on the commitment transaction accordingly.
// TODO(roasbeef): error if log empty? // TODO(roasbeef): error if log empty?
skip := make(map[PaymentHash]struct{}) htlcView := lc.fetchHTLCView(theirLogIndex, ourLogIndex)
nextHeight := commitChain.tip().height + 1 filteredHTLCView := lc.evaluateHTLCView(htlcView, &ourBalance, &theirBalance,
logViewEntries := lc.fetchHTLCView(theirLogIndex, ourLogIndex) nextHeight, remoteChain)
htlcs := make([]*PaymentDescriptor, 0, len(logViewEntries))
for i := len(logViewEntries) - 1; i >= 0; i-- {
logEntry := logViewEntries[i]
if _, ok := skip[logEntry.RHash]; ok {
continue
}
if processLogEntry(skip, logEntry, &ourBalance, &theirBalance,
ourLogIndex, theirLogIndex, nextHeight, remoteChain) {
htlcs = append(htlcs, logEntry)
}
}
var selfKey *btcec.PublicKey var selfKey *btcec.PublicKey
var remoteKey *btcec.PublicKey var remoteKey *btcec.PublicKey
@ -533,9 +500,15 @@ func (lc *LightningChannel) fetchCommitmentView(remoteChain bool,
if err != nil { if err != nil {
return nil, err return nil, err
} }
for _, htlc := range htlcs { for _, htlc := range filteredHTLCView.ourUpdates {
if err := lc.addHTLC(commitTx, ourCommitTx, htlc, if err := lc.addHTLC(commitTx, ourCommitTx, htlc,
revocationHash, delay); err != nil { revocationHash, delay, false); err != nil {
return nil, err
}
}
for _, htlc := range filteredHTLCView.theirUpdates {
if err := lc.addHTLC(commitTx, ourCommitTx, htlc,
revocationHash, delay, true); err != nil {
return nil, err return nil, err
} }
} }
@ -555,40 +528,82 @@ func (lc *LightningChannel) fetchCommitmentView(remoteChain bool,
}, nil }, nil
} }
// processLogEntry processes a log entry within the HTLC log. Processes entries // evaluateHTLCView processes all update entries in both HTLC update logs,
// either add new HTLCs to the commitment which weren't present in prior // producing a final view which is the result of properly applying all adds,
// commitments, or remove a commited HTLC which is being settled or timedout. // settles, and timeouts found in both logs. The resulting view returned
// In either case, the change of balances are applied via the pointers to the // reflects the current state of htlc's within the remote or local commitment
// balances passed in. In the case of a removal, the skip map is populated with // chain.
// the hash of the HTLC which should be excluded from the commitment func (lc *LightningChannel) evaluateHTLCView(view *htlcView, ourBalance,
// transaction. theirBalance *btcutil.Amount, nextHeight uint64, remoteChain bool) *htlcView {
func processLogEntry(skip map[PaymentHash]struct{}, htlc *PaymentDescriptor,
ourBalance, theirBalance *btcutil.Amount, ourLogIndex, theirLogIndex uint32,
nextHeight uint64, remoteChain bool) bool {
if htlc.entryType == Add { newView := &htlcView{}
processAddEntry(htlc, ourBalance, theirBalance, nextHeight,
ourLogIndex, theirLogIndex, remoteChain) // We use two maps, one for the local log and one for the remote log to
return true // keep track of which entries we need to skip when creating the final
// htlc view. We skip an entry whenever we find a settle or a timeout
// modifying an entry.
skipUs := make(map[uint32]struct{})
skipThem := make(map[uint32]struct{})
// First we run through non-add entries in both logs, populating the
// skip sets and mutating the current chain state (crediting balances, etc) to
// reflect the settle/timeout entry encountered.
for _, entry := range view.ourUpdates {
if entry.EntryType == Add {
continue
} }
// Otherwise, this is a log entry that aims to modify/remove an addEntry := lc.theirLogIndex[entry.ParentIndex].Value.(*PaymentDescriptor)
// existing entry. We add it's "parent" to the skip map since this
// entry removes an earlier log entry in order to avoid adding it to skipThem[addEntry.Index] = struct{}{}
// the current commitment transaction. processRemoveEntry(entry, ourBalance, theirBalance,
parent := htlc.parent.Value.(*PaymentDescriptor) nextHeight, remoteChain, true)
skip[parent.RHash] = struct{}{} }
processRemoveEntry(htlc, ourBalance, theirBalance, nextHeight, for _, entry := range view.theirUpdates {
ourLogIndex, theirLogIndex, remoteChain) if entry.EntryType == Add {
return false continue
} }
// processAddEntry evalualtes the effect of an add entry within the HTLC log. addEntry := lc.ourLogIndex[entry.ParentIndex].Value.(*PaymentDescriptor)
skipUs[addEntry.Index] = struct{}{}
processRemoveEntry(entry, ourBalance, theirBalance,
nextHeight, remoteChain, false)
}
// Next we take a second pass through all the log entries, skipping any
// settled HTLC's, and debiting the chain state balance due to any
// newly added HTLC's.
for _, entry := range view.ourUpdates {
isAdd := entry.EntryType == Add
if _, ok := skipUs[entry.Index]; !isAdd || ok {
continue
}
processAddEntry(entry, ourBalance, theirBalance, nextHeight,
remoteChain, false)
newView.ourUpdates = append(newView.ourUpdates, entry)
}
for _, entry := range view.theirUpdates {
isAdd := entry.EntryType == Add
if _, ok := skipThem[entry.Index]; !isAdd || ok {
continue
}
processAddEntry(entry, ourBalance, theirBalance, nextHeight,
remoteChain, true)
newView.theirUpdates = append(newView.theirUpdates, entry)
}
return newView
}
// processAddEntry evaluates the effect of an add entry within the HTLC log.
// If the HTLC hasn't yet been committed in either chain, then the height it // If the HTLC hasn't yet been committed in either chain, then the height it
// was commited is updated. Keeping track of this inclusion height allows us to // was commited is updated. Keeping track of this inclusion height allows us to
// later compact the log once the change is fully committed in both chains. // later compact the log once the change is fully committed in both chains.
func processAddEntry(htlc *PaymentDescriptor, ourBalance, theirBalance *btcutil.Amount, func processAddEntry(htlc *PaymentDescriptor, ourBalance, theirBalance *btcutil.Amount,
nextHeight uint64, ourLogIndex, theirLogIndex uint32, remoteChain bool) { nextHeight uint64, remoteChain bool, isIncoming bool) {
// If we're evaluating this entry for the remote chain (to create/view // If we're evaluating this entry for the remote chain (to create/view
// a new commitment), then we'll may be updating the height this entry // a new commitment), then we'll may be updating the height this entry
@ -605,7 +620,7 @@ func processAddEntry(htlc *PaymentDescriptor, ourBalance, theirBalance *btcutil.
return return
} }
if htlc.IsIncoming { if isIncoming {
// If this is a new incoming (un-committed) HTLC, then we need // If this is a new incoming (un-committed) HTLC, then we need
// to update their balance accordingly by subtracting the // to update their balance accordingly by subtracting the
// amount of the HTLC that are funds pending. // amount of the HTLC that are funds pending.
@ -624,7 +639,7 @@ func processAddEntry(htlc *PaymentDescriptor, ourBalance, theirBalance *btcutil.
// is skipped. // is skipped.
func processRemoveEntry(htlc *PaymentDescriptor, ourBalance, func processRemoveEntry(htlc *PaymentDescriptor, ourBalance,
theirBalance *btcutil.Amount, nextHeight uint64, theirBalance *btcutil.Amount, nextHeight uint64,
ourLogIndex, theirLogIndex uint32, remoteChain bool) { remoteChain bool, isIncoming bool) {
var removeHeight *uint64 var removeHeight *uint64
if remoteChain { if remoteChain {
@ -643,21 +658,21 @@ func processRemoveEntry(htlc *PaymentDescriptor, ourBalance,
// received the preimage either from another sub-system, or the // received the preimage either from another sub-system, or the
// upstream peer in the route. Therefore, we increase our balance by // upstream peer in the route. Therefore, we increase our balance by
// the HTLC amount. // the HTLC amount.
case htlc.IsIncoming && htlc.entryType == Settle: case isIncoming && htlc.EntryType == Settle:
*ourBalance += htlc.Amount *ourBalance += htlc.Amount
// Otherwise, this HTLC is being timed out, therefore the value of the // Otherwise, this HTLC is being timed out, therefore the value of the
// HTLC should return to the remote party. // HTLC should return to the remote party.
case htlc.IsIncoming && htlc.entryType == Timeout: case isIncoming && htlc.EntryType == Timeout:
*theirBalance += htlc.Amount *theirBalance += htlc.Amount
// If an outgoing HTLC is being settled, then this means that the // If an outgoing HTLC is being settled, then this means that the
// downstream party resented the preimage or learned of it via a // downstream party resented the preimage or learned of it via a
// downstream peer. In either case, we credit their settled value with // downstream peer. In either case, we credit their settled value with
// the value of the HTLC. // the value of the HTLC.
case !htlc.IsIncoming && htlc.entryType == Settle: case !isIncoming && htlc.EntryType == Settle:
*theirBalance += htlc.Amount *theirBalance += htlc.Amount
// Otherwise, one of our outgoing HTLC's has timed out, so the value of // Otherwise, one of our outgoing HTLC's has timed out, so the value of
// the HTLC should be returned to our settled balance. // the HTLC should be returned to our settled balance.
case !htlc.IsIncoming && htlc.entryType == Timeout: case !isIncoming && htlc.EntryType == Timeout:
*ourBalance += htlc.Amount *ourBalance += htlc.Amount
} }
@ -676,9 +691,7 @@ func (lc *LightningChannel) SignNextCommitment() ([]byte, uint32, error) {
// state unless they first revoke a prior commitment transaction. // state unless they first revoke a prior commitment transaction.
if len(lc.revocationWindow) == 0 || if len(lc.revocationWindow) == 0 ||
len(lc.usedRevocations) == InitialRevocationWindow { len(lc.usedRevocations) == InitialRevocationWindow {
// TODO(rosbeef): better error message return nil, 0, ErrNoWindow
return nil, 0, fmt.Errorf("unable to sign new commitment, " +
"revocation window exausted")
} }
// Grab the next revocation hash and key to use for this new commitment // Grab the next revocation hash and key to use for this new commitment
@ -693,8 +706,8 @@ func (lc *LightningChannel) SignNextCommitment() ([]byte, uint32, error) {
// HTLC's. The view includes the latest balances for both sides on the // HTLC's. The view includes the latest balances for both sides on the
// remote node's chain, and also update the addition height of any new // remote node's chain, and also update the addition height of any new
// HTLC log entries. // HTLC log entries.
newCommitView, err := lc.fetchCommitmentView(true, lc.ourLogIndex, newCommitView, err := lc.fetchCommitmentView(true, lc.ourLogCounter,
lc.theirLogIndex, remoteRevocationKey, remoteRevocationHash) lc.theirLogCounter, remoteRevocationKey, remoteRevocationHash)
if err != nil { if err != nil {
return nil, 0, err return nil, 0, err
} }
@ -704,7 +717,9 @@ func (lc *LightningChannel) SignNextCommitment() ([]byte, uint32, error) {
log.Tracef("ChannelPoint(%v): remote chain: our_balance=%v, "+ log.Tracef("ChannelPoint(%v): remote chain: our_balance=%v, "+
"their_balance=%v, commit_tx: %v", lc.channelState.ChanID, "their_balance=%v, commit_tx: %v", lc.channelState.ChanID,
newCommitView.ourBalance, newCommitView.theirBalance, newCommitView.ourBalance, newCommitView.theirBalance,
spew.Sdump(newCommitView.txn)) newLogClosure(func() string {
return spew.Sdump(newCommitView.txn)
}))
// Sign their version of the new commitment transaction. // Sign their version of the new commitment transaction.
hashCache := txscript.NewTxSigHashes(newCommitView.txn) hashCache := txscript.NewTxSigHashes(newCommitView.txn)
@ -729,7 +744,7 @@ func (lc *LightningChannel) SignNextCommitment() ([]byte, uint32, error) {
// Strip off the sighash flag on the signature in order to send it over // Strip off the sighash flag on the signature in order to send it over
// the wire. // the wire.
return sig[:len(sig)], lc.theirLogIndex, nil return sig[:len(sig)], lc.theirLogCounter, nil
} }
// ReceiveNewCommitment processs a signature for a new commitment state sent by // ReceiveNewCommitment processs a signature for a new commitment state sent by
@ -746,7 +761,7 @@ func (lc *LightningChannel) ReceiveNewCommitment(rawSig []byte,
theirCommitKey := lc.channelState.TheirCommitKey theirCommitKey := lc.channelState.TheirCommitKey
theirMultiSigKey := lc.channelState.TheirMultiSigKey theirMultiSigKey := lc.channelState.TheirMultiSigKey
// We're receiving a new commitment which attemps to extend our local // We're receiving a new commitment which attempts to extend our local
// commitment chain height by one, so fetch the proper revocation to // commitment chain height by one, so fetch the proper revocation to
// derive the key+hash needed to construct the new commitment view and // derive the key+hash needed to construct the new commitment view and
// state. // state.
@ -762,7 +777,7 @@ func (lc *LightningChannel) ReceiveNewCommitment(rawSig []byte,
// commitment view which includes all the entries we know of in their // commitment view which includes all the entries we know of in their
// HTLC log, and up to ourLogIndex in our HTLC log. // HTLC log, and up to ourLogIndex in our HTLC log.
localCommitmentView, err := lc.fetchCommitmentView(false, ourLogIndex, localCommitmentView, err := lc.fetchCommitmentView(false, ourLogIndex,
lc.theirLogIndex, revocationKey, revocationHash) lc.theirLogCounter, revocationKey, revocationHash)
if err != nil { if err != nil {
return err return err
} }
@ -772,7 +787,9 @@ func (lc *LightningChannel) ReceiveNewCommitment(rawSig []byte,
log.Tracef("ChannelPoint(%v): local chain: our_balance=%v, "+ log.Tracef("ChannelPoint(%v): local chain: our_balance=%v, "+
"their_balance=%v, commit_tx: %v", lc.channelState.ChanID, "their_balance=%v, commit_tx: %v", lc.channelState.ChanID,
localCommitmentView.ourBalance, localCommitmentView.theirBalance, localCommitmentView.ourBalance, localCommitmentView.theirBalance,
spew.Sdump(localCommitmentView.txn)) newLogClosure(func() string {
return spew.Sdump(localCommitmentView.txn)
}))
// Construct the sighash of the commitment transaction corresponding to // Construct the sighash of the commitment transaction corresponding to
// this newly proposed state update. // this newly proposed state update.
@ -940,49 +957,87 @@ func (lc *LightningChannel) ReceiveRevocation(revMsg *lnwire.CommitRevocation) (
// Now that we've verified the revocation update the state of the HTLC // Now that we've verified the revocation update the state of the HTLC
// log as we may be able to prune portions of it now, and update their // log as we may be able to prune portions of it now, and update their
// balance. // balance.
// TODO(roasbeef): move this out to another func?
// * .CompactLog()
var next *list.Element
var htlcsToForward []*PaymentDescriptor var htlcsToForward []*PaymentDescriptor
for e := lc.stateUpdateLog.Front(); e != nil; e = next { for e := lc.theirUpdateLog.Front(); e != nil; e = e.Next() {
next = e.Next()
htlc := e.Value.(*PaymentDescriptor) htlc := e.Value.(*PaymentDescriptor)
if htlc.isForwarded { if htlc.isForwarded {
continue continue
} }
// If this entry is either a timeout or settle, then we // TODO(roasbeef): re-visit after adding persistence to HTLC's
// can remove it from our log once the update it locked // * either record add height, or set to N - 1
// into both of our chains. uncomitted := (htlc.addCommitHeightRemote == 0 ||
if htlc.entryType != Add && htlc.addCommitHeightLocal == 0)
if htlc.EntryType == Add && uncomitted {
continue
}
if htlc.EntryType == Add &&
remoteChainTail >= htlc.addCommitHeightRemote &&
localChainTail >= htlc.addCommitHeightLocal {
htlc.isForwarded = true
htlcsToForward = append(htlcsToForward, htlc)
} else if htlc.EntryType != Add &&
remoteChainTail >= htlc.removeCommitHeightRemote && remoteChainTail >= htlc.removeCommitHeightRemote &&
localChainTail >= htlc.removeCommitHeightLocal { localChainTail >= htlc.removeCommitHeightLocal {
parentLink := htlc.parent
lc.stateUpdateLog.Remove(e)
lc.stateUpdateLog.Remove(parentLink)
if !htlc.IsIncoming {
htlc.ParentIndex = parentLink.Value.(*PaymentDescriptor).Index
htlcsToForward = append(htlcsToForward, htlc)
}
} else if remoteChainTail >= htlc.addCommitHeightRemote &&
localChainTail >= htlc.addCommitHeightLocal {
// Once an HTLC has been fully locked into both of our
// chains, then we can safely forward it to the next
// hop.
if htlc.IsIncoming {
htlc.isForwarded = true htlc.isForwarded = true
htlcsToForward = append(htlcsToForward, htlc) htlcsToForward = append(htlcsToForward, htlc)
} }
} }
}
lc.compactLogs(lc.ourUpdateLog, lc.theirUpdateLog,
localChainTail, remoteChainTail)
return htlcsToForward, nil return htlcsToForward, nil
} }
// compactLogs performs garbage collection within the log removing HTLC's which
// have been removed from the point-of-view of the tail of both chains. The
// entries which timeout/settle HTLC's are also removed.
func (lc *LightningChannel) compactLogs(ourLog, theirLog *list.List,
localChainTail, remoteChainTail uint64) {
compactLog := func(logA, logB *list.List, indexB, indexA map[uint32]*list.Element) {
var nextA *list.Element
for e := logA.Front(); e != nil; e = nextA {
nextA = e.Next()
htlc := e.Value.(*PaymentDescriptor)
if htlc.EntryType == Add {
continue
}
// If the HTLC hasn't yet been removed from either
// chain, the skip it.
if htlc.removeCommitHeightRemote == 0 ||
htlc.removeCommitHeightLocal == 0 {
continue
}
// Otherwise if the height of the tail of both chains
// is at least the height in which the HTLC was
// removed, then evict the settle/timeout entry along
// with the original add entry.
if remoteChainTail >= htlc.removeCommitHeightRemote &&
localChainTail >= htlc.removeCommitHeightLocal {
parentLink := indexB[htlc.ParentIndex]
parentIndex := parentLink.Value.(*PaymentDescriptor).Index
logB.Remove(parentLink)
logA.Remove(e)
delete(indexB, parentIndex)
delete(indexA, htlc.Index)
}
}
}
compactLog(ourLog, theirLog, lc.theirLogIndex, lc.ourLogIndex)
compactLog(theirLog, ourLog, lc.ourLogIndex, lc.theirLogIndex)
}
// ExtendRevocationWindow extends our revocation window by a single revocation, // ExtendRevocationWindow extends our revocation window by a single revocation,
// increasing the number of new commitment updates the remote party can // increasing the number of new commitment updates the remote party can
// initiate without our cooperation. // initiate without our cooperation.
@ -1009,49 +1064,57 @@ func (lc *LightningChannel) ExtendRevocationWindow() (*lnwire.CommitRevocation,
return revMsg, nil return revMsg, nil
} }
// AddPayment adds a new HTLC to either the local or remote HTLC log depending // AddHTLC adds an HTLC to the state machine's local update log. This method
// on the value of 'incoming'. // should be called when preparing to send an outgoing HTLC.
func (lc *LightningChannel) AddHTLC(htlc *lnwire.HTLCAddRequest, incoming bool) uint32 { func (lc *LightningChannel) AddHTLC(htlc *lnwire.HTLCAddRequest) uint32 {
pd := &PaymentDescriptor{ pd := &PaymentDescriptor{
entryType: Add, EntryType: Add,
RHash: PaymentHash(htlc.RedemptionHashes[0]), RHash: PaymentHash(htlc.RedemptionHashes[0]),
Timeout: htlc.Expiry, Timeout: htlc.Expiry,
Amount: btcutil.Amount(htlc.Amount), Amount: btcutil.Amount(htlc.Amount),
IsIncoming: incoming, Index: lc.ourLogCounter,
} }
var index uint32 lc.ourLogIndex[pd.Index] = lc.ourUpdateLog.PushBack(pd)
if !incoming { lc.ourLogCounter++
index = lc.ourLogIndex
lc.ourLogIndex++ return pd.Index
} else {
index = lc.theirLogIndex
lc.theirLogIndex++
} }
pd.Index = index // ReceiveHTLC adds an HTLC to the state machine's remote update log. This
lc.stateUpdateLog.PushBack(pd) // method should be called in response to receiving a new HTLC from the remote
// party.
return index func (lc *LightningChannel) ReceiveHTLC(htlc *lnwire.HTLCAddRequest) uint32 {
pd := &PaymentDescriptor{
EntryType: Add,
RHash: PaymentHash(htlc.RedemptionHashes[0]),
Timeout: htlc.Expiry,
Amount: btcutil.Amount(htlc.Amount),
Index: lc.theirLogCounter,
} }
// SettleHTLC attempts to settle an existing outstanding HTLC with an htlc lc.theirLogIndex[pd.Index] = lc.theirUpdateLog.PushBack(pd)
// settle request. When settling incoming HTLC's the value of incoming should lc.theirLogCounter++
// be false, when receiving a settlement to a previously outgoing HTLC, then
// the value of incoming should be true. If the settlement fails due to an return pd.Index
// invalid preimage, then an error is returned. }
func (lc *LightningChannel) SettleHTLC(preimage [32]byte, incoming bool) (uint32, error) {
// SettleHTLC attempst to settle an existing outstanding received HTLC. The
// remote log index of the HTLC settled is returned in order to facilitate
// creating the corresponding wire message. In the case the supplied pre-image
// is invalid, an error is returned.
func (lc *LightningChannel) SettleHTLC(preimage [32]byte) (uint32, error) {
var targetHTLC *list.Element var targetHTLC *list.Element
// TODO(roasbeef): optimize // TODO(roasbeef): optimize
paymentHash := fastsha256.Sum256(preimage[:]) paymentHash := fastsha256.Sum256(preimage[:])
for e := lc.stateUpdateLog.Back(); e != nil; e = e.Next() { for e := lc.theirUpdateLog.Front(); e != nil; e = e.Next() {
htlc := e.Value.(*PaymentDescriptor) htlc := e.Value.(*PaymentDescriptor)
if htlc.entryType != Add { if htlc.EntryType != Add {
continue continue
} }
if bytes.Equal(htlc.RHash[:], paymentHash[:]) && !htlc.settled { if !htlc.settled && bytes.Equal(htlc.RHash[:], paymentHash[:]) {
htlc.settled = true htlc.settled = true
targetHTLC = e targetHTLC = e
break break
@ -1064,27 +1127,48 @@ func (lc *LightningChannel) SettleHTLC(preimage [32]byte, incoming bool) (uint32
parentPd := targetHTLC.Value.(*PaymentDescriptor) parentPd := targetHTLC.Value.(*PaymentDescriptor)
// TODO(roasbeef): maybe make the log entries an interface? // TODO(roasbeef): maybe make the log entries an interface?
pd := &PaymentDescriptor{} pd := &PaymentDescriptor{
pd.IsIncoming = parentPd.IsIncoming Amount: parentPd.Amount,
pd.Amount = parentPd.Amount Index: lc.ourLogCounter,
pd.parent = targetHTLC ParentIndex: parentPd.Index,
pd.entryType = Settle EntryType: Settle,
var index uint32
if !incoming {
index = lc.ourLogIndex
lc.ourLogIndex++
} else {
index = lc.theirLogIndex
lc.theirLogIndex++
} }
pd.Index = index lc.ourUpdateLog.PushBack(pd)
lc.stateUpdateLog.PushBack(pd) lc.ourLogCounter++
return targetHTLC.Value.(*PaymentDescriptor).Index, nil return targetHTLC.Value.(*PaymentDescriptor).Index, nil
} }
// ReceiveHTLCSettle attempts to settle an existing outgoing HTLC indexed by an
// index into the local log. If the specified index doesn't exist within the
// log, and error is returned. Similarly if the preimage is invalid w.r.t to
// the referenced of then a distinct error is returned.
func (lc *LightningChannel) ReceiveHTLCSettle(preimage [32]byte, logIndex uint32) error {
paymentHash := fastsha256.Sum256(preimage[:])
addEntry, ok := lc.ourLogIndex[logIndex]
if !ok {
return fmt.Errorf("non existant log entry")
}
htlc := addEntry.Value.(*PaymentDescriptor)
if !bytes.Equal(htlc.RHash[:], paymentHash[:]) {
return fmt.Errorf("invalid payment hash")
}
pd := &PaymentDescriptor{
Amount: htlc.Amount,
ParentIndex: htlc.Index,
Index: lc.theirLogCounter,
EntryType: Settle,
}
lc.theirUpdateLog.PushBack(pd)
lc.theirLogCounter++
return nil
}
// TimeoutHTLC... // TimeoutHTLC...
func (lc *LightningChannel) TimeoutHTLC() error { func (lc *LightningChannel) TimeoutHTLC() error {
return nil return nil
@ -1102,7 +1186,8 @@ func (lc *LightningChannel) ChannelPoint() *wire.OutPoint {
// is incoming and if it's being applied to our commitment transaction or that // is incoming and if it's being applied to our commitment transaction or that
// of the remote node's. // of the remote node's.
func (lc *LightningChannel) addHTLC(commitTx *wire.MsgTx, ourCommit bool, func (lc *LightningChannel) addHTLC(commitTx *wire.MsgTx, ourCommit bool,
paymentDesc *PaymentDescriptor, revocation [32]byte, delay uint32) error { paymentDesc *PaymentDescriptor, revocation [32]byte, delay uint32,
isIncoming bool) error {
localKey := lc.channelState.OurCommitKey.PubKey() localKey := lc.channelState.OurCommitKey.PubKey()
remoteKey := lc.channelState.TheirCommitKey remoteKey := lc.channelState.TheirCommitKey
@ -1118,25 +1203,25 @@ func (lc *LightningChannel) addHTLC(commitTx *wire.MsgTx, ourCommit bool,
// The HTLC is paying to us, and being applied to our commitment // The HTLC is paying to us, and being applied to our commitment
// transaction. So we need to use the receiver's version of HTLC the // transaction. So we need to use the receiver's version of HTLC the
// script. // script.
case paymentDesc.IsIncoming && ourCommit: case isIncoming && ourCommit:
pkScript, err = receiverHTLCScript(timeout, delay, remoteKey, pkScript, err = receiverHTLCScript(timeout, delay, remoteKey,
localKey, revocation[:], rHash[:]) localKey, revocation[:], rHash[:])
// We're being paid via an HTLC by the remote party, and the HTLC is // We're being paid via an HTLC by the remote party, and the HTLC is
// being added to their commitment transaction, so we use the sender's // being added to their commitment transaction, so we use the sender's
// version of the HTLC script. // version of the HTLC script.
case paymentDesc.IsIncoming && !ourCommit: case isIncoming && !ourCommit:
pkScript, err = senderHTLCScript(timeout, delay, remoteKey, pkScript, err = senderHTLCScript(timeout, delay, remoteKey,
localKey, revocation[:], rHash[:]) localKey, revocation[:], rHash[:])
// We're sending an HTLC which is being added to our commitment // We're sending an HTLC which is being added to our commitment
// transaction. Therefore, we need to use the sender's version of the // transaction. Therefore, we need to use the sender's version of the
// HTLC script. // HTLC script.
case !paymentDesc.IsIncoming && ourCommit: case !isIncoming && ourCommit:
pkScript, err = senderHTLCScript(timeout, delay, localKey, pkScript, err = senderHTLCScript(timeout, delay, localKey,
remoteKey, revocation[:], rHash[:]) remoteKey, revocation[:], rHash[:])
// Finally, we're paying the remote party via an HTLC, which is being // Finally, we're paying the remote party via an HTLC, which is being
// added to their commitment transaction. Therefore, we use the // added to their commitment transaction. Therefore, we use the
// receiver's version of the HTLC script. // receiver's version of the HTLC script.
case !paymentDesc.IsIncoming && !ourCommit: case !isIncoming && !ourCommit:
pkScript, err = receiverHTLCScript(timeout, delay, localKey, pkScript, err = receiverHTLCScript(timeout, delay, localKey,
remoteKey, revocation[:], rHash[:]) remoteKey, revocation[:], rHash[:])
} }

@ -223,12 +223,10 @@ func TestSimpleAddSettleWorkflow(t *testing.T) {
} }
// First Alice adds the outgoing HTLC to her local channel's state // First Alice adds the outgoing HTLC to her local channel's state
// update log. // update log. Then Alice sends this wire message over to Bob who also
aliceChannel.AddHTLC(htlc, false) // adds this htlc to his local state update log.
aliceChannel.AddHTLC(htlc)
// Then Alice sends this wire message over to Bob who also adds this bobChannel.ReceiveHTLC(htlc)
// htlc to his local state update log.
bobChannel.AddHTLC(htlc, true)
// Next alice commits this change by sending a signature message. // Next alice commits this change by sending a signature message.
aliceSig, bobLogIndex, err := aliceChannel.SignNextCommitment() aliceSig, bobLogIndex, err := aliceChannel.SignNextCommitment()
@ -324,10 +322,11 @@ func TestSimpleAddSettleWorkflow(t *testing.T) {
// HTLC once he learns of the preimage. // HTLC once he learns of the preimage.
var preimage [32]byte var preimage [32]byte
copy(preimage[:], paymentPreimage) copy(preimage[:], paymentPreimage)
if _, err := bobChannel.SettleHTLC(preimage, false); err != nil { settleIndex, err := bobChannel.SettleHTLC(preimage)
if err != nil {
t.Fatalf("bob unable to settle inbound htlc: %v", err) t.Fatalf("bob unable to settle inbound htlc: %v", err)
} }
if _, err := aliceChannel.SettleHTLC(preimage, true); err != nil { if err := aliceChannel.ReceiveHTLCSettle(preimage, settleIndex); err != nil {
t.Fatalf("alice unable to accept settle of outbound htlc: %v", err) t.Fatalf("alice unable to accept settle of outbound htlc: %v", err)
} }
bobSig2, aliceIndex2, err := bobChannel.SignNextCommitment() bobSig2, aliceIndex2, err := bobChannel.SignNextCommitment()
@ -408,15 +407,37 @@ func TestSimpleAddSettleWorkflow(t *testing.T) {
// The logs of both sides should now be cleared since the entry adding // The logs of both sides should now be cleared since the entry adding
// the HTLC should have been removed once both sides recieve the // the HTLC should have been removed once both sides recieve the
// revocation. // revocation.
aliceLogLen := aliceChannel.stateUpdateLog.Len() if aliceChannel.ourUpdateLog.Len() != 0 {
if aliceLogLen != 0 { t.Fatalf("alice's local not updated, should be empty, has %v entries "+
t.Fatalf("alice's log not updated, should be empty, has %v entries "+ "instead", aliceChannel.ourUpdateLog.Len())
"instead", aliceLogLen)
} }
bobLogLen := bobChannel.stateUpdateLog.Len() if aliceChannel.theirUpdateLog.Len() != 0 {
if bobLogLen != 0 { t.Fatalf("alice's remote not updated, should be empty, has %v entries "+
t.Fatalf("bob's log not updated, should be empty, has %v entries "+ "instead", aliceChannel.theirUpdateLog.Len())
"instead", bobLogLen) }
if len(aliceChannel.ourLogIndex) != 0 {
t.Fatalf("alice's local log index not cleared, should be empty but "+
"has %v entries", len(aliceChannel.ourLogIndex))
}
if len(aliceChannel.theirLogIndex) != 0 {
t.Fatalf("alice's remote log index not cleared, should be empty but "+
"has %v entries", len(aliceChannel.theirLogIndex))
}
if bobChannel.ourUpdateLog.Len() != 0 {
t.Fatalf("bob's local log not updated, should be empty, has %v entries "+
"instead", bobChannel.ourUpdateLog.Len())
}
if bobChannel.theirUpdateLog.Len() != 0 {
t.Fatalf("bob's remote log not updated, should be empty, has %v entries "+
"instead", bobChannel.theirUpdateLog.Len())
}
if len(bobChannel.ourLogIndex) != 0 {
t.Fatalf("bob's local log index not cleared, should be empty but "+
"has %v entries", len(bobChannel.ourLogIndex))
}
if len(bobChannel.theirLogIndex) != 0 {
t.Fatalf("bob's remote log index not cleared, should be empty but "+
"has %v entries", len(bobChannel.theirLogIndex))
} }
} }

@ -54,3 +54,19 @@ func SetLogWriter(w io.Writer, level string) error {
UseLogger(l) UseLogger(l)
return nil return nil
} }
// logClosure is used to provide a closure over expensive logging operations
// so don't have to be performed when the logging level doesn't warrant it.
type logClosure func() string
// String invokes the underlying function and returns the result.
func (c logClosure) String() string {
return c()
}
// newLogClosure returns a new closure over a function that returns a string
// which itself provides a Stringer interface so that it can be used with the
// logging system.
func newLogClosure(c func() string) logClosure {
return logClosure(c)
}