peer: simplify channel state update handling by using

This commit simplifies the channel state update handling by doing away
with the commitmentState.pendingUpdate method all together. The newly
added LightningChannel.FullySynced method replace the prior state and
also replaced all other uses of PendingUpdates.

By moving to using channel.FullySynced() we also eliminate class of
desynchronization error caused by a node failing to provide the other
side with the latest commitment state.
This commit is contained in:
Olaoluwa Osuntokun 2017-04-11 21:55:05 -07:00
parent 31acace692
commit 3393f3a8db
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2

42
peer.go

@ -1071,10 +1071,6 @@ type commitmentState struct {
pendingBatch []*pendingPayment pendingBatch []*pendingPayment
// pendingUpdate is a bool which indicates if we have a pending state
// update outstanding whch has not yet been ACK'd.
pendingUpdate bool
// clearedHTCLs is a map of outgoing HTLCs we've committed to in our // clearedHTCLs is a map of outgoing HTLCs we've committed to in our
// chain which have not yet been settled by the upstream peer. // chain which have not yet been settled by the upstream peer.
clearedHTCLs map[uint64]*pendingPayment clearedHTCLs map[uint64]*pendingPayment
@ -1183,15 +1179,14 @@ out:
case <-state.logCommitTick: case <-state.logCommitTick:
// If we haven't sent or received a new commitment // If we haven't sent or received a new commitment
// update in some time, check to see if we have any // update in some time, check to see if we have any
// pending updates we need to commit. If so, then send // pending updates we need to commit due to our
// an update incrementing the unacked counter is // commitment chains being desynchronized.
// successfully. if state.channel.FullySynced() &&
if !state.channel.PendingUpdates() &&
len(state.htlcsToSettle) == 0 { len(state.htlcsToSettle) == 0 {
continue continue
} }
if err := p.updateCommitTx(state, false); err != nil { if err := p.updateCommitTx(state); err != nil {
peerLog.Errorf("unable to update commitment: %v", peerLog.Errorf("unable to update commitment: %v",
err) err)
p.Disconnect() p.Disconnect()
@ -1210,7 +1205,7 @@ out:
// If the send was unsuccessful, then abandon the // If the send was unsuccessful, then abandon the
// update, waiting for the revocation window to open // update, waiting for the revocation window to open
// up. // up.
if err := p.updateCommitTx(state, false); err != nil { if err := p.updateCommitTx(state); err != nil {
peerLog.Errorf("unable to update "+ peerLog.Errorf("unable to update "+
"commitment: %v", err) "commitment: %v", err)
p.Disconnect() p.Disconnect()
@ -1369,7 +1364,7 @@ func (p *peer) handleDownStreamPkt(state *commitmentState, pkt *htlcPacket) {
// this is a settle request, then initiate an update. // this is a settle request, then initiate an update.
// TODO(roasbeef): enforce max HTLCs in flight limit // TODO(roasbeef): enforce max HTLCs in flight limit
if len(state.pendingBatch) >= 10 || isSettle { if len(state.pendingBatch) >= 10 || isSettle {
if err := p.updateCommitTx(state, false); err != nil { if err := p.updateCommitTx(state); err != nil {
peerLog.Errorf("unable to update "+ peerLog.Errorf("unable to update "+
"commitment: %v", err) "commitment: %v", err)
p.Disconnect() p.Disconnect()
@ -1513,14 +1508,6 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) {
} }
p.queueMsg(nextRevocation, nil) p.queueMsg(nextRevocation, nil)
// If we just initiated a state transition, and we were waiting
// for a reply from the remote peer, then we don't need to
// response with a subsequent CommitSig message. So we toggle
// the `pendingUpdate` bool, and set a timer to wake us up in
// the future to check if we have any updates we need to
// commit.
if state.pendingUpdate {
state.pendingUpdate = false
if !state.logCommitTimer.Stop() { if !state.logCommitTimer.Stop() {
select { select {
@ -1531,13 +1518,17 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) {
state.logCommitTimer.Reset(300 * time.Millisecond) state.logCommitTimer.Reset(300 * time.Millisecond)
state.logCommitTick = state.logCommitTimer.C state.logCommitTick = state.logCommitTimer.C
// If both commitment chains are fully synced from our PoV,
// then we don't need to reply with a signature as both sides
// already have a commitment with the latest accepted state.
if state.channel.FullySynced() {
return return
} }
// Otherwise, the remote party initiated the state transition, // Otherwise, the remote party initiated the state transition,
// so we'll reply with a signature to provide them with their // so we'll reply with a signature to provide them with their
// version of the latest commitment state. // version of the latest commitment state.
if err := p.updateCommitTx(state, true); err != nil { if err := p.updateCommitTx(state); err != nil {
peerLog.Errorf("unable to update commitment: %v", err) peerLog.Errorf("unable to update commitment: %v", err)
p.Disconnect() p.Disconnect()
return return
@ -1693,7 +1684,7 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) {
// With all the settle updates added to the local and remote // With all the settle updates added to the local and remote
// HTLC logs, initiate a state transition by updating the // HTLC logs, initiate a state transition by updating the
// remote commitment chain. // remote commitment chain.
if err := p.updateCommitTx(state, false); err != nil { if err := p.updateCommitTx(state); err != nil {
peerLog.Errorf("unable to update commitment: %v", err) peerLog.Errorf("unable to update commitment: %v", err)
p.Disconnect() p.Disconnect()
return return
@ -1714,7 +1705,7 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) {
// updateCommitTx signs, then sends an update to the remote peer adding a new // updateCommitTx signs, then sends an update to the remote peer adding a new
// commitment to their commitment chain which includes all the latest updates // commitment to their commitment chain which includes all the latest updates
// we've received+processed up to this point. // we've received+processed up to this point.
func (p *peer) updateCommitTx(state *commitmentState, reply bool) error { func (p *peer) updateCommitTx(state *commitmentState) error {
sigTheirs, err := state.channel.SignNextCommitment() sigTheirs, err := state.channel.SignNextCommitment()
if err == lnwallet.ErrNoWindow { if err == lnwallet.ErrNoWindow {
peerLog.Tracef("revocation window exhausted, unable to send %v", peerLog.Tracef("revocation window exhausted, unable to send %v",
@ -1757,13 +1748,6 @@ func (p *peer) updateCommitTx(state *commitmentState, reply bool) error {
// TODO(roasbeef): re-slice instead to avoid GC? // TODO(roasbeef): re-slice instead to avoid GC?
state.pendingBatch = nil state.pendingBatch = nil
// If this isn't a reply to a state transitioned initiated by the
// remote node, then we toggle the `pendingUpdate` bool to indicate
// that we're waiting for a CommitSig in response.
if !reply {
state.pendingUpdate = true
}
return nil return nil
} }