diff --git a/htlcswitch/link.go b/htlcswitch/link.go index c2ca512e..d2980750 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -302,25 +302,74 @@ func (l *channelLink) htlcManager() { log.Infof("HTLC manager for ChannelPoint(%v) started, "+ "bandwidth=%v", l.channel.ChannelPoint(), l.Bandwidth()) - // If the link have been recreated, than we need to sync the states by - // sending the channel reestablishment message. + // TODO(roasbeef): need to call wipe chan whenever D/C? + + // If this isn't the first time that this channel link has been + // created, then we'll need to check to see if we need to + // re-synchronize state with the remote peer. if l.cfg.SyncStates { - log.Infof("Syncing states for channel(%v) via sending the "+ - "re-establishment message", l.channel.ChannelPoint()) + log.Infof("Attempting to re-resynchronize ChannelPoint(%v)", + l.channel.ChannelPoint()) - localCommitmentNumber, remoteRevocationNumber := l.channel.LastCounters() + // First, we'll generate our ChanSync message to send to the + // other side. Based on this message, the remote party will + // decide if they need to retransmit any data or not. + localChanSyncMsg := l.channel.ChanSyncMsg() + if err := l.cfg.Peer.SendMessage(localChanSyncMsg); err != nil { + l.fail("Unable to send chan sync message for "+ + "ChannelPoint(%v)", l.channel.ChannelPoint()) + return + } - l.cfg.Peer.SendMessage(&lnwire.ChannelReestablish{ - ChanID: l.ChanID(), - NextLocalCommitmentNumber: localCommitmentNumber + 1, - NextRemoteRevocationNumber: remoteRevocationNumber + 1, - }) + // Next, we'll wait to receive the ChanSync message with a + // timeout period. The first message sent MUST be the ChanSync + // message, otherwise, we'll terminate the connection. + chanSyncDeadline := time.After(time.Second * 30) + select { + case msg := <-l.upstream: + remoteChanSyncMsg, ok := msg.(*lnwire.ChannelReestablish) + if !ok { + l.fail("first message sent to sync should be "+ + "ChannelReestablish, instead "+ + "received: %T", msg) + return + } - if err := l.channelInitialization(); err != nil { - err := errors.Errorf("unable to sync the states for channel(%v)"+ - "with remote node: %v", l.ChanID(), err) - log.Error(err) - l.cfg.Peer.Disconnect(err) + // If the remote party indicates that they think we + // haven't done any state updates yet, then we'll + // retransmit the funding locked message first. We do + // this, as at this point we can't be sure if they've + // really received the FundingLocked message. + if remoteChanSyncMsg.NextLocalCommitHeight == 1 && + localChanSyncMsg.NextLocalCommitHeight == 1 { + + log.Debugf("Resending fundingLocked message " + + "to peer") + + nextRevocation, err := l.channel.NextRevocationKey() + if err != nil { + l.fail("unable to create next "+ + "revocation: %v", err) + return + } + + fundingLockedMsg := lnwire.NewFundingLocked( + l.ChanID(), nextRevocation, + ) + err = l.cfg.Peer.SendMessage(fundingLockedMsg) + if err != nil { + l.fail("unable to re-send "+ + "FundingLocked: %v", err) + return + } + } + + // In any case, we'll then process their ChanSync + // message. + l.handleUpstreamMsg(msg) + case <-chanSyncDeadline: + l.fail("didn't receive ChannelReestablish before " + + "deadline") return } } @@ -334,28 +383,6 @@ func (l *channelLink) htlcManager() { defer batchTimer.Stop() // TODO(roasbeef): fail chan in case of protocol violation - - // If the number of updates on this channel has been zero, we should - // resend the fundingLocked message. This is because in this case we - // cannot be sure if the peer really received the last fundingLocked we - // sent, so resend now. - if l.channel.StateSnapshot().NumUpdates == 0 { - log.Debugf("Resending fundingLocked message to peer.") - - nextRevocation, err := l.channel.NextRevocationKey() - if err != nil { - log.Errorf("unable to create next revocation: %v", err) - } - - fundingLockedMsg := lnwire.NewFundingLocked(l.ChanID(), - nextRevocation) - err = l.cfg.Peer.SendMessage(fundingLockedMsg) - if err != nil { - log.Errorf("failed resending fundingLocked to peer: %v", - err) - } - } - out: for { select { @@ -383,6 +410,7 @@ out: log.Warnf("Remote peer has closed ChannelPoint(%v) on-chain", l.channel.ChannelPoint()) + // TODO(roasbeef): remove all together go func() { if err := l.cfg.Peer.WipeChannel(l.channel); err != nil { log.Errorf("unable to wipe channel %v", err) @@ -475,6 +503,7 @@ out: case msg := <-l.upstream: l.handleUpstreamMsg(msg) + // TODO(roasbeef): make distinct goroutine to handle? case cmd := <-l.linkControl: switch req := cmd.(type) { @@ -669,21 +698,26 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { log.Infof("Received re-establishment message from remote side "+ "for channel(%v)", l.channel.ChannelPoint()) - messagesToSyncState, err := l.channel.ReceiveReestablish(msg) + // We've just received a ChnSync message from the remote party, + // so we'll process the message in order to determine if we + // need to re-transmit any messages to the remote party. + msgsToReSend, err := l.channel.ProcessChanSyncMsg(msg) if err != nil { - err := errors.Errorf("unable to handle upstream reestablish "+ + l.fail("unable to handle upstream reestablish "+ "message: %v", err) - log.Error(err) - l.cfg.Peer.Disconnect(err) return } - // Send message to the remote side which are needed to synchronize - // the state. - log.Infof("Sending %v updates to synchronize the "+ - "state for channel(%v)", len(messagesToSyncState), - l.channel.ChannelPoint()) - for _, msg := range messagesToSyncState { + if len(msgsToReSend) > 0 { + log.Infof("Sending %v updates to synchronize the "+ + "state for ChannelPoint(%v)", len(msgsToReSend), + l.channel.ChannelPoint()) + } + + // If we have any messages to retransmit, we'll do so + // immediately so we return to a synchronized state as soon as + // possible. + for _, msg := range msgsToReSend { l.cfg.Peer.SendMessage(msg) } @@ -698,6 +732,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { l.fail("unable to handle upstream add HTLC: %v", err) return } + log.Tracef("Receive upstream htlc with payment hash(%x), "+ "assigning index: %v", msg.PaymentHash[:], index) @@ -763,10 +798,6 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { return } - // Increment the available bandwidth as they've removed our - // HTLC. - atomic.AddUint64(&l.availableBandwidth, uint64(amt)) - case *lnwire.CommitSig: // We just received a new updates to our local commitment chain, // validate this new commitment, closing the link if invalid. @@ -810,7 +841,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { // Otherwise, the remote party initiated the state transition, // so we'll reply with a signature to provide them with their - // version of the latest commitment l. + // version of the latest commitment. if err := l.updateCommitTx(); err != nil { l.fail("unable to update commitment: %v", err) return @@ -845,7 +876,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { }() case *lnwire.UpdateFee: - // We received fee update from peer. If we are the initator we + // We received fee update from peer. If we are the initiator we // will fail the channel, if not we will apply the update. fee := msg.FeePerKw if err := l.channel.ReceiveUpdateFee(fee); err != nil { @@ -924,9 +955,9 @@ type getBandwidthCmd struct { } // Bandwidth returns the total amount that can flow through the channel link at -// this given instance. The value returned is expressed in millatoshi and -// can be used by callers when making forwarding decisions to determine if a -// link can accept an HTLC. +// this given instance. The value returned is expressed in millisatoshi and can +// be used by callers when making forwarding decisions to determine if a link +// can accept an HTLC. // // NOTE: Part of the ChannelLink interface. func (l *channelLink) Bandwidth() lnwire.MilliSatoshi { @@ -1498,37 +1529,3 @@ func (l *channelLink) fail(format string, a ...interface{}) { log.Error(reason) l.cfg.Peer.Disconnect(reason) } - -// channelInitialization waits for channel synchronization message to -// be received from another side and handled. -func (l *channelLink) channelInitialization() error { - // Before we launch any of the helper goroutines off the channel link - // struct, we'll first ensure proper adherence to the p2p protocol. The - // channel reestablish message MUST be sent before any other message. - expired := time.After(time.Second * 5) - - for { - select { - case msg := <-l.upstream: - if msg, ok := msg.(*lnwire.ChannelReestablish); ok { - l.handleUpstreamMsg(msg) - return nil - } else { - return errors.New("very first message between nodes " + - "for channel link should be reestablish message") - } - - case pkt := <-l.downstream: - l.overflowQueue.consume(pkt) - - case cmd := <-l.linkControl: - l.handleControlCommand(cmd) - - // In order to avoid blocking indefinitely, we'll give the other peer - // an upper timeout of 5 seconds to respond before we bail out early. - case <-expired: - return errors.Errorf("peer did not complete handshake for channel " + - "link within 5 seconds") - } - } -}