diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 930960d0..528dd353 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -31,6 +31,10 @@ const ( expiryGraceDelta = 2 ) +// ErrInternalLinkFailure is a generic error returned to the remote party so as +// to obfuscate the true failure. +var ErrInternalLinkFailure = errors.New("internal link failure") + // ForwardingPolicy describes the set of constraints that a given ChannelLink // is to adhere to when forwarding HTLC's. For each incoming HTLC, this set of // constraints will be consulted in order to ensure that adequate fees are @@ -119,13 +123,21 @@ type ChannelLinkConfig struct { // targeted at a given ChannelLink concrete interface implementation. FwrdingPolicy ForwardingPolicy - // Switch is a subsystem which is used to forward the incoming HTLC - // packets according to the encoded hop forwarding information - // contained in the forwarding blob within each HTLC. - // - // TODO(roasbeef): remove in favor of simple ForwardPacket closure func + // Circuits provides restricted access to the switch's circuit map, + // allowing the link to open and close circuits. + Circuits CircuitModifier + + // Switch provides a reference to the HTLC switch, we only use this in + // testing to access circuit operations not typically exposed by the + // CircuitModifier. + // TODO(conner): remove after refactoring htlcswitch testing framework. Switch *Switch + // ForwardPackets attempts to forward the batch of htlcs through the + // switch. Any failed packets will be returned to the provided + // ChannelLink. + ForwardPackets func(...*htlcPacket) chan error + // DecodeHopIterator function is responsible for decoding HTLC Sphinx // onion blob, and creating hop iterator which will give us next // destination of HTLC. @@ -209,9 +221,20 @@ type ChannelLinkConfig struct { // coalesced into a single commit. BatchTicker Ticker + // FwdPkgGCTicker is the ticker determining the frequency at which + // garbage collection of forwarding packages occurs. We use a time-based + // approach, as opposed to block epochs, as to not hinder syncing. + FwdPkgGCTicker Ticker + // BatchSize is the max size of a batch of updates done to the link // before we do a state update. BatchSize uint32 + + // UnsafeReplay will cause a link to replay the adds in its latest + // commitment txn after the link is restarted. This should only be used + // in testing, it is here to ensure the sphinx replay detection on the + // receiving node is persistent. + UnsafeReplay bool } // channelLink is the service which drives a channel's commitment update @@ -237,6 +260,13 @@ type channelLink struct { // use this information to govern decisions based on HTLC timeouts. bestHeight uint32 + // keystoneBatch represents a volatile list of keystones that must be + // written before attempting to sign the next commitment txn. + keystoneBatch []Keystone + + openedCircuits []CircuitKey + closedCircuits []CircuitKey + // channel is a lightning network channel to which we apply htlc // updates. channel *lnwallet.LightningChannel @@ -252,11 +282,15 @@ type channelLink struct { // been processed because of the commitment transaction overflow. overflowQueue *packetQueue + // startMailBox directs whether or not to start the mailbox when + // starting the link. It may have already been started by the switch. + startMailBox bool + // mailBox is the main interface between the outside world and the // link. All incoming messages will be sent over this mailBox. 
Messages // include new updates from our connected peer, and new packets to be // forwarded sent by the switch. - mailBox *memoryMailBox + mailBox MailBox // upstream is a channel that new messages sent from the remote peer to // the local peer will be sent across. @@ -295,11 +329,10 @@ type channelLink struct { func NewChannelLink(cfg ChannelLinkConfig, channel *lnwallet.LightningChannel, currentHeight uint32) ChannelLink { - link := &channelLink{ + return &channelLink{ cfg: cfg, channel: channel, shortChanID: channel.ShortChanID(), - mailBox: newMemoryMailBox(), linkControl: make(chan interface{}), // TODO(roasbeef): just do reserve here? logCommitTimer: time.NewTimer(300 * time.Millisecond), @@ -308,11 +341,6 @@ func NewChannelLink(cfg ChannelLinkConfig, channel *lnwallet.LightningChannel, htlcUpdates: make(chan []channeldb.HTLC), quit: make(chan struct{}), } - - link.upstream = link.mailBox.MessageOutBox() - link.downstream = link.mailBox.PacketOutBox() - - return link } // A compile time check to ensure channelLink implements the ChannelLink @@ -347,7 +375,7 @@ func (l *channelLink) Start() error { } }() - l.mailBox.Start() + l.mailBox.ResetMessages() l.overflowQueue.Start() l.wg.Add(1) @@ -374,7 +402,6 @@ func (l *channelLink) Stop() { l.channel.Stop() - l.mailBox.Stop() l.overflowQueue.Stop() close(l.quit) @@ -500,10 +527,16 @@ func (l *channelLink) syncChanStates() error { log.Infof("Received re-establishment message from remote side "+ "for channel(%v)", l.channel.ChannelPoint()) + var ( + openedCircuits []CircuitKey + closedCircuits []CircuitKey + ) + // We've just received a ChnSync message from the remote party, // so we'll process the message in order to determine if we // need to re-transmit any messages to the remote party. - msgsToReSend, _, _, err = l.channel.ProcessChanSyncMsg(remoteChanSyncMsg) + msgsToReSend, openedCircuits, closedCircuits, err = + l.channel.ProcessChanSyncMsg(remoteChanSyncMsg) if err != nil { // TODO(roasbeef): check concrete type of error, act // accordingly @@ -511,6 +544,17 @@ func (l *channelLink) syncChanStates() error { "message: %v", err) } + // Repopulate any identifiers for circuits that may have been + // opened or unclosed. + l.openedCircuits = openedCircuits + l.closedCircuits = closedCircuits + + // Ensure that all packets have been have been removed from the + // link's mailbox. + if err := l.ackDownStreamPackets(true); err != nil { + return err + } + if len(msgsToReSend) > 0 { log.Infof("Sending %v updates to synchronize the "+ "state for ChannelPoint(%v)", len(msgsToReSend), @@ -532,79 +576,128 @@ func (l *channelLink) syncChanStates() error { "deadline") } - // In order to prep for the fragment below, we'll note if we - // retransmitted any HTLC's settles earlier. We'll track them by the - // HTLC index of the remote party in order to avoid erroneously sending - // a duplicate settle. - htlcsSettled := make(map[uint64]struct{}) - for _, msg := range msgsToReSend { - settleMsg, ok := msg.(*lnwire.UpdateFulfillHTLC) - if !ok { - // If this isn't a settle message, then we'll skip it. - continue - } + return nil +} - // Otherwise, we'll note the ID of the HTLC we're settling so we - // don't duplicate it below. - htlcsSettled[settleMsg.ID] = struct{}{} +// resolveFwdPkgs loads any forwarding packages for this link from disk, and +// reprocesses them in order. The primary goal is to make sure that any HTLCs we +// previously received are reinstated in memory, and forwarded to the switch if +// necessary. 
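For orientation, the package states referenced throughout this change (FwdStateLockedIn, FwdStateProcessed, FwdStateCompleted) form a one-way lifecycle. A minimal sketch, assuming channeldb models them as an iota-style enum; the concrete declaration is not part of this diff:

// Sketch only; the real channeldb declarations may differ.
type FwdState byte

const (
	// FwdStateLockedIn: the adds are committed in both commitment
	// transactions, but no forwarding decisions have been persisted.
	FwdStateLockedIn FwdState = iota

	// FwdStateProcessed: the FwdFilter has been written, so any
	// replay must reproduce the exact same forwarding decisions.
	FwdStateProcessed

	// FwdStateCompleted: all adds are acked and all settles/fails
	// have propagated; the package is eligible for garbage collection.
	FwdStateCompleted
)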
After a restart, this will also delete any previously completed +// packages. +func (l *channelLink) resolveFwdPkgs() error { + fwdPkgs, err := l.channel.LoadFwdPkgs() + if err != nil { + return err } - // Now that we've synchronized our state, we'll check to see if - // there're any HTLC's that we received, but weren't able to settle - // directly the last time we were active. If we find any, then we'll - // send the settle message, then being to initiate a state transition. - // - // TODO(roasbeef): can later just inspect forwarding package - activeHTLCs := l.channel.ActiveHtlcs() - for _, htlc := range activeHTLCs { - if !htlc.Incoming { - continue - } + l.debugf("loaded %d fwd pks", len(fwdPkgs)) - // Before we attempt to settle this HTLC, we'll check to see if - // we just re-sent it as part of the channel sync. If so, then - // we'll skip it. - if _, ok := htlcsSettled[htlc.HtlcIndex]; ok { - continue - } - - // Now we'll check to if we we actually know the preimage if we - // don't then we'll skip it. - preimage, ok := l.cfg.PreimageCache.LookupPreimage(htlc.RHash[:]) - if !ok { - continue - } - - // At this point, we've found an unsettled HTLC that we know - // the preimage to, so we'll send a settle message to the - // remote party. - var p [32]byte - copy(p[:], preimage) - err := l.channel.SettleHTLC(p, htlc.HtlcIndex, nil, nil, nil) + var needUpdate bool + for _, fwdPkg := range fwdPkgs { + hasUpdate, err := l.resolveFwdPkg(fwdPkg) if err != nil { - l.fail("unable to settle htlc: %v", err) return err } - // We'll now mark the HTLC as settled in the invoice database, - // then send the settle message to the remote party. - err = l.cfg.Registry.SettleInvoice(htlc.RHash) - if err != nil { - l.fail("unable to settle invoice: %v", err) - return err - } - l.batchCounter++ - l.cfg.Peer.SendMessage(&lnwire.UpdateFulfillHTLC{ - ChanID: l.ChanID(), - ID: htlc.HtlcIndex, - PaymentPreimage: p, - }) + needUpdate = needUpdate || hasUpdate + } + // If any of our reprocessing steps require an update to the commitment + // txn, we initiate a state transition to capture all relevant changes. + if needUpdate { + return l.updateCommitTx() } return nil } +// resolveFwdPkg interprets the FwdState of the provided package, either +// reprocesses any outstanding htlcs in the package, or performs garbage +// collection on the package. +func (l *channelLink) resolveFwdPkg(fwdPkg *channeldb.FwdPkg) (bool, error) { + // Remove any completed packages to clear up space. + if fwdPkg.State == channeldb.FwdStateCompleted { + l.debugf("removing completed fwd pkg for height=%d", + fwdPkg.Height) + + err := l.channel.RemoveFwdPkg(fwdPkg.Height) + if err != nil { + l.errorf("unable to remove fwd pkg for height=%d: %v", + fwdPkg.Height, err) + return false, err + } + } + + // Otherwise this is either a new package or one has gone through + // processing, but contains htlcs that need to be restored in memory. We + // replay this forwarding package to make sure our local mem state is + // resurrected, we mimic any original responses back to the remote + // party, and reforward the relevant HTLCs to the switch. + + // If the package is fully acked but not completed, it must still have + // settles and fails to propagate. + if !fwdPkg.SettleFailFilter.IsFull() { + settleFails := lnwallet.PayDescsFromRemoteLogUpdates( + fwdPkg.Source, fwdPkg.Height, fwdPkg.SettleFails, + ) + l.processRemoteSettleFails(fwdPkg, settleFails) + } + + // Finally, replay *ALL ADDS* in this forwarding package. 
The downstream + // logic is able to filter out any duplicates, but we must shove the + // entire, original set of adds down the pipeline so that the batch of + // adds presented to the sphinx router does not ever change. + var needUpdate bool + if !fwdPkg.AckFilter.IsFull() { + adds := lnwallet.PayDescsFromRemoteLogUpdates( + fwdPkg.Source, fwdPkg.Height, fwdPkg.Adds, + ) + needUpdate = l.processRemoteAdds(fwdPkg, adds) + } + + return needUpdate, nil +} + +// fwdPkgGarbager periodically reads all forwarding packages from disk and +// removes those that can be discarded. It is safe to do this entirely in the +// background, since all state is coordinated on disk. This also ensures the +// link can continue to process messages and interleave database accesses. +// +// NOTE: This MUST be run as a goroutine. +func (l *channelLink) fwdPkgGarbager() { + defer l.wg.Done() + + fwdPkgGcTick := l.cfg.FwdPkgGCTicker.Start() + defer l.cfg.FwdPkgGCTicker.Stop() + + for { + select { + case <-fwdPkgGcTick: + fwdPkgs, err := l.channel.LoadFwdPkgs() + if err != nil { + l.warnf("unable to load fwdpkgs for gc: %v", err) + continue + } + + // TODO(conner): batch removal of forward packages. + for _, fwdPkg := range fwdPkgs { + if fwdPkg.State != channeldb.FwdStateCompleted { + continue + } + + err = l.channel.RemoveFwdPkg(fwdPkg.Height) + if err != nil { + l.warnf("unable to remove fwd pkg "+ + "for height=%d: %v", + fwdPkg.Height, err) + } + } + case <-l.quit: + return + } + } +} + // htlcManager is the primary goroutine which drives a channel's commitment // update state-machine in response to messages received via several channels. // This goroutine reads messages from the upstream (remote) peer, and also from @@ -625,6 +718,24 @@ func (l *channelLink) htlcManager() { log.Infof("HTLC manager for ChannelPoint(%v) started, "+ "bandwidth=%v", l.channel.ChannelPoint(), l.Bandwidth()) + // Before handling any messages, revert any circuits that were marked + // open in the switch's circuit map, but did not make it into a + // commitment txn. We use the next local htlc index as the cut off + // point, since all indexes below that are committed. + // + // NOTE: This is automatically done by the switch when it starts up, but + // is necessary to prevent inconsistencies in the case that the link + // flaps. This is a result of a link's life-cycle being shorter than + // that of the switch. + localHtlcIndex := l.channel.LocalHtlcIndex() + err := l.cfg.Circuits.TrimOpenCircuits(l.ShortChanID(), localHtlcIndex) + if err != nil { + l.errorf("unable to trim circuits above local htlc index %d: %v", + localHtlcIndex, err) + l.fail(ErrInternalLinkFailure.Error()) + return + } + // TODO(roasbeef): need to call wipe chan whenever D/C? // If this isn't the first time that this channel link has been @@ -634,11 +745,34 @@ func (l *channelLink) htlcManager() { if l.cfg.SyncStates { // TODO(roasbeef): need to ensure haven't already settled? if err := l.syncChanStates(); err != nil { + l.errorf("unable to synchronize channel states: %v", err) l.fail(err.Error()) return } } + // With the channel states synced, we now reset the mailbox to ensure we + // start processing all unacked packets in order. This is done here to + // ensure that all acknowledgments that occur during channel + // resynchronization have taken affect, causing us only to pull unacked + // packets after starting to read from the downstream mailbox. 
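The link now talks to its mailbox through the MailBox interface rather than the concrete memoryMailBox. The declaration is not part of this diff; inferred purely from the call sites in this file, it needs roughly the following shape (a sketch, not the actual interface):

// Inferred from usage in this file; illustrative only.
type MailBox interface {
	// AddPacket queues a switch packet for delivery to the link.
	AddPacket(pkt *htlcPacket)

	// AckPacket removes a delivered packet so that it will not be
	// replayed after a restart.
	AckPacket(key CircuitKey)

	// HasPacket reports whether a packet with the given inkey is
	// already queued.
	HasPacket(key CircuitKey) bool

	// ResetMessages and ResetPackets requeue any unacked wire
	// messages or packets for in-order redelivery.
	ResetMessages()
	ResetPackets()

	// MessageOutBox and PacketOutBox back the link's upstream and
	// downstream channels, respectively.
	MessageOutBox() chan lnwire.Message
	PacketOutBox() chan *htlcPacket
}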
+	l.mailBox.ResetPackets()
+
+	// After cleaning up any memory pertaining to incoming packets, we now
+	// replay our forwarding packages to handle any htlcs that can be
+	// processed locally, or need to be forwarded out to the switch.
+	if err := l.resolveFwdPkgs(); err != nil {
+		l.errorf("unable to resolve fwd pkgs: %v", err)
+		l.fail(ErrInternalLinkFailure.Error())
+		return
+	}
+
+	// With our link's in-memory state fully reconstructed, spawn a
+	// goroutine to manage the reclamation of disk space occupied by
+	// completed forwarding packages.
+	l.wg.Add(1)
+	go l.fwdPkgGarbager()
+
 	batchTick := l.cfg.BatchTicker.Start()
 	defer l.cfg.BatchTicker.Stop()
@@ -815,11 +949,13 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) {
 	var isSettle bool
 	switch htlc := pkt.htlc.(type) {
 	case *lnwire.UpdateAddHTLC:
+
 		// A new payment has been initiated via the downstream channel,
 		// so we add the new HTLC to our local log, then update the
 		// commitment chains.
 		htlc.ChanID = l.ChanID()
-		index, err := l.channel.AddHTLC(htlc, nil)
+		openCircuitRef := pkt.inKey()
+		index, err := l.channel.AddHTLC(htlc, &openCircuitRef)
 		if err != nil {
 			switch err {
@@ -871,17 +1007,28 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) {
 				failPkt := &htlcPacket{
 					incomingChanID: pkt.incomingChanID,
 					incomingHTLCID: pkt.incomingHTLCID,
-					amount:         htlc.Amount,
-					isRouted:       true,
+					circuit:        pkt.circuit,
+					sourceRef:      pkt.sourceRef,
+					hasSource:      true,
 					localFailure:   localFailure,
 					htlc: &lnwire.UpdateFailHTLC{
 						Reason: reason,
 					},
 				}
 
-				// TODO(roasbeef): need to identify if sent
-				// from switch so don't need to obfuscate
-				go l.cfg.Switch.forward(failPkt)
+				go l.forwardBatch(failPkt)
+
+				// Remove this packet from the link's mailbox;
+				// this prevents it from being reprocessed if
+				// the link restarts and resets its mailbox. If
+				// this response doesn't make it back to the
+				// originating link, it will be rejected upon
+				// attempting to reforward the Add to the
+				// switch, since the circuit was never fully
+				// opened, and the forwarding package shows it
+				// as unacknowledged.
+				l.mailBox.AckPacket(pkt.inKey())
+
 				return
 			}
 		}
@@ -890,39 +1037,41 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) {
 			"local_log_index=%v, batch_size=%v",
 			htlc.PaymentHash[:], index, l.batchCounter+1)
 
-		// Create circuit (remember the path) in order to forward
-		// settle/fail packet back.
-		l.cfg.Switch.addCircuit(&PaymentCircuit{
-			PaymentHash:    htlc.PaymentHash,
-			IncomingChanID: pkt.incomingChanID,
-			IncomingHTLCID: pkt.incomingHTLCID,
-			IncomingAmt:    pkt.incomingHtlcAmt,
-			OutgoingChanID: l.ShortChanID(),
-			OutgoingHTLCID: index,
-			OutgoingAmt:    htlc.Amount,
-			ErrorEncrypter: pkt.obfuscator,
-		})
-
+		pkt.outgoingChanID = l.ShortChanID()
+		pkt.outgoingHTLCID = index
 		htlc.ID = index
+
+		l.debugf("Queueing keystone of ADD open circuit: %s->%s",
+			pkt.inKey(), pkt.outKey())
+
+		l.openedCircuits = append(l.openedCircuits, pkt.inKey())
+		l.keystoneBatch = append(l.keystoneBatch, pkt.keystone())
+
 		l.cfg.Peer.SendMessage(htlc)
 
 	case *lnwire.UpdateFulfillHTLC:
 		// An HTLC we forward to the switch has just settled somewhere
 		// upstream. Therefore we settle the HTLC within our local
 		// state machine.
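The sourceRef/destRef values threaded through SettleHTLC and FailHTLC below point back into forwarding packages on disk. As a rough mental model (the actual channeldb definition may differ), an AddRef is just a height/index pair into those packages:

// Rough shape of a forwarding-package reference; illustrative only.
type AddRef struct {
	// Height of the remote commitment that locked in the Add.
	Height uint64

	// Index is the Add's position within that forwarding package.
	Index uint16
}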
- err := l.channel.SettleHTLC( + + closedCircuitRef := pkt.inKey() + if err := l.channel.SettleHTLC( htlc.PaymentPreimage, pkt.incomingHTLCID, - nil, - nil, - nil, - ) - if err != nil { + pkt.sourceRef, + pkt.destRef, + &closedCircuitRef, + ); err != nil { // TODO(roasbeef): broadcast on-chain l.fail("unable to settle incoming HTLC: %v", err) return } + l.debugf("Queueing removal of SETTLE closed circuit: %s->%s", + pkt.inKey(), pkt.outKey()) + + l.closedCircuits = append(l.closedCircuits, pkt.inKey()) + // With the HTLC settled, we'll need to populate the wire // message to target the specific channel and HTLC to be // cancelled. @@ -937,18 +1086,23 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) { case *lnwire.UpdateFailHTLC: // An HTLC cancellation has been triggered somewhere upstream, // we'll remove then HTLC from our local state machine. - err := l.channel.FailHTLC( + closedCircuitRef := pkt.inKey() + if err := l.channel.FailHTLC( pkt.incomingHTLCID, htlc.Reason, - nil, - nil, - nil, - ) - if err != nil { + pkt.sourceRef, + pkt.destRef, + &closedCircuitRef, + ); err != nil { log.Errorf("unable to cancel HTLC: %v", err) return } + l.debugf("Queueing removal of FAIL closed circuit: %s->%s", + pkt.inKey(), pkt.outKey()) + + l.closedCircuits = append(l.closedCircuits, pkt.inKey()) + // With the HTLC removed, we'll need to populate the wire // message to target the specific channel and HTLC to be // cancelled. The "Reason" field will have already been set @@ -1141,32 +1295,21 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { // We've received a revocation from the remote chain, if valid, // this moves the remote chain forward, and expands our // revocation window. - _, adds, settleFails, err := l.channel.ReceiveRevocation(msg) + fwdPkg, adds, settleFails, err := l.channel.ReceiveRevocation(msg) if err != nil { l.fail("unable to accept revocation: %v", err) return } - // After we treat HTLCs as included in both remote/local - // commitment transactions they might be safely propagated over - // htlc switch or settled if our node was last node in htlc - // path. - htlcs := append(settleFails, adds...) - htlcsToForward := l.processLockedInHtlcs(htlcs) - go func() { - log.Debugf("ChannelPoint(%v) forwarding %v HTLC's", - l.channel.ChannelPoint(), len(htlcsToForward)) - for _, packet := range htlcsToForward { - if err := l.cfg.Switch.forward(packet); err != nil { - // TODO(roasbeef): cancel back htlc - // under certain conditions? - log.Errorf("channel link(%v): "+ - "unhandled error while forwarding "+ - "htlc packet over htlc "+ - "switch: %v", l, err) - } + l.processRemoteSettleFails(fwdPkg, settleFails) + + needUpdate := l.processRemoteAdds(fwdPkg, adds) + if needUpdate { + if err := l.updateCommitTx(); err != nil { + l.fail("unable to update commitment: %v", err) + return } - }() + } case *lnwire.UpdateFee: // We received fee update from peer. If we are the initiator we @@ -1179,10 +1322,97 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { } } +// ackDownStreamPackets is responsible for removing htlcs from a link's +// mailbox for packets delivered from server, and cleaning up any circuits +// closed by signing a previous commitment txn. This method ensures that the +// circuits are removed from the circuit map before removing them from the +// link's mailbox, otherwise it could be possible for some circuit to be missed +// if this link flaps. 
+// +// The `forgive` flag allows this method to tolerate restarts, and ignores +// errors that could be caused by a previous circuit deletion. Under normal +// operation, this is set to false so that we would fail the link if we were +// unable to remove a circuit. +func (l *channelLink) ackDownStreamPackets(forgive bool) error { + // First, remove the downstream Add packets that were included in the + // previous commitment signature. This will prevent the Adds from being + // replayed if this link disconnects. + for _, inKey := range l.openedCircuits { + // In order to test the sphinx replay logic of the remote party, + // unsafe replay does not acknowledge the packets from the + // mailbox. We can then force a replay of any Add packets held + // in memory by disconnecting and reconnecting the link. + if l.cfg.UnsafeReplay { + continue + } + + l.debugf("Removing Add packet %s from mailbox", inKey) + l.mailBox.AckPacket(inKey) + } + + // Now, we will delete all circuits closed by the previous commitment + // signature, which is the result of downstream Settle/Fail packets. We + // batch them here to ensure circuits are closed atomically and for + // performance. + err := l.cfg.Circuits.DeleteCircuits(l.closedCircuits...) + switch err { + case nil: + // Successful deletion. + + case ErrUnknownCircuit: + if forgive { + // After a restart, we may have already removed this + // circuit. Since it shouldn't be possible for a circuit + // to be closed by different htlcs, we assume this error + // signals that the whole batch was successfully + // removed. + l.warnf("Forgiving unknown circuit error after " + + "attempting deletion, circuit was probably " + + "removed before shutting down.") + break + } + + return err + + default: + l.errorf("unable to delete %d circuits: %v", + len(l.closedCircuits), err) + return err + } + + // With the circuits removed from memory and disk, we now ack any + // Settle/Fails in the mailbox to ensure they do not get redelivered + // after startup. If forgive is enabled and we've reached this point, + // the circuits must have been removed at some point, so it is now safe + // to unqueue the corresponding Settle/Fails. + for _, inKey := range l.closedCircuits { + l.debugf("Removing Fail/Settle packet %s from mailbox", inKey) + l.mailBox.AckPacket(inKey) + } + + // Lastly, reset our buffers to be empty while keeping any acquired + // growth in the backing array. + l.openedCircuits = l.openedCircuits[:0] + l.closedCircuits = l.closedCircuits[:0] + + return nil +} + // updateCommitTx signs, then sends an update to the remote peer adding a new // commitment to their commitment chain which includes all the latest updates // we've received+processed up to this point. func (l *channelLink) updateCommitTx() error { + // Preemptively write all pending keystones to disk, just in case the + // HTLCs we have in memory are included in the subsequent attempt to + // sign a commitment state. + err := l.cfg.Circuits.OpenCircuits(l.keystoneBatch...) + if err != nil { + return err + } + + // Reset the batch, but keep the backing buffer to avoid reallocating. 
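The keystones written just above pair the two halves of each payment circuit. A sketch of the shapes implied by pkt.inKey(), pkt.outKey() and pkt.keystone(); the field names here are illustrative, not the actual definitions:

// Illustrative only. A CircuitKey names an HTLC on a particular
// channel; a Keystone marries an incoming HTLC to the outgoing HTLC
// that completes its circuit.
type CircuitKey struct {
	ChanID lnwire.ShortChannelID // channel the HTLC lives on
	HtlcID uint64                // per-channel HTLC index
}

type Keystone struct {
	InKey  CircuitKey // pkt.inKey(): incoming half
	OutKey CircuitKey // pkt.outKey(): outgoing half
}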
+ l.keystoneBatch = l.keystoneBatch[:0] + theirCommitSig, htlcSigs, err := l.channel.SignNextCommitment() if err == lnwallet.ErrNoWindow { log.Tracef("revocation window exhausted, unable to send %v", @@ -1192,6 +1422,10 @@ func (l *channelLink) updateCommitTx() error { return err } + if err := l.ackDownStreamPackets(false); err != nil { + return err + } + commitSig := &lnwire.CommitSig{ ChanID: l.ChanID(), CommitSig: theirCommitSig, @@ -1231,8 +1465,6 @@ func (l *channelLink) Peer() Peer { // // NOTE: Part of the ChannelLink interface. func (l *channelLink) ShortChanID() lnwire.ShortChannelID { - l.RLock() - defer l.RUnlock() return l.shortChanID } @@ -1301,6 +1533,17 @@ func (l *channelLink) Bandwidth() lnwire.MilliSatoshi { return linkBandwidth - reserve } +// AttachMailBox updates the current mailbox used by this link, and hooks up the +// mailbox's message and packet outboxes to the link's upstream and downstream +// chans, respectively. +func (l *channelLink) AttachMailBox(mailbox MailBox) { + l.Lock() + l.mailBox = mailbox + l.upstream = mailbox.MessageOutBox() + l.downstream = mailbox.PacketOutBox() + l.Unlock() +} + // policyUpdate is a message sent to a channel link when an outside sub-system // wishes to update the current forwarding policy. type policyUpdate struct { @@ -1357,8 +1600,11 @@ func (l *channelLink) String() string { // another peer or if the update was created by user // // NOTE: Part of the ChannelLink interface. -func (l *channelLink) HandleSwitchPacket(packet *htlcPacket) { - l.mailBox.AddPacket(packet) +func (l *channelLink) HandleSwitchPacket(pkt *htlcPacket) error { + l.tracef("received switch packet inkey=%v, outkey=%v", + pkt.inKey(), pkt.outKey()) + l.mailBox.AddPacket(pkt) + return nil } // HandleChannelUpdate handles the htlc requests as settle/add/fail which sent @@ -1379,8 +1625,8 @@ func (l *channelLink) updateChannelFee(feePerKw lnwallet.SatPerKWeight) error { // We skip sending the UpdateFee message if the channel is not // currently eligible to forward messages. if !l.EligibleToForward() { - log.Debugf("ChannelPoint(%v): skipping fee update for " + - "inactive channel") + log.Debugf("ChannelPoint(%v): skipping fee update for "+ + "inactive channel", l.ChanID()) return nil } @@ -1398,22 +1644,29 @@ func (l *channelLink) updateChannelFee(feePerKw lnwallet.SatPerKWeight) error { return l.updateCommitTx() } -// processLockedInHtlcs serially processes each of the log updates which have -// been "locked-in". An HTLC is considered locked-in once it has been fully -// committed to in both the remote and local commitment state. Once a channel -// updates is locked-in, then it can be acted upon, meaning: settling HTLCs, -// cancelling them, or forwarding new HTLCs to the next hop. -func (l *channelLink) processLockedInHtlcs( - paymentDescriptors []*lnwallet.PaymentDescriptor) []*htlcPacket { +// processRemoteSettleFails accepts a batch of settle/fail payment descriptors +// after receiving a revocation from the remote party, and reprocesses them in +// the context of the provided forwarding package. Any settles or fails that +// have already been acknowledged in the forwarding package will not be sent to +// the switch. 
+func (l *channelLink) processRemoteSettleFails(fwdPkg *channeldb.FwdPkg, + settleFails []*lnwallet.PaymentDescriptor) { - var ( - needUpdate bool - packetsToForward []*htlcPacket - ) + if len(settleFails) == 0 { + return + } + + log.Debugf("ChannelLink(%v): settle-fail-filter %v", + l.ShortChanID(), fwdPkg.SettleFailFilter) + + var switchPackets []*htlcPacket + for i, pd := range settleFails { + // Skip any settles or fails that have already been acknowledged + // by the incoming link that originated the forwarded Add. + if fwdPkg.SettleFailFilter.Contains(uint16(i)) { + continue + } - for _, pd := range paymentDescriptors { - // TODO(roasbeef): rework log entries to a shared - // interface. switch pd.EntryType { // A settle for an HTLC we previously forwarded HTLC has been @@ -1423,7 +1676,7 @@ func (l *channelLink) processLockedInHtlcs( settlePacket := &htlcPacket{ outgoingChanID: l.ShortChanID(), outgoingHTLCID: pd.ParentIndex, - amount: pd.Amount, + destRef: pd.DestRef, htlc: &lnwire.UpdateFulfillHTLC{ PaymentPreimage: pd.RPreimage, }, @@ -1432,7 +1685,7 @@ func (l *channelLink) processLockedInHtlcs( // Add the packet to the batch to be forwarded, and // notify the overflow queue that a spare spot has been // freed up within the commitment state. - packetsToForward = append(packetsToForward, settlePacket) + switchPackets = append(switchPackets, settlePacket) l.overflowQueue.SignalFreeSlot() // A failureCode message for a previously forwarded HTLC has @@ -1445,7 +1698,7 @@ func (l *channelLink) processLockedInHtlcs( failPacket := &htlcPacket{ outgoingChanID: l.ShortChanID(), outgoingHTLCID: pd.ParentIndex, - amount: pd.Amount, + destRef: pd.DestRef, htlc: &lnwire.UpdateFailHTLC{ Reason: lnwire.OpaqueReason(pd.FailReason), }, @@ -1454,8 +1707,83 @@ func (l *channelLink) processLockedInHtlcs( // Add the packet to the batch to be forwarded, and // notify the overflow queue that a spare spot has been // freed up within the commitment state. - packetsToForward = append(packetsToForward, failPacket) + switchPackets = append(switchPackets, failPacket) l.overflowQueue.SignalFreeSlot() + } + } + + go l.forwardBatch(switchPackets...) +} + +// processRemoteAdds serially processes each of the Add payment descriptors +// which have been "locked-in" by receiving a revocation from the remote party. +// The forwarding package provided instructs how to process this batch, +// indicating whether this is the first time these Adds are being processed, or +// whether we are reprocessing as a result of a failure or restart. Adds that +// have already been acknowledged in the forwarding package will be ignored. +func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg, + lockedInHtlcs []*lnwallet.PaymentDescriptor) bool { + + l.tracef("processing %d remote adds for height %d", + len(lockedInHtlcs), fwdPkg.Height) + + decodeReqs := make([]DecodeHopIteratorRequest, 0, len(lockedInHtlcs)) + for _, pd := range lockedInHtlcs { + switch pd.EntryType { + + // TODO(conner): remove type switch? + case lnwallet.Add: + // Before adding the new htlc to the state machine, + // parse the onion object in order to obtain the + // routing information with DecodeHopIterator function + // which process the Sphinx packet. 
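Decoding is now batched per forwarding package: every onion blob is submitted in a single DecodeHopIterators call keyed by fwdPkg.ID(), so the replay-protection log always sees the same batch, even across restarts. A condensed sketch of the calling convention used below; the wrapper function and its parameter types are hypothetical, while the request fields and Result() come from this diff:

// decodeBatchSketch is a hypothetical helper illustrating the batched
// decode contract; parameter types are assumptions.
func (l *channelLink) decodeBatchSketch(fwdPkg *channeldb.FwdPkg,
	onionBlob []byte, rHash [32]byte, cltv uint32) {

	reqs := []DecodeHopIteratorRequest{{
		OnionReader:  bytes.NewReader(onionBlob),
		RHash:        rHash[:],
		IncomingCltv: cltv,
	}}

	// All requests are decoded and checked against the replay log in
	// one shot; a non-nil error here poisons the entire batch.
	resps, sphinxErr := l.cfg.DecodeHopIterators(fwdPkg.ID(), reqs)
	if sphinxErr != nil {
		l.fail(ErrInternalLinkFailure.Error())
		return
	}

	for i := range resps {
		iter, failCode := resps[i].Result()
		if failCode != lnwire.CodeNone {
			// Replayed or malformed onion; the iterator at
			// this index must not be used.
			continue
		}
		_ = iter // read this HTLC's forwarding instructions
	}
}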
+ onionReader := bytes.NewReader(pd.OnionBlob) + + req := DecodeHopIteratorRequest{ + OnionReader: onionReader, + RHash: pd.RHash[:], + IncomingCltv: pd.Timeout, + } + + decodeReqs = append(decodeReqs, req) + } + } + + // Atomically decode the incoming htlcs, simultaneously checking for + // replay attempts. A particular index in the returned, spare list of + // channel iterators should only be used if the failure code at the same + // index is lnwire.FailCodeNone. + decodeResps, sphinxErr := l.cfg.DecodeHopIterators( + fwdPkg.ID(), decodeReqs, + ) + if sphinxErr != nil { + l.errorf("unable to decode hop iterators: %v", sphinxErr) + l.fail(ErrInternalLinkFailure.Error()) + return false + } + + var ( + needUpdate bool + switchPackets []*htlcPacket + ) + + for i, pd := range lockedInHtlcs { + idx := uint16(i) + + if fwdPkg.State == channeldb.FwdStateProcessed && + fwdPkg.AckFilter.Contains(idx) { + + // If this index is already found in the ack filter, the + // response to this forwarding decision has already been + // committed by one of our commitment txns. ADDs in this + // state are waiting for the rest of the fwding package + // to get acked before being garbage collected. + continue + } + + // TODO(roasbeef): rework log entries to a shared + // interface. + switch pd.EntryType { // An incoming HTLC add has been full-locked in. As a result we // can now examine the forwarding details of the HTLC, and the @@ -1472,23 +1800,13 @@ func (l *channelLink) processLockedInHtlcs( // parse the onion object in order to obtain the // routing information with DecodeHopIterator function // which process the Sphinx packet. - // - // We include the payment hash of the htlc as it's - // authenticated within the Sphinx packet itself as - // associated data in order to thwart attempts a replay - // attacks. In the case of a replay, an attacker is - // *forced* to use the same payment hash twice, thereby - // losing their money entirely. - onionReader := bytes.NewReader(onionBlob[:]) - chanIterator, failureCode := l.cfg.DecodeHopIterator( - onionReader, pd.RHash[:], pd.Timeout, - ) + chanIterator, failureCode := decodeResps[i].Result() if failureCode != lnwire.CodeNone { // If we're unable to process the onion blob // than we should send the malformed htlc error // to payment sender. l.sendMalformedHTLCError(pd.HtlcIndex, failureCode, - onionBlob[:]) + onionBlob[:], pd.SourceRef) needUpdate = true log.Errorf("unable to decode onion hop "+ @@ -1507,7 +1825,7 @@ func (l *channelLink) processLockedInHtlcs( // than we should send the malformed htlc error // to payment sender. 
l.sendMalformedHTLCError(pd.HtlcIndex, failureCode, - onionBlob[:]) + onionBlob[:], pd.SourceRef) needUpdate = true log.Errorf("unable to decode onion "+ @@ -1520,6 +1838,7 @@ func (l *channelLink) processLockedInHtlcs( fwdInfo := chanIterator.ForwardingInstructions() switch fwdInfo.NextHop { case exitHop: + if l.cfg.DebugHTLC && l.cfg.HodlHTLC { log.Warnf("hodl HTLC mode enabled, " + "will not attempt to settle " + @@ -1538,7 +1857,8 @@ func (l *channelLink) processLockedInHtlcs( pd.Timeout, heightNow) failure := lnwire.FailFinalIncorrectCltvExpiry{} - l.sendHTLCError(pd.HtlcIndex, &failure, obfuscator) + l.sendHTLCError(pd.HtlcIndex, &failure, + obfuscator, pd.SourceRef) needUpdate = true continue } @@ -1553,23 +1873,34 @@ func (l *channelLink) processLockedInHtlcs( log.Errorf("unable to query invoice registry: "+ " %v", err) failure := lnwire.FailUnknownPaymentHash{} - l.sendHTLCError(pd.HtlcIndex, failure, obfuscator) + l.sendHTLCError(pd.HtlcIndex, failure, + obfuscator, pd.SourceRef) needUpdate = true continue } - // If this invoice has already been settled, - // then we'll reject it as we don't allow an - // invoice to be paid twice. - if invoice.Terms.Settled == true { - log.Warnf("Rejecting duplicate "+ + // If the invoice is already settled, we choose + // to accept the payment to simplify failure + // recovery. + // + // NOTE: Though our recovery and forwarding logic is + // predominately batched, settling invoices + // happens iteratively. We may reject one of of + // two payments for the same rhash at first, but + // then restart and reject both after seeing + // that the invoice has been settled. Without + // any record of which one settles first, it is + // ambiguous as to which one actually settled + // the invoice. Thus, by accepting all payments, + // we eliminate the race condition that can lead + // to this inconsistency. + // + // TODO(conner): track ownership of settlements + // to properly recover from failures? 
or add + // batch invoice settlement + if invoice.Terms.Settled { + log.Warnf("Accepting duplicate "+ "payment for hash=%x", pd.RHash[:]) - failure := lnwire.FailUnknownPaymentHash{} - l.sendHTLCError( - pd.HtlcIndex, failure, obfuscator, - ) - needUpdate = true - continue } // If we're not currently in debug mode, and @@ -1591,7 +1922,8 @@ func (l *channelLink) processLockedInHtlcs( "amount: expected %v, received %v", invoice.Terms.Value, pd.Amount) failure := lnwire.FailIncorrectPaymentAmount{} - l.sendHTLCError(pd.HtlcIndex, failure, obfuscator) + l.sendHTLCError(pd.HtlcIndex, failure, + obfuscator, pd.SourceRef) needUpdate = true continue } @@ -1618,7 +1950,8 @@ func (l *channelLink) processLockedInHtlcs( fwdInfo.AmountToForward) failure := lnwire.FailIncorrectPaymentAmount{} - l.sendHTLCError(pd.HtlcIndex, failure, obfuscator) + l.sendHTLCError(pd.HtlcIndex, failure, + obfuscator, pd.SourceRef) needUpdate = true continue } @@ -1640,7 +1973,9 @@ func (l *channelLink) processLockedInHtlcs( failure := lnwire.NewFinalIncorrectCltvExpiry( fwdInfo.OutgoingCTLV, ) - l.sendHTLCError(pd.HtlcIndex, failure, obfuscator) + l.sendHTLCError(pd.HtlcIndex, + failure, obfuscator, + pd.SourceRef) needUpdate = true continue case pd.Timeout != fwdInfo.OutgoingCTLV: @@ -1652,28 +1987,33 @@ func (l *channelLink) processLockedInHtlcs( failure := lnwire.NewFinalIncorrectCltvExpiry( fwdInfo.OutgoingCTLV, ) - l.sendHTLCError(pd.HtlcIndex, failure, obfuscator) + l.sendHTLCError(pd.HtlcIndex, + failure, obfuscator, + pd.SourceRef) needUpdate = true continue } } preimage := invoice.Terms.PaymentPreimage - err = l.channel.SettleHTLC(preimage, pd.HtlcIndex, nil, nil, nil) + err = l.channel.SettleHTLC(preimage, + pd.HtlcIndex, pd.SourceRef, nil, nil) if err != nil { l.fail("unable to settle htlc: %v", err) - return nil + return false } - // Notify the invoiceRegistry of the invoices - // we just settled with this latest commitment + // Notify the invoiceRegistry of the invoices we + // just settled with this latest commitment // update. err = l.cfg.Registry.SettleInvoice(invoiceHash) if err != nil { l.fail("unable to settle invoice: %v", err) - return nil + return false } + l.infof("Settling %x as exit hop", pd.RHash) + // HTLC was successfully settled locally send // notification about it remote peer. l.cfg.Peer.SendMessage(&lnwire.UpdateFulfillHTLC{ @@ -1688,6 +2028,53 @@ func (l *channelLink) processLockedInHtlcs( // constraints have been properly met by by this // incoming HTLC. default: + switch fwdPkg.State { + case channeldb.FwdStateProcessed: + if !fwdPkg.FwdFilter.Contains(idx) { + // This add was not forwarded on + // the previous processing + // phase, run it through our + // validation pipeline to + // reproduce an error. This may + // trigger a different error due + // to expiring timelocks, but we + // expect that an error will be + // reproduced. + break + } + + addMsg := &lnwire.UpdateAddHTLC{ + Expiry: fwdInfo.OutgoingCTLV, + Amount: fwdInfo.AmountToForward, + PaymentHash: pd.RHash, + } + + // Finally, we'll encode the onion packet for + // the _next_ hop using the hop iterator + // decoded for the current hop. + buf := bytes.NewBuffer(addMsg.OnionBlob[0:0]) + + // We know this cannot fail, as this ADD + // was marked forwarded in a previous + // round of processing. 
+ chanIterator.EncodeNextHop(buf) + + updatePacket := &htlcPacket{ + incomingChanID: l.ShortChanID(), + incomingHTLCID: pd.HtlcIndex, + outgoingChanID: fwdInfo.NextHop, + sourceRef: pd.SourceRef, + incomingAmount: pd.Amount, + amount: addMsg.Amount, + htlc: addMsg, + obfuscator: obfuscator, + } + switchPackets = append(switchPackets, + updatePacket) + + continue + } + // We want to avoid forwarding an HTLC which // will expire in the near future, so we'll // reject an HTLC if its expiration time is too @@ -1707,7 +2094,8 @@ func (l *channelLink) processLockedInHtlcs( failure = lnwire.NewExpiryTooSoon(*update) } - l.sendHTLCError(pd.HtlcIndex, failure, obfuscator) + l.sendHTLCError(pd.HtlcIndex, failure, + obfuscator, pd.SourceRef) needUpdate = true continue } @@ -1734,7 +2122,8 @@ func (l *channelLink) processLockedInHtlcs( pd.Amount, *update) } - l.sendHTLCError(pd.HtlcIndex, failure, obfuscator) + l.sendHTLCError(pd.HtlcIndex, failure, + obfuscator, pd.SourceRef) needUpdate = true continue } @@ -1779,7 +2168,8 @@ func (l *channelLink) processLockedInHtlcs( *update) } - l.sendHTLCError(pd.HtlcIndex, failure, obfuscator) + l.sendHTLCError(pd.HtlcIndex, failure, + obfuscator, pd.SourceRef) needUpdate = true continue } @@ -1806,12 +2196,13 @@ func (l *channelLink) processLockedInHtlcs( if err != nil { l.fail("unable to create channel update "+ "while handling the error: %v", err) - return nil + return false } failure := lnwire.NewIncorrectCltvExpiry( pd.Timeout, *update) - l.sendHTLCError(pd.HtlcIndex, failure, obfuscator) + l.sendHTLCError(pd.HtlcIndex, failure, + obfuscator, pd.SourceRef) needUpdate = true continue } @@ -1838,42 +2229,107 @@ func (l *channelLink) processLockedInHtlcs( "remaining route %v", err) failure := lnwire.NewTemporaryChannelFailure(nil) - l.sendHTLCError(pd.HtlcIndex, failure, obfuscator) + l.sendHTLCError(pd.HtlcIndex, failure, + obfuscator, pd.SourceRef) needUpdate = true continue } - updatePacket := &htlcPacket{ - incomingChanID: l.ShortChanID(), - incomingHTLCID: pd.HtlcIndex, - outgoingChanID: fwdInfo.NextHop, - incomingHtlcAmt: pd.Amount, - amount: addMsg.Amount, - htlc: addMsg, - obfuscator: obfuscator, + // Now that this add has been reprocessed, only + // append it to our list of packets to forward + // to the switch this is the first time + // processing the add. If the fwd pkg has + // already been processed, then we entered the + // above section to recreate a previous error. + // If the packet had previously been forwarded, + // it would have been added to switchPackets at + // the top of this section. + if fwdPkg.State == channeldb.FwdStateLockedIn { + updatePacket := &htlcPacket{ + incomingChanID: l.ShortChanID(), + incomingHTLCID: pd.HtlcIndex, + outgoingChanID: fwdInfo.NextHop, + sourceRef: pd.SourceRef, + incomingAmount: pd.Amount, + amount: addMsg.Amount, + htlc: addMsg, + obfuscator: obfuscator, + } + + fwdPkg.FwdFilter.Set(idx) + switchPackets = append(switchPackets, + updatePacket) } - packetsToForward = append(packetsToForward, updatePacket) } } } - if needUpdate { - // With all the settle/cancel updates added to the local and - // remote HTLC logs, initiate a state transition by updating - // the remote commitment chain. - if err := l.updateCommitTx(); err != nil { - l.fail("unable to update commitment: %v", err) - return nil + // Commit the htlcs we are intending to forward if this package has not + // been fully processed. 
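The SetFwdFilter call that follows persists which adds we decided to forward. The FwdFilter, like the AckFilter and SettleFailFilter used earlier, behaves as a fixed-size bitmap indexed by each HTLC's position in the package. A toy stand-in showing the three operations this code relies on; channeldb's real filter is a compact bit vector:

// Toy filter; illustrative, not channeldb's implementation.
type pkgFilter struct {
	n    uint16
	bits map[uint16]struct{}
}

func newPkgFilter(n uint16) *pkgFilter {
	return &pkgFilter{n: n, bits: make(map[uint16]struct{}, n)}
}

// Set marks index i as decided (forwarded, acked, etc.).
func (f *pkgFilter) Set(i uint16) { f.bits[i] = struct{}{} }

// Contains reports whether index i was already decided.
func (f *pkgFilter) Contains(i uint16) bool {
	_, ok := f.bits[i]
	return ok
}

// IsFull reports whether every index in the package is decided.
func (f *pkgFilter) IsFull() bool { return len(f.bits) == int(f.n) }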
+ if fwdPkg.State == channeldb.FwdStateLockedIn { + err := l.channel.SetFwdFilter(fwdPkg.Height, fwdPkg.FwdFilter) + if err != nil { + l.fail("unable to set fwd filter: %v", err) + return false } } - return packetsToForward + if len(switchPackets) == 0 { + return needUpdate + } + + l.debugf("forwarding %d packets to switch", len(switchPackets)) + + go l.forwardBatch(switchPackets...) + + return needUpdate +} + +// forwardBatch forwards the given htlcPackets to the switch, and waits on the +// err chan for the individual responses. This method is intended to be spawned +// as a goroutine so the responses can be handled in the background. +func (l *channelLink) forwardBatch(packets ...*htlcPacket) { + // Don't forward packets for which we already have a response in our + // mailbox. This could happen if a packet fails and is buffered in the + // mailbox, and the incoming link flaps. + var filteredPkts = make([]*htlcPacket, 0, len(packets)) + for _, pkt := range packets { + if l.mailBox.HasPacket(pkt.inKey()) { + continue + } + + filteredPkts = append(filteredPkts, pkt) + } + + errChan := l.cfg.ForwardPackets(filteredPkts...) + l.handleBatchFwdErrs(errChan) +} + +// handleBatchFwdErrs waits on the given errChan until it is closed, logging the +// errors returned from any unsuccessful forwarding attempts. +func (l *channelLink) handleBatchFwdErrs(errChan chan error) { + for { + err, ok := <-errChan + if !ok { + // Err chan has been drained or switch is shutting down. + // Either way, return. + return + } + + if err == nil { + continue + } + + l.errorf("unhandled error while forwarding htlc packet over "+ + "htlcswitch: %v", err) + } } // sendHTLCError functions cancels HTLC and send cancel message back to the // peer from which HTLC was received. func (l *channelLink) sendHTLCError(htlcIndex uint64, - failure lnwire.FailureMessage, e ErrorEncrypter) { + failure lnwire.FailureMessage, e ErrorEncrypter, + sourceRef *channeldb.AddRef) { reason, err := e.EncryptFirstHop(failure) if err != nil { @@ -1881,7 +2337,7 @@ func (l *channelLink) sendHTLCError(htlcIndex uint64, return } - err = l.channel.FailHTLC(htlcIndex, reason, nil, nil, nil) + err = l.channel.FailHTLC(htlcIndex, reason, sourceRef, nil, nil) if err != nil { log.Errorf("unable cancel htlc: %v", err) return @@ -1897,10 +2353,10 @@ func (l *channelLink) sendHTLCError(htlcIndex uint64, // sendMalformedHTLCError helper function which sends the malformed HTLC update // to the payment sender. func (l *channelLink) sendMalformedHTLCError(htlcIndex uint64, - code lnwire.FailCode, onionBlob []byte) { + code lnwire.FailCode, onionBlob []byte, sourceRef *channeldb.AddRef) { shaOnionBlob := sha256.Sum256(onionBlob) - err := l.channel.MalformedFailHTLC(htlcIndex, code, shaOnionBlob, nil) + err := l.channel.MalformedFailHTLC(htlcIndex, code, shaOnionBlob, sourceRef) if err != nil { log.Errorf("unable cancel htlc: %v", err) return @@ -1921,3 +2377,33 @@ func (l *channelLink) fail(format string, a ...interface{}) { log.Error(reason) go l.cfg.Peer.Disconnect(reason) } + +// infof prefixes the channel's identifier before printing to info log. +func (l *channelLink) infof(format string, a ...interface{}) { + msg := fmt.Sprintf(format, a...) + log.Infof("ChannelLink(%s) %s", l.ShortChanID(), msg) +} + +// debugf prefixes the channel's identifier before printing to debug log. +func (l *channelLink) debugf(format string, a ...interface{}) { + msg := fmt.Sprintf(format, a...) 
+ log.Debugf("ChannelLink(%s) %s", l.ShortChanID(), msg) +} + +// warnf prefixes the channel's identifier before printing to warn log. +func (l *channelLink) warnf(format string, a ...interface{}) { + msg := fmt.Sprintf(format, a...) + log.Warnf("ChannelLink(%s) %s", l.ShortChanID(), msg) +} + +// errorf prefixes the channel's identifier before printing to error log. +func (l *channelLink) errorf(format string, a ...interface{}) { + msg := fmt.Sprintf(format, a...) + log.Errorf("ChannelLink(%s) %s", l.ShortChanID(), msg) +} + +// tracef prefixes the channel's identifier before printing to trace log. +func (l *channelLink) tracef(format string, a ...interface{}) { + msg := fmt.Sprintf(format, a...) + log.Tracef("ChannelLink(%s) %s", l.ShortChanID(), msg) +}
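A closing note on the new ForwardPackets closure: it hands back a channel that the switch closes once every packet in the batch has been resolved, which is why handleBatchFwdErrs can simply receive until the channel closes. A self-contained sketch of that contract, using placeholder names:

package main

import (
	"errors"
	"fmt"
)

// forwardAll mimics the ForwardPackets contract: per-packet errors are
// reported on the returned channel, and the channel is closed once the
// whole batch has been resolved.
func forwardAll(pkts ...string) chan error {
	errChan := make(chan error, len(pkts))
	go func() {
		defer close(errChan)
		for _, p := range pkts {
			if p == "bad" {
				errChan <- errors.New("unable to forward " + p)
				continue
			}
			// A nil send signals a successfully handled packet.
			errChan <- nil
		}
	}()
	return errChan
}

func main() {
	// Mirrors handleBatchFwdErrs: drain until closed, logging only
	// the non-nil errors.
	for err := range forwardAll("a", "bad", "c") {
		if err != nil {
			fmt.Println("unhandled error while forwarding:", err)
		}
	}
}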