From ca4eb970ec034efe59eead1137aa1350d1869763 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Tue, 16 Jan 2018 20:15:51 -0800 Subject: [PATCH] htlcswitch: move channel re-sync into distinct function --- htlcswitch/link.go | 225 +++++++++++++++++++++++++-------------------- 1 file changed, 123 insertions(+), 102 deletions(-) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 3fcb2a71..451b46ac 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -372,6 +372,124 @@ func shouldAdjustCommitFee(netFee, chanFee btcutil.Amount) bool { } } +// syncChanState attempts to synchronize channel states with the remote party. +// This method is to be called upon reconnection after the initial funding +// flow. We'll compare out commitment chains with the remote party, and re-send +// either a danging commit signature, a revocation, or both. +func (l *channelLink) syncChanStates() error { + log.Infof("Attempting to re-resynchronize ChannelPoint(%v)", + l.channel.ChannelPoint()) + + // First, we'll generate our ChanSync message to send to the other + // side. Based on this message, the remote party will decide if they + // need to retransmit any data or not. + localChanSyncMsg, err := l.channel.ChanSyncMsg() + if err != nil { + return fmt.Errorf("unable to generate chan sync message for "+ + "ChannelPoint(%v)", l.channel.ChannelPoint()) + } + if err := l.cfg.Peer.SendMessage(localChanSyncMsg); err != nil { + return fmt.Errorf("Unable to send chan sync message for "+ + "ChannelPoint(%v)", l.channel.ChannelPoint()) + } + + var msgsToReSend []lnwire.Message + + // Next, we'll wait to receive the ChanSync message with a timeout + // period. The first message sent MUST be the ChanSync message, + // otherwise, we'll terminate the connection. + chanSyncDeadline := time.After(time.Second * 30) + select { + case msg := <-l.upstream: + remoteChanSyncMsg, ok := msg.(*lnwire.ChannelReestablish) + if !ok { + return fmt.Errorf("first message sent to sync "+ + "should be ChannelReestablish, instead "+ + "received: %T", msg) + } + + // If the remote party indicates that they think we haven't + // done any state updates yet, then we'll retransmit the + // funding locked message first. We do this, as at this point + // we can't be sure if they've really received the + // FundingLocked message. + if remoteChanSyncMsg.NextLocalCommitHeight == 1 && + localChanSyncMsg.NextLocalCommitHeight == 1 && + !l.channel.IsPending() { + + log.Infof("ChannelPoint(%v): resending "+ + "FundingLocked message to peer", + l.channel.ChannelPoint()) + + nextRevocation, err := l.channel.NextRevocationKey() + if err != nil { + return fmt.Errorf("unable to create next "+ + "revocation: %v", err) + } + + fundingLockedMsg := lnwire.NewFundingLocked( + l.ChanID(), nextRevocation, + ) + err = l.cfg.Peer.SendMessage(fundingLockedMsg) + if err != nil { + return fmt.Errorf("unable to re-send "+ + "FundingLocked: %v", err) + } + } + + // In any case, we'll then process their ChanSync message. + log.Infof("Received re-establishment message from remote side "+ + "for channel(%v)", l.channel.ChannelPoint()) + + // We've just received a ChnSync message from the remote party, + // so we'll process the message in order to determine if we + // need to re-transmit any messages to the remote party. + msgsToReSend, err = l.channel.ProcessChanSyncMsg(remoteChanSyncMsg) + if err != nil { + // TODO(roasbeef): check concrete type of error, act + // accordingly + return fmt.Errorf("unable to handle upstream reestablish "+ + "message: %v", err) + } + + if len(msgsToReSend) > 0 { + log.Infof("Sending %v updates to synchronize the "+ + "state for ChannelPoint(%v)", len(msgsToReSend), + l.channel.ChannelPoint()) + } + + // If we have any messages to retransmit, we'll do so + // immediately so we return to a synchronized state as soon as + // possible. + for _, msg := range msgsToReSend { + l.cfg.Peer.SendMessage(msg) + } + + case <-l.quit: + return fmt.Errorf("shutting down") + + case <-chanSyncDeadline: + return fmt.Errorf("didn't receive ChannelReestablish before " + + "deadline") + } + + // In order to prep for the fragment below, we'll note if we + // retransmitted any HTLC's settles earlier. We'll track them by the + // HTLC index of the remote party in order to avoid erroneously sending + // a duplicate settle. + htlcsSettled := make(map[uint64]struct{}) + for _, msg := range msgsToReSend { + settleMsg, ok := msg.(*lnwire.UpdateFufillHTLC) + if !ok { + // If this isn't a settle message, then we'll skip it. + continue + } + + // Otherwise, we'll note the ID of the HTLC we're settling so we + // don't duplicate it below. + htlcsSettled[settleMsg.ID] = struct{}{} + } + // htlcManager is the primary goroutine which drives a channel's commitment // update state-machine in response to messages received via several channels. // This goroutine reads messages from the upstream (remote) peer, and also from @@ -396,79 +514,12 @@ func (l *channelLink) htlcManager() { // If this isn't the first time that this channel link has been // created, then we'll need to check to see if we need to - // re-synchronize state with the remote peer. + // re-synchronize state with the remote peer. settledHtlcs is a map of + // HTLC's that we re-settled as part of the channel state sync. if l.cfg.SyncStates { - log.Infof("Attempting to re-resynchronize ChannelPoint(%v)", - l.channel.ChannelPoint()) - - // First, we'll generate our ChanSync message to send to the - // other side. Based on this message, the remote party will - // decide if they need to retransmit any data or not. - localChanSyncMsg, err := l.channel.ChanSyncMsg() - if err != nil { - l.fail("unable to generate chan sync message for "+ - "ChannelPoint(%v)", l.channel.ChannelPoint()) - return - } - if err := l.cfg.Peer.SendMessage(localChanSyncMsg); err != nil { - l.fail("Unable to send chan sync message for "+ - "ChannelPoint(%v)", l.channel.ChannelPoint()) - return - } - - // Next, we'll wait to receive the ChanSync message with a - // timeout period. The first message sent MUST be the ChanSync - // message, otherwise, we'll terminate the connection. - chanSyncDeadline := time.After(time.Second * 30) - select { - case msg := <-l.upstream: - remoteChanSyncMsg, ok := msg.(*lnwire.ChannelReestablish) - if !ok { - l.fail("first message sent to sync should be "+ - "ChannelReestablish, instead "+ - "received: %T", msg) - return - } - - // If the remote party indicates that they think we - // haven't done any state updates yet, then we'll - // retransmit the funding locked message first. We do - // this, as at this point we can't be sure if they've - // really received the FundingLocked message. - if remoteChanSyncMsg.NextLocalCommitHeight == 1 && - localChanSyncMsg.NextLocalCommitHeight == 1 && - !l.channel.IsPending() { - - log.Infof("ChannelPoint(%v): resending "+ - "FundingLocked message to peer", - l.channel.ChannelPoint()) - - nextRevocation, err := l.channel.NextRevocationKey() - if err != nil { - l.fail("unable to create next "+ - "revocation: %v", err) - return - } - - fundingLockedMsg := lnwire.NewFundingLocked( - l.ChanID(), nextRevocation, - ) - err = l.cfg.Peer.SendMessage(fundingLockedMsg) - if err != nil { - l.fail("unable to re-send "+ - "FundingLocked: %v", err) - return - } - } - - // In any case, we'll then process their ChanSync - // message. - l.handleUpstreamMsg(msg) - case <-l.quit: - return - case <-chanSyncDeadline: - l.fail("didn't receive ChannelReestablish before " + - "deadline") + // TODO(roasbeef): need to ensure haven't already settled? + if err := l.syncChanStates(); err != nil { + l.fail(err.Error()) return } } @@ -816,36 +867,6 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) { // direct channel with, updating our respective commitment chains. func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { switch msg := msg.(type) { - case *lnwire.ChannelReestablish: - log.Infof("Received re-establishment message from remote side "+ - "for channel(%v)", l.channel.ChannelPoint()) - - // We've just received a ChnSync message from the remote party, - // so we'll process the message in order to determine if we - // need to re-transmit any messages to the remote party. - msgsToReSend, err := l.channel.ProcessChanSyncMsg(msg) - if err != nil { - // TODO(roasbeef): check concrete type of error, act - // accordingly - l.fail("unable to handle upstream reestablish "+ - "message: %v", err) - return - } - - if len(msgsToReSend) > 0 { - log.Infof("Sending %v updates to synchronize the "+ - "state for ChannelPoint(%v)", len(msgsToReSend), - l.channel.ChannelPoint()) - } - - // If we have any messages to retransmit, we'll do so - // immediately so we return to a synchronized state as soon as - // possible. - for _, msg := range msgsToReSend { - l.cfg.Peer.SendMessage(msg) - } - - return case *lnwire.UpdateAddHTLC: // We just received an add request from an upstream peer, so we