htlcswitch: move channel re-sync into distinct function

Olaoluwa Osuntokun 2018-01-16 20:15:51 -08:00
parent 2d133acaeb
commit ca4eb970ec
GPG Key ID: 964EA263DD637C21

@@ -372,6 +372,124 @@ func shouldAdjustCommitFee(netFee, chanFee btcutil.Amount) bool {
}
}
// syncChanStates attempts to synchronize channel states with the remote party.
// This method is to be called upon reconnection after the initial funding
// flow. We'll compare our commitment chains with the remote party, and re-send
// either a dangling commit signature, a revocation, or both.
func (l *channelLink) syncChanStates() error {
log.Infof("Attempting to re-resynchronize ChannelPoint(%v)",
l.channel.ChannelPoint())
// First, we'll generate our ChanSync message to send to the other
// side. Based on this message, the remote party will decide if they
// need to retransmit any data or not.
localChanSyncMsg, err := l.channel.ChanSyncMsg()
if err != nil {
return fmt.Errorf("unable to generate chan sync message for "+
"ChannelPoint(%v)", l.channel.ChannelPoint())
}
if err := l.cfg.Peer.SendMessage(localChanSyncMsg); err != nil {
return fmt.Errorf("Unable to send chan sync message for "+
"ChannelPoint(%v)", l.channel.ChannelPoint())
}
var msgsToReSend []lnwire.Message
// Next, we'll wait to receive the ChanSync message with a timeout
// period. The first message sent MUST be the ChanSync message,
// otherwise, we'll terminate the connection.
chanSyncDeadline := time.After(time.Second * 30)
select {
case msg := <-l.upstream:
remoteChanSyncMsg, ok := msg.(*lnwire.ChannelReestablish)
if !ok {
return fmt.Errorf("first message sent to sync "+
"should be ChannelReestablish, instead "+
"received: %T", msg)
}
// If the remote party indicates that they think we haven't
// done any state updates yet, then we'll retransmit the
// funding locked message first. We do this, as at this point
// we can't be sure if they've really received the
// FundingLocked message.
if remoteChanSyncMsg.NextLocalCommitHeight == 1 &&
localChanSyncMsg.NextLocalCommitHeight == 1 &&
!l.channel.IsPending() {
log.Infof("ChannelPoint(%v): resending "+
"FundingLocked message to peer",
l.channel.ChannelPoint())
nextRevocation, err := l.channel.NextRevocationKey()
if err != nil {
return fmt.Errorf("unable to create next "+
"revocation: %v", err)
}
fundingLockedMsg := lnwire.NewFundingLocked(
l.ChanID(), nextRevocation,
)
err = l.cfg.Peer.SendMessage(fundingLockedMsg)
if err != nil {
return fmt.Errorf("unable to re-send "+
"FundingLocked: %v", err)
}
}
// In any case, we'll then process their ChanSync message.
log.Infof("Received re-establishment message from remote side "+
"for channel(%v)", l.channel.ChannelPoint())
// We've just received a ChanSync message from the remote party,
// so we'll process the message in order to determine if we
// need to re-transmit any messages to the remote party.
msgsToReSend, err = l.channel.ProcessChanSyncMsg(remoteChanSyncMsg)
if err != nil {
// TODO(roasbeef): check concrete type of error, act
// accordingly
return fmt.Errorf("unable to handle upstream reestablish "+
"message: %v", err)
}
if len(msgsToReSend) > 0 {
log.Infof("Sending %v updates to synchronize the "+
"state for ChannelPoint(%v)", len(msgsToReSend),
l.channel.ChannelPoint())
}
// If we have any messages to retransmit, we'll do so
// immediately so we return to a synchronized state as soon as
// possible.
for _, msg := range msgsToReSend {
l.cfg.Peer.SendMessage(msg)
}
case <-l.quit:
return fmt.Errorf("shutting down")
case <-chanSyncDeadline:
return fmt.Errorf("didn't receive ChannelReestablish before " +
"deadline")
}
// In order to prep for the logic below, we'll note any HTLC settles
// that we retransmitted above. We'll track them by the HTLC index of
// the remote party in order to avoid erroneously sending a duplicate
// settle.
htlcsSettled := make(map[uint64]struct{})
for _, msg := range msgsToReSend {
settleMsg, ok := msg.(*lnwire.UpdateFufillHTLC)
if !ok {
// If this isn't a settle message, then we'll skip it.
continue
}
// Otherwise, we'll note the ID of the HTLC we're settling so we
// don't duplicate it below.
htlcsSettled[settleMsg.ID] = struct{}{}
}
// htlcManager is the primary goroutine which drives a channel's commitment
// update state-machine in response to messages received via several channels.
// This goroutine reads messages from the upstream (remote) peer, and also from
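For readers skimming the new helper above, the following is a minimal, self-contained sketch of the same wait-for-ChannelReestablish-with-deadline pattern it uses. The names message, reestablish and handshake are hypothetical stand-ins for lnwire.Message, lnwire.ChannelReestablish and the link's upstream/quit channels; this sketch is not part of the commit.

// NOTE: illustrative sketch only, not part of this commit.
package main

import (
	"errors"
	"fmt"
	"time"
)

type message interface{}

type reestablish struct {
	nextLocalCommitHeight uint64
}

// handshake waits for the first inbound message, which must be a
// *reestablish, and aborts after the deadline or on shutdown.
func handshake(msgs <-chan message, quit <-chan struct{}) (*reestablish, error) {
	deadline := time.After(30 * time.Second)

	select {
	case msg := <-msgs:
		remote, ok := msg.(*reestablish)
		if !ok {
			return nil, fmt.Errorf("first message should be "+
				"*reestablish, instead received: %T", msg)
		}
		return remote, nil

	case <-quit:
		return nil, errors.New("shutting down")

	case <-deadline:
		return nil, errors.New("didn't receive reestablish before deadline")
	}
}

func main() {
	msgs := make(chan message, 1)
	msgs <- &reestablish{nextLocalCommitHeight: 1}

	remote, err := handshake(msgs, make(chan struct{}))
	fmt.Println(remote, err)
}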
@@ -396,79 +514,12 @@ func (l *channelLink) htlcManager() {
// If this isn't the first time that this channel link has been
// created, then we'll need to check to see if we need to
// re-synchronize state with the remote peer.
if l.cfg.SyncStates {
log.Infof("Attempting to re-resynchronize ChannelPoint(%v)",
l.channel.ChannelPoint())
// First, we'll generate our ChanSync message to send to the
// other side. Based on this message, the remote party will
// decide if they need to retransmit any data or not.
localChanSyncMsg, err := l.channel.ChanSyncMsg()
if err != nil {
l.fail("unable to generate chan sync message for "+
"ChannelPoint(%v)", l.channel.ChannelPoint())
return
}
if err := l.cfg.Peer.SendMessage(localChanSyncMsg); err != nil {
l.fail("Unable to send chan sync message for "+
"ChannelPoint(%v)", l.channel.ChannelPoint())
return
}
// Next, we'll wait to receive the ChanSync message with a
// timeout period. The first message sent MUST be the ChanSync
// message, otherwise, we'll terminate the connection.
chanSyncDeadline := time.After(time.Second * 30)
select {
case msg := <-l.upstream:
remoteChanSyncMsg, ok := msg.(*lnwire.ChannelReestablish)
if !ok {
l.fail("first message sent to sync should be "+
"ChannelReestablish, instead "+
"received: %T", msg)
return
}
// If the remote party indicates that they think we
// haven't done any state updates yet, then we'll
// retransmit the funding locked message first. We do
// this, as at this point we can't be sure if they've
// really received the FundingLocked message.
if remoteChanSyncMsg.NextLocalCommitHeight == 1 &&
localChanSyncMsg.NextLocalCommitHeight == 1 &&
!l.channel.IsPending() {
log.Infof("ChannelPoint(%v): resending "+
"FundingLocked message to peer",
l.channel.ChannelPoint())
nextRevocation, err := l.channel.NextRevocationKey()
if err != nil {
l.fail("unable to create next "+
"revocation: %v", err)
return
}
fundingLockedMsg := lnwire.NewFundingLocked(
l.ChanID(), nextRevocation,
)
err = l.cfg.Peer.SendMessage(fundingLockedMsg)
if err != nil {
l.fail("unable to re-send "+
"FundingLocked: %v", err)
return
}
}
// In any case, we'll then process their ChanSync
// message.
l.handleUpstreamMsg(msg)
case <-l.quit:
return
case <-chanSyncDeadline:
l.fail("didn't receive ChannelReestablish before " +
"deadline")
return
}
}

// If this isn't the first time that this channel link has been
// created, then we'll need to check to see if we need to
// re-synchronize state with the remote peer. settledHtlcs is a map of
// HTLC's that we re-settled as part of the channel state sync.
if l.cfg.SyncStates {
// TODO(roasbeef): need to ensure haven't already settled?
if err := l.syncChanStates(); err != nil {
l.fail(err.Error())
return
}
}
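The shape of the refactor above, reduced to a self-contained sketch: logic that previously called l.fail at every error site now lives in an error-returning method, and the goroutine collapses all failure paths into one l.fail(err.Error()) call. The names here (link, syncStates, manager) are hypothetical, not lnd's API.

// NOTE: illustrative sketch only, not part of this commit.
package main

import (
	"errors"
	"fmt"
)

type link struct {
	synced bool
}

// syncStates is the extracted helper: it reports failure through its error
// return value instead of terminating the caller directly.
func (l *link) syncStates() error {
	if l.synced {
		return errors.New("already synchronized")
	}
	l.synced = true
	return nil
}

// fail stands in for the link's failure handler: it tears the link down
// with a reason.
func (l *link) fail(reason string) {
	fmt.Println("link failed:", reason)
}

// manager is the long-lived caller: a single error check replaces the many
// inline fail-and-return sites that existed before the extraction.
func (l *link) manager() {
	if err := l.syncStates(); err != nil {
		l.fail(err.Error())
		return
	}
	fmt.Println("channel state synchronized")
}

func main() {
	(&link{}).manager()
}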
@@ -816,36 +867,6 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) {
// direct channel with, updating our respective commitment chains.
func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
switch msg := msg.(type) {
case *lnwire.ChannelReestablish:
log.Infof("Received re-establishment message from remote side "+
"for channel(%v)", l.channel.ChannelPoint())
// We've just received a ChnSync message from the remote party,
// so we'll process the message in order to determine if we
// need to re-transmit any messages to the remote party.
msgsToReSend, err := l.channel.ProcessChanSyncMsg(msg)
if err != nil {
// TODO(roasbeef): check concrete type of error, act
// accordingly
l.fail("unable to handle upstream reestablish "+
"message: %v", err)
return
}
if len(msgsToReSend) > 0 {
log.Infof("Sending %v updates to synchronize the "+
"state for ChannelPoint(%v)", len(msgsToReSend),
l.channel.ChannelPoint())
}
// If we have any messages to retransmit, we'll do so
// immediately so we return to a synchronized state as soon as
// possible.
for _, msg := range msgsToReSend {
l.cfg.Peer.SendMessage(msg)
}
return
case *lnwire.UpdateAddHTLC:
// We just received an add request from an upstream peer, so we