htlcswitch/link: batches processing of locked in htlcs

Conner Fromknecht 2017-11-26 23:20:17 -08:00
parent 2c3d35bb40
commit 101ad09e9f

@ -31,6 +31,10 @@ const (
expiryGraceDelta = 2
)
// ErrInternalLinkFailure is a generic error returned to the remote party so as
// to obfuscate the true failure.
var ErrInternalLinkFailure = errors.New("internal link failure")
// ForwardingPolicy describes the set of constraints that a given ChannelLink
// is to adhere to when forwarding HTLC's. For each incoming HTLC, this set of
// constraints will be consulted in order to ensure that adequate fees are
@ -119,13 +123,21 @@ type ChannelLinkConfig struct {
// targeted at a given ChannelLink concrete interface implementation.
FwrdingPolicy ForwardingPolicy
// Switch is a subsystem which is used to forward the incoming HTLC
// packets according to the encoded hop forwarding information
// contained in the forwarding blob within each HTLC.
//
// TODO(roasbeef): remove in favor of simple ForwardPacket closure func
// Circuits provides restricted access to the switch's circuit map,
// allowing the link to open and close circuits.
Circuits CircuitModifier
// Switch provides a reference to the HTLC switch; we only use this in
// testing to access circuit operations not typically exposed by the
// CircuitModifier.
// TODO(conner): remove after refactoring htlcswitch testing framework.
Switch *Switch
// ForwardPackets attempts to forward the batch of htlcs through the
// switch. Any failed packets will be returned to the provided
// ChannelLink.
ForwardPackets func(...*htlcPacket) chan error
// DecodeHopIterator is a function responsible for decoding an HTLC's
// Sphinx onion blob, and creating a hop iterator which will give us the
// next destination of the HTLC.
@ -209,9 +221,20 @@ type ChannelLinkConfig struct {
// coalesced into a single commit.
BatchTicker Ticker
// FwdPkgGCTicker is the ticker determining the frequency at which
// garbage collection of forwarding packages occurs. We use a time-based
// approach, as opposed to block epochs, so as not to hinder syncing.
FwdPkgGCTicker Ticker
// BatchSize is the max size of a batch of updates done to the link
// before we do a state update.
BatchSize uint32
// UnsafeReplay will cause a link to replay the adds in its latest
// commitment txn after the link is restarted. This should only be used
// in testing; it is here to ensure that sphinx replay detection on the
// receiving node is persistent.
UnsafeReplay bool
}
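A rough sketch of how a caller might wire the new fields when constructing a link. The struct field names come from this diff; `circuits`, `hswitch`, `batchTicker`, and `gcTicker` are hypothetical values the caller is assumed to already hold, and the switch is assumed to expose a ForwardPackets method matching the closure's signature above.

	// newTestLink is illustrative only, not part of this commit.
	func newTestLink(channel *lnwallet.LightningChannel, currentHeight uint32,
		circuits CircuitModifier, hswitch *Switch,
		batchTicker, gcTicker Ticker) ChannelLink {

		cfg := ChannelLinkConfig{
			// ...existing fields elided...
			Circuits:       circuits,               // restricted circuit map access
			Switch:         hswitch,                // test-only, see TODO(conner)
			ForwardPackets: hswitch.ForwardPackets, // batched forwarding into the switch
			BatchTicker:    batchTicker,            // drives commitment batching
			FwdPkgGCTicker: gcTicker,               // drives fwd pkg garbage collection
			BatchSize:      10,
			UnsafeReplay:   false, // true only in sphinx replay tests
		}

		return NewChannelLink(cfg, channel, currentHeight)
	}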
// channelLink is the service which drives a channel's commitment update
@ -237,6 +260,13 @@ type channelLink struct {
// use this information to govern decisions based on HTLC timeouts.
bestHeight uint32
// keystoneBatch represents a volatile list of keystones that must be
// written before attempting to sign the next commitment txn.
keystoneBatch []Keystone
openedCircuits []CircuitKey
closedCircuits []CircuitKey
// channel is a lightning network channel to which we apply htlc
// updates.
channel *lnwallet.LightningChannel
@ -252,11 +282,15 @@ type channelLink struct {
// been processed because of the commitment transaction overflow.
overflowQueue *packetQueue
// startMailBox directs whether or not to start the mailbox when
// starting the link. It may have already been started by the switch.
startMailBox bool
// mailBox is the main interface between the outside world and the
// link. All incoming messages will be sent over this mailBox. Messages
// include new updates from our connected peer, and new packets to be
// forwarded sent by the switch.
mailBox *memoryMailBox
mailBox MailBox
// upstream is a channel that new messages sent from the remote peer to
// the local peer will be sent across.
@ -295,11 +329,10 @@ type channelLink struct {
func NewChannelLink(cfg ChannelLinkConfig, channel *lnwallet.LightningChannel,
currentHeight uint32) ChannelLink {
link := &channelLink{
return &channelLink{
cfg: cfg,
channel: channel,
shortChanID: channel.ShortChanID(),
mailBox: newMemoryMailBox(),
linkControl: make(chan interface{}),
// TODO(roasbeef): just do reserve here?
logCommitTimer: time.NewTimer(300 * time.Millisecond),
@ -308,11 +341,6 @@ func NewChannelLink(cfg ChannelLinkConfig, channel *lnwallet.LightningChannel,
htlcUpdates: make(chan []channeldb.HTLC),
quit: make(chan struct{}),
}
link.upstream = link.mailBox.MessageOutBox()
link.downstream = link.mailBox.PacketOutBox()
return link
}
// A compile time check to ensure channelLink implements the ChannelLink
@ -347,7 +375,7 @@ func (l *channelLink) Start() error {
}
}()
l.mailBox.Start()
l.mailBox.ResetMessages()
l.overflowQueue.Start()
l.wg.Add(1)
@ -374,7 +402,6 @@ func (l *channelLink) Stop() {
l.channel.Stop()
l.mailBox.Stop()
l.overflowQueue.Stop()
close(l.quit)
@ -500,10 +527,16 @@ func (l *channelLink) syncChanStates() error {
log.Infof("Received re-establishment message from remote side "+
"for channel(%v)", l.channel.ChannelPoint())
var (
openedCircuits []CircuitKey
closedCircuits []CircuitKey
)
// We've just received a ChanSync message from the remote party,
// so we'll process the message in order to determine if we
// need to re-transmit any messages to the remote party.
msgsToReSend, _, _, err = l.channel.ProcessChanSyncMsg(remoteChanSyncMsg)
msgsToReSend, openedCircuits, closedCircuits, err =
l.channel.ProcessChanSyncMsg(remoteChanSyncMsg)
if err != nil {
// TODO(roasbeef): check concrete type of error, act
// accordingly
@ -511,6 +544,17 @@ func (l *channelLink) syncChanStates() error {
"message: %v", err)
}
// Repopulate any identifiers for circuits that may have been
// opened or closed.
l.openedCircuits = openedCircuits
l.closedCircuits = closedCircuits
// Ensure that all packets have been removed from the
// link's mailbox.
if err := l.ackDownStreamPackets(true); err != nil {
return err
}
if len(msgsToReSend) > 0 {
log.Infof("Sending %v updates to synchronize the "+
"state for ChannelPoint(%v)", len(msgsToReSend),
@ -532,79 +576,128 @@ func (l *channelLink) syncChanStates() error {
"deadline")
}
// In order to prep for the fragment below, we'll note if we
// retransmitted any HTLC's settles earlier. We'll track them by the
// HTLC index of the remote party in order to avoid erroneously sending
// a duplicate settle.
htlcsSettled := make(map[uint64]struct{})
for _, msg := range msgsToReSend {
settleMsg, ok := msg.(*lnwire.UpdateFulfillHTLC)
if !ok {
// If this isn't a settle message, then we'll skip it.
continue
}
return nil
}
// Otherwise, we'll note the ID of the HTLC we're settling so we
// don't duplicate it below.
htlcsSettled[settleMsg.ID] = struct{}{}
}
// Now that we've synchronized our state, we'll check to see if
// there're any HTLC's that we received, but weren't able to settle
// directly the last time we were active. If we find any, then we'll
// send the settle message, then begin to initiate a state transition.
//
// TODO(roasbeef): can later just inspect forwarding package
activeHTLCs := l.channel.ActiveHtlcs()
for _, htlc := range activeHTLCs {
if !htlc.Incoming {
continue
}
// Before we attempt to settle this HTLC, we'll check to see if
// we just re-sent it as part of the channel sync. If so, then
// we'll skip it.
if _, ok := htlcsSettled[htlc.HtlcIndex]; ok {
continue
}
// Now we'll check to see if we actually know the preimage; if we
// don't, then we'll skip it.
preimage, ok := l.cfg.PreimageCache.LookupPreimage(htlc.RHash[:])
if !ok {
continue
}
// At this point, we've found an unsettled HTLC that we know
// the preimage to, so we'll send a settle message to the
// remote party.
var p [32]byte
copy(p[:], preimage)
err := l.channel.SettleHTLC(p, htlc.HtlcIndex, nil, nil, nil)
// resolveFwdPkgs loads any forwarding packages for this link from disk, and
// reprocesses them in order. The primary goal is to make sure that any HTLCs we
// previously received are reinstated in memory, and forwarded to the switch if
// necessary. After a restart, this will also delete any previously completed
// packages.
func (l *channelLink) resolveFwdPkgs() error {
fwdPkgs, err := l.channel.LoadFwdPkgs()
if err != nil {
l.fail("unable to settle htlc: %v", err)
return err
}
// We'll now mark the HTLC as settled in the invoice database,
// then send the settle message to the remote party.
err = l.cfg.Registry.SettleInvoice(htlc.RHash)
l.debugf("loaded %d fwd pks", len(fwdPkgs))
var needUpdate bool
for _, fwdPkg := range fwdPkgs {
hasUpdate, err := l.resolveFwdPkg(fwdPkg)
if err != nil {
l.fail("unable to settle invoice: %v", err)
return err
}
l.batchCounter++
l.cfg.Peer.SendMessage(&lnwire.UpdateFulfillHTLC{
ChanID: l.ChanID(),
ID: htlc.HtlcIndex,
PaymentPreimage: p,
})
needUpdate = needUpdate || hasUpdate
}
// If any of our reprocessing steps require an update to the commitment
// txn, we initiate a state transition to capture all relevant changes.
if needUpdate {
return l.updateCommitTx()
}
return nil
}
// resolveFwdPkg interprets the FwdState of the provided package, either
// reprocesses any outstanding htlcs in the package, or performs garbage
// collection on the package.
func (l *channelLink) resolveFwdPkg(fwdPkg *channeldb.FwdPkg) (bool, error) {
// Remove any completed packages to clear up space.
if fwdPkg.State == channeldb.FwdStateCompleted {
l.debugf("removing completed fwd pkg for height=%d",
fwdPkg.Height)
err := l.channel.RemoveFwdPkg(fwdPkg.Height)
if err != nil {
l.errorf("unable to remove fwd pkg for height=%d: %v",
fwdPkg.Height, err)
return false, err
}
}
// Otherwise this is either a new package or one that has gone through
// processing, but contains htlcs that need to be restored in memory. We
// replay this forwarding package to make sure our local mem state is
// resurrected, mimic any original responses back to the remote
// party, and reforward the relevant HTLCs to the switch.
// If the package is fully acked but not completed, it must still have
// settles and fails to propagate.
if !fwdPkg.SettleFailFilter.IsFull() {
settleFails := lnwallet.PayDescsFromRemoteLogUpdates(
fwdPkg.Source, fwdPkg.Height, fwdPkg.SettleFails,
)
l.processRemoteSettleFails(fwdPkg, settleFails)
}
// Finally, replay *ALL ADDS* in this forwarding package. The downstream
// logic is able to filter out any duplicates, but we must shove the
// entire, original set of adds down the pipeline so that the batch of
// adds presented to the sphinx router does not ever change.
var needUpdate bool
if !fwdPkg.AckFilter.IsFull() {
adds := lnwallet.PayDescsFromRemoteLogUpdates(
fwdPkg.Source, fwdPkg.Height, fwdPkg.Adds,
)
needUpdate = l.processRemoteAdds(fwdPkg, adds)
}
return needUpdate, nil
}
// fwdPkgGarbager periodically reads all forwarding packages from disk and
// removes those that can be discarded. It is safe to do this entirely in the
// background, since all state is coordinated on disk. This also ensures the
// link can continue to process messages and interleave database accesses.
//
// NOTE: This MUST be run as a goroutine.
func (l *channelLink) fwdPkgGarbager() {
defer l.wg.Done()
fwdPkgGcTick := l.cfg.FwdPkgGCTicker.Start()
defer l.cfg.FwdPkgGCTicker.Stop()
for {
select {
case <-fwdPkgGcTick:
fwdPkgs, err := l.channel.LoadFwdPkgs()
if err != nil {
l.warnf("unable to load fwdpkgs for gc: %v", err)
continue
}
// TODO(conner): batch removal of forward packages.
for _, fwdPkg := range fwdPkgs {
if fwdPkg.State != channeldb.FwdStateCompleted {
continue
}
err = l.channel.RemoveFwdPkg(fwdPkg.Height)
if err != nil {
l.warnf("unable to remove fwd pkg "+
"for height=%d: %v",
fwdPkg.Height, err)
}
}
case <-l.quit:
return
}
}
}
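fwdPkgGarbager only relies on the ticker exposing a Start that returns a receive channel and a Stop. A minimal implementation suitable for tests might look like the sketch below, assuming the Ticker interface used by BatchTicker and FwdPkgGCTicker has exactly that shape.

	// mockTicker is a sketch of a Ticker backed by time.Ticker; the
	// interface shape is inferred from how the tickers are used here.
	type mockTicker struct {
		t *time.Ticker
	}

	func newMockTicker(interval time.Duration) *mockTicker {
		return &mockTicker{t: time.NewTicker(interval)}
	}

	// Start returns the underlying tick channel.
	func (m *mockTicker) Start() <-chan time.Time {
		return m.t.C
	}

	// Stop halts the ticker and releases its resources.
	func (m *mockTicker) Stop() {
		m.t.Stop()
	}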
// htlcManager is the primary goroutine which drives a channel's commitment
// update state-machine in response to messages received via several channels.
// This goroutine reads messages from the upstream (remote) peer, and also from
@ -625,6 +718,24 @@ func (l *channelLink) htlcManager() {
log.Infof("HTLC manager for ChannelPoint(%v) started, "+
"bandwidth=%v", l.channel.ChannelPoint(), l.Bandwidth())
// Before handling any messages, revert any circuits that were marked
// open in the switch's circuit map, but did not make it into a
// commitment txn. We use the next local htlc index as the cut off
// point, since all indexes below that are committed.
//
// NOTE: This is automatically done by the switch when it starts up, but
// is necessary to prevent inconsistencies in the case that the link
// flaps. This is a result of a link's life-cycle being shorter than
// that of the switch.
localHtlcIndex := l.channel.LocalHtlcIndex()
err := l.cfg.Circuits.TrimOpenCircuits(l.ShortChanID(), localHtlcIndex)
if err != nil {
l.errorf("unable to trim circuits above local htlc index %d: %v",
localHtlcIndex, err)
l.fail(ErrInternalLinkFailure.Error())
return
}
// TODO(roasbeef): need to call wipe chan whenever D/C?
// If this isn't the first time that this channel link has been
@ -634,11 +745,34 @@ func (l *channelLink) htlcManager() {
if l.cfg.SyncStates {
// TODO(roasbeef): need to ensure haven't already settled?
if err := l.syncChanStates(); err != nil {
l.errorf("unable to synchronize channel states: %v", err)
l.fail(err.Error())
return
}
}
// With the channel states synced, we now reset the mailbox to ensure we
// start processing all unacked packets in order. This is done here to
// ensure that all acknowledgments that occur during channel
// resynchronization have taken effect, causing us only to pull unacked
// packets after starting to read from the downstream mailbox.
l.mailBox.ResetPackets()
// After cleaning up any memory pertaining to incoming packets, we now
// replay our forwarding packages to handle any htlcs that can be
// processed locally, or need to be forwarded out to the switch.
if err := l.resolveFwdPkgs(); err != nil {
l.errorf("unable to resolve fwd pkgs: %v", err)
l.fail(ErrInternalLinkFailure.Error())
return
}
// With our link's in-memory state fully reconstructed, spawn a
// goroutine to manage the reclamation of disk space occupied by
// completed forwarding packages.
l.wg.Add(1)
go l.fwdPkgGarbager()
batchTick := l.cfg.BatchTicker.Start()
defer l.cfg.BatchTicker.Stop()
@ -815,11 +949,13 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) {
var isSettle bool
switch htlc := pkt.htlc.(type) {
case *lnwire.UpdateAddHTLC:
// A new payment has been initiated via the downstream channel,
// so we add the new HTLC to our local log, then update the
// commitment chains.
htlc.ChanID = l.ChanID()
index, err := l.channel.AddHTLC(htlc, nil)
openCircuitRef := pkt.inKey()
index, err := l.channel.AddHTLC(htlc, &openCircuitRef)
if err != nil {
switch err {
@ -871,17 +1007,28 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) {
failPkt := &htlcPacket{
incomingChanID: pkt.incomingChanID,
incomingHTLCID: pkt.incomingHTLCID,
amount: htlc.Amount,
isRouted: true,
circuit: pkt.circuit,
sourceRef: pkt.sourceRef,
hasSource: true,
localFailure: localFailure,
htlc: &lnwire.UpdateFailHTLC{
Reason: reason,
},
}
// TODO(roasbeef): need to identify if sent
// from switch so don't need to obfuscate
go l.cfg.Switch.forward(failPkt)
go l.forwardBatch(failPkt)
// Remove this packet from the link's mailbox,
// this prevents it from being reprocessed if
// the link restarts and resets its mailbox. If
// this response doesn't make it back to the
// originating link, it will be rejected upon
// attempting to reforward the Add to the
// switch, since the circuit was never fully
// opened, and the forwarding package shows it
// as unacknowledged.
l.mailBox.AckPacket(pkt.inKey())
return
}
}
@ -890,39 +1037,41 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) {
"local_log_index=%v, batch_size=%v",
htlc.PaymentHash[:], index, l.batchCounter+1)
// Create circuit (remember the path) in order to forward
// settle/fail packet back.
l.cfg.Switch.addCircuit(&PaymentCircuit{
PaymentHash: htlc.PaymentHash,
IncomingChanID: pkt.incomingChanID,
IncomingHTLCID: pkt.incomingHTLCID,
IncomingAmt: pkt.incomingHtlcAmt,
OutgoingChanID: l.ShortChanID(),
OutgoingHTLCID: index,
OutgoingAmt: htlc.Amount,
ErrorEncrypter: pkt.obfuscator,
})
pkt.outgoingChanID = l.ShortChanID()
pkt.outgoingHTLCID = index
htlc.ID = index
l.debugf("Queueing keystone of ADD open circuit: %s->%s",
pkt.inKey(), pkt.outKey())
l.openedCircuits = append(l.openedCircuits, pkt.inKey())
l.keystoneBatch = append(l.keystoneBatch, pkt.keystone())
l.cfg.Peer.SendMessage(htlc)
case *lnwire.UpdateFulfillHTLC:
// An HTLC we forward to the switch has just settled somewhere
// upstream. Therefore we settle the HTLC within our local
// state machine.
err := l.channel.SettleHTLC(
closedCircuitRef := pkt.inKey()
if err := l.channel.SettleHTLC(
htlc.PaymentPreimage,
pkt.incomingHTLCID,
nil,
nil,
nil,
)
if err != nil {
pkt.sourceRef,
pkt.destRef,
&closedCircuitRef,
); err != nil {
// TODO(roasbeef): broadcast on-chain
l.fail("unable to settle incoming HTLC: %v", err)
return
}
l.debugf("Queueing removal of SETTLE closed circuit: %s->%s",
pkt.inKey(), pkt.outKey())
l.closedCircuits = append(l.closedCircuits, pkt.inKey())
// With the HTLC settled, we'll need to populate the wire
// message to target the specific channel and HTLC to be
// cancelled.
@ -937,18 +1086,23 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) {
case *lnwire.UpdateFailHTLC:
// An HTLC cancellation has been triggered somewhere upstream,
// we'll remove the HTLC from our local state machine.
err := l.channel.FailHTLC(
closedCircuitRef := pkt.inKey()
if err := l.channel.FailHTLC(
pkt.incomingHTLCID,
htlc.Reason,
nil,
nil,
nil,
)
if err != nil {
pkt.sourceRef,
pkt.destRef,
&closedCircuitRef,
); err != nil {
log.Errorf("unable to cancel HTLC: %v", err)
return
}
l.debugf("Queueing removal of FAIL closed circuit: %s->%s",
pkt.inKey(), pkt.outKey())
l.closedCircuits = append(l.closedCircuits, pkt.inKey())
// With the HTLC removed, we'll need to populate the wire
// message to target the specific channel and HTLC to be
// cancelled. The "Reason" field will have already been set
@ -1141,32 +1295,21 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
// We've received a revocation from the remote chain, if valid,
// this moves the remote chain forward, and expands our
// revocation window.
_, adds, settleFails, err := l.channel.ReceiveRevocation(msg)
fwdPkg, adds, settleFails, err := l.channel.ReceiveRevocation(msg)
if err != nil {
l.fail("unable to accept revocation: %v", err)
return
}
// After we treat HTLCs as included in both remote/local
// commitment transactions they might be safely propagated over
// htlc switch or settled if our node was last node in htlc
// path.
htlcs := append(settleFails, adds...)
htlcsToForward := l.processLockedInHtlcs(htlcs)
go func() {
log.Debugf("ChannelPoint(%v) forwarding %v HTLC's",
l.channel.ChannelPoint(), len(htlcsToForward))
for _, packet := range htlcsToForward {
if err := l.cfg.Switch.forward(packet); err != nil {
// TODO(roasbeef): cancel back htlc
// under certain conditions?
log.Errorf("channel link(%v): "+
"unhandled error while forwarding "+
"htlc packet over htlc "+
"switch: %v", l, err)
l.processRemoteSettleFails(fwdPkg, settleFails)
needUpdate := l.processRemoteAdds(fwdPkg, adds)
if needUpdate {
if err := l.updateCommitTx(); err != nil {
l.fail("unable to update commitment: %v", err)
return
}
}
}()
case *lnwire.UpdateFee:
// We received fee update from peer. If we are the initiator we
@ -1179,10 +1322,97 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
}
}
// ackDownStreamPackets is responsible for removing htlcs from a link's
// mailbox for packets delivered from the switch, and cleaning up any circuits
// closed by signing a previous commitment txn. This method ensures that the
// circuits are removed from the circuit map before removing them from the
// link's mailbox, otherwise it could be possible for some circuit to be missed
// if this link flaps.
//
// The `forgive` flag allows this method to tolerate restarts, and ignores
// errors that could be caused by a previous circuit deletion. Under normal
// operation, this is set to false so that we would fail the link if we were
// unable to remove a circuit.
func (l *channelLink) ackDownStreamPackets(forgive bool) error {
// First, remove the downstream Add packets that were included in the
// previous commitment signature. This will prevent the Adds from being
// replayed if this link disconnects.
for _, inKey := range l.openedCircuits {
// In order to test the sphinx replay logic of the remote party,
// unsafe replay does not acknowledge the packets from the
// mailbox. We can then force a replay of any Add packets held
// in memory by disconnecting and reconnecting the link.
if l.cfg.UnsafeReplay {
continue
}
l.debugf("Removing Add packet %s from mailbox", inKey)
l.mailBox.AckPacket(inKey)
}
// Now, we will delete all circuits closed by the previous commitment
// signature, which is the result of downstream Settle/Fail packets. We
// batch them here to ensure circuits are closed atomically and for
// performance.
err := l.cfg.Circuits.DeleteCircuits(l.closedCircuits...)
switch err {
case nil:
// Successful deletion.
case ErrUnknownCircuit:
if forgive {
// After a restart, we may have already removed this
// circuit. Since it shouldn't be possible for a circuit
// to be closed by different htlcs, we assume this error
// signals that the whole batch was successfully
// removed.
l.warnf("Forgiving unknown circuit error after " +
"attempting deletion, circuit was probably " +
"removed before shutting down.")
break
}
return err
default:
l.errorf("unable to delete %d circuits: %v",
len(l.closedCircuits), err)
return err
}
// With the circuits removed from memory and disk, we now ack any
// Settle/Fails in the mailbox to ensure they do not get redelivered
// after startup. If forgive is enabled and we've reached this point,
// the circuits must have been removed at some point, so it is now safe
// to unqueue the corresponding Settle/Fails.
for _, inKey := range l.closedCircuits {
l.debugf("Removing Fail/Settle packet %s from mailbox", inKey)
l.mailBox.AckPacket(inKey)
}
// Lastly, reset our buffers to be empty while keeping any acquired
// growth in the backing array.
l.openedCircuits = l.openedCircuits[:0]
l.closedCircuits = l.closedCircuits[:0]
return nil
}
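For reference, the CircuitModifier surface the link now depends on, collected from the call sites in this change (TrimOpenCircuits on startup, OpenCircuits before signing, DeleteCircuits after signing). The signatures are inferred from usage and the real interface may carry additional methods.

	// CircuitModifier as assumed from this diff's call sites.
	type CircuitModifier interface {
		// OpenCircuits persists the batched keystones before the next
		// commitment txn is signed (see updateCommitTx).
		OpenCircuits(...Keystone) error

		// TrimOpenCircuits removes circuits for HTLCs at or above the
		// given local htlc index that never made it into a commitment
		// txn (see htlcManager startup).
		TrimOpenCircuits(chanID lnwire.ShortChannelID, start uint64) error

		// DeleteCircuits atomically removes the circuits closed by the
		// previous commitment signature (see ackDownStreamPackets).
		DeleteCircuits(inKeys ...CircuitKey) error
	}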
// updateCommitTx signs, then sends an update to the remote peer adding a new
// commitment to their commitment chain which includes all the latest updates
// we've received+processed up to this point.
func (l *channelLink) updateCommitTx() error {
// Preemptively write all pending keystones to disk, just in case the
// HTLCs we have in memory are included in the subsequent attempt to
// sign a commitment state.
err := l.cfg.Circuits.OpenCircuits(l.keystoneBatch...)
if err != nil {
return err
}
// Reset the batch, but keep the backing buffer to avoid reallocating.
l.keystoneBatch = l.keystoneBatch[:0]
theirCommitSig, htlcSigs, err := l.channel.SignNextCommitment()
if err == lnwallet.ErrNoWindow {
log.Tracef("revocation window exhausted, unable to send %v",
@ -1192,6 +1422,10 @@ func (l *channelLink) updateCommitTx() error {
return err
}
if err := l.ackDownStreamPackets(false); err != nil {
return err
}
commitSig := &lnwire.CommitSig{
ChanID: l.ChanID(),
CommitSig: theirCommitSig,
@ -1231,8 +1465,6 @@ func (l *channelLink) Peer() Peer {
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) ShortChanID() lnwire.ShortChannelID {
l.RLock()
defer l.RUnlock()
return l.shortChanID
}
@ -1301,6 +1533,17 @@ func (l *channelLink) Bandwidth() lnwire.MilliSatoshi {
return linkBandwidth - reserve
}
// AttachMailBox updates the current mailbox used by this link, and hooks up the
// mailbox's message and packet outboxes to the link's upstream and downstream
// chans, respectively.
func (l *channelLink) AttachMailBox(mailbox MailBox) {
l.Lock()
l.mailBox = mailbox
l.upstream = mailbox.MessageOutBox()
l.downstream = mailbox.PacketOutBox()
l.Unlock()
}
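Since the mailbox is now injected rather than owned by the link, its contract can be read off the calls in this file. A sketch of that surface, inferred rather than authoritative (the real interface likely exposes more, e.g. adding wire messages from the peer):

	// MailBox as the link uses it in this commit.
	type MailBox interface {
		// AddPacket queues a switch packet for delivery on PacketOutBox.
		AddPacket(pkt *htlcPacket)

		// AckPacket removes a packet so it is not redelivered after a
		// restart.
		AckPacket(inKey CircuitKey)

		// HasPacket reports whether a response for inKey is already
		// queued, letting forwardBatch skip redundant forwards.
		HasPacket(inKey CircuitKey) bool

		// ResetMessages and ResetPackets requeue unacked wire messages
		// and packets so they are reprocessed in order after
		// (re)starting.
		ResetMessages() error
		ResetPackets() error

		// MessageOutBox and PacketOutBox feed the link's upstream and
		// downstream channels, as wired up in AttachMailBox.
		MessageOutBox() chan lnwire.Message
		PacketOutBox() chan *htlcPacket

		// Stop shuts down the mailbox's internal goroutines.
		Stop()
	}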
// policyUpdate is a message sent to a channel link when an outside sub-system
// wishes to update the current forwarding policy.
type policyUpdate struct {
@ -1357,8 +1600,11 @@ func (l *channelLink) String() string {
// another peer or if the update was created by user
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) HandleSwitchPacket(packet *htlcPacket) {
l.mailBox.AddPacket(packet)
func (l *channelLink) HandleSwitchPacket(pkt *htlcPacket) error {
l.tracef("received switch packet inkey=%v, outkey=%v",
pkt.inKey(), pkt.outKey())
l.mailBox.AddPacket(pkt)
return nil
}
// HandleChannelUpdate handles the htlc requests as settle/add/fail which sent
@ -1379,8 +1625,8 @@ func (l *channelLink) updateChannelFee(feePerKw lnwallet.SatPerKWeight) error {
// We skip sending the UpdateFee message if the channel is not
// currently eligible to forward messages.
if !l.EligibleToForward() {
log.Debugf("ChannelPoint(%v): skipping fee update for " +
"inactive channel")
log.Debugf("ChannelPoint(%v): skipping fee update for "+
"inactive channel", l.ChanID())
return nil
}
@ -1398,22 +1644,29 @@ func (l *channelLink) updateChannelFee(feePerKw lnwallet.SatPerKWeight) error {
return l.updateCommitTx()
}
// processLockedInHtlcs serially processes each of the log updates which have
// been "locked-in". An HTLC is considered locked-in once it has been fully
// committed to in both the remote and local commitment state. Once a channel
// updates is locked-in, then it can be acted upon, meaning: settling HTLCs,
// cancelling them, or forwarding new HTLCs to the next hop.
func (l *channelLink) processLockedInHtlcs(
paymentDescriptors []*lnwallet.PaymentDescriptor) []*htlcPacket {
// processRemoteSettleFails accepts a batch of settle/fail payment descriptors
// after receiving a revocation from the remote party, and reprocesses them in
// the context of the provided forwarding package. Any settles or fails that
// have already been acknowledged in the forwarding package will not be sent to
// the switch.
func (l *channelLink) processRemoteSettleFails(fwdPkg *channeldb.FwdPkg,
settleFails []*lnwallet.PaymentDescriptor) {
var (
needUpdate bool
packetsToForward []*htlcPacket
)
if len(settleFails) == 0 {
return
}
log.Debugf("ChannelLink(%v): settle-fail-filter %v",
l.ShortChanID(), fwdPkg.SettleFailFilter)
var switchPackets []*htlcPacket
for i, pd := range settleFails {
// Skip any settles or fails that have already been acknowledged
// by the incoming link that originated the forwarded Add.
if fwdPkg.SettleFailFilter.Contains(uint16(i)) {
continue
}
for _, pd := range paymentDescriptors {
// TODO(roasbeef): rework log entries to a shared
// interface.
switch pd.EntryType {
// A settle for an HTLC we previously forwarded has been
@ -1423,7 +1676,7 @@ func (l *channelLink) processLockedInHtlcs(
settlePacket := &htlcPacket{
outgoingChanID: l.ShortChanID(),
outgoingHTLCID: pd.ParentIndex,
amount: pd.Amount,
destRef: pd.DestRef,
htlc: &lnwire.UpdateFulfillHTLC{
PaymentPreimage: pd.RPreimage,
},
@ -1432,7 +1685,7 @@ func (l *channelLink) processLockedInHtlcs(
// Add the packet to the batch to be forwarded, and
// notify the overflow queue that a spare spot has been
// freed up within the commitment state.
packetsToForward = append(packetsToForward, settlePacket)
switchPackets = append(switchPackets, settlePacket)
l.overflowQueue.SignalFreeSlot()
// A failureCode message for a previously forwarded HTLC has
@ -1445,7 +1698,7 @@ func (l *channelLink) processLockedInHtlcs(
failPacket := &htlcPacket{
outgoingChanID: l.ShortChanID(),
outgoingHTLCID: pd.ParentIndex,
amount: pd.Amount,
destRef: pd.DestRef,
htlc: &lnwire.UpdateFailHTLC{
Reason: lnwire.OpaqueReason(pd.FailReason),
},
@ -1454,8 +1707,83 @@ func (l *channelLink) processLockedInHtlcs(
// Add the packet to the batch to be forwarded, and
// notify the overflow queue that a spare spot has been
// freed up within the commitment state.
packetsToForward = append(packetsToForward, failPacket)
switchPackets = append(switchPackets, failPacket)
l.overflowQueue.SignalFreeSlot()
}
}
go l.forwardBatch(switchPackets...)
}
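The SettleFailFilter consulted above, like the AckFilter and FwdFilter that appear later, behaves as a fixed-size bitmap indexed by an HTLC's position inside the forwarding package. A toy equivalent capturing just the Set/Contains/IsFull behavior relied on in this file (the real channeldb type will differ):

	// pkgFilter is a toy stand-in for channeldb's forwarding-package
	// filters, illustrating the semantics used by the link.
	type pkgFilter struct {
		count uint16
		bits  []byte
	}

	func newPkgFilter(count uint16) *pkgFilter {
		return &pkgFilter{
			count: count,
			bits:  make([]byte, (count+7)/8),
		}
	}

	// Set marks index i as processed/acked/forwarded.
	func (f *pkgFilter) Set(i uint16) {
		f.bits[i/8] |= byte(1) << (i % 8)
	}

	// Contains reports whether index i has already been marked.
	func (f *pkgFilter) Contains(i uint16) bool {
		return f.bits[i/8]&(byte(1)<<(i%8)) != 0
	}

	// IsFull reports whether every index below count has been marked.
	func (f *pkgFilter) IsFull() bool {
		for i := uint16(0); i < f.count; i++ {
			if !f.Contains(i) {
				return false
			}
		}
		return true
	}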
// processRemoteAdds serially processes each of the Add payment descriptors
// which have been "locked-in" by receiving a revocation from the remote party.
// The forwarding package provided instructs how to process this batch,
// indicating whether this is the first time these Adds are being processed, or
// whether we are reprocessing as a result of a failure or restart. Adds that
// have already been acknowledged in the forwarding package will be ignored.
func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg,
lockedInHtlcs []*lnwallet.PaymentDescriptor) bool {
l.tracef("processing %d remote adds for height %d",
len(lockedInHtlcs), fwdPkg.Height)
decodeReqs := make([]DecodeHopIteratorRequest, 0, len(lockedInHtlcs))
for _, pd := range lockedInHtlcs {
switch pd.EntryType {
// TODO(conner): remove type switch?
case lnwallet.Add:
// Before adding the new htlc to the state machine,
// parse the onion object in order to obtain the
// routing information with DecodeHopIterator function
// which processes the Sphinx packet.
onionReader := bytes.NewReader(pd.OnionBlob)
req := DecodeHopIteratorRequest{
OnionReader: onionReader,
RHash: pd.RHash[:],
IncomingCltv: pd.Timeout,
}
decodeReqs = append(decodeReqs, req)
}
}
// Atomically decode the incoming htlcs, simultaneously checking for
// replay attempts. A particular index in the returned, sparse list of
// channel iterators should only be used if the failure code at the same
// index is lnwire.CodeNone.
decodeResps, sphinxErr := l.cfg.DecodeHopIterators(
fwdPkg.ID(), decodeReqs,
)
if sphinxErr != nil {
l.errorf("unable to decode hop iterators: %v", sphinxErr)
l.fail(ErrInternalLinkFailure.Error())
return false
}
var (
needUpdate bool
switchPackets []*htlcPacket
)
for i, pd := range lockedInHtlcs {
idx := uint16(i)
if fwdPkg.State == channeldb.FwdStateProcessed &&
fwdPkg.AckFilter.Contains(idx) {
// If this index is already found in the ack filter, the
// response to this forwarding decision has already been
// committed by one of our commitment txns. ADDs in this
// state are waiting for the rest of the fwding package
// to get acked before being garbage collected.
continue
}
// TODO(roasbeef): rework log entries to a shared
// interface.
switch pd.EntryType {
// An incoming HTLC add has been fully locked in. As a result we
// can now examine the forwarding details of the HTLC, and the
@ -1472,23 +1800,13 @@ func (l *channelLink) processLockedInHtlcs(
// parse the onion object in order to obtain the
// routing information with DecodeHopIterator function
// which processes the Sphinx packet.
//
// We include the payment hash of the htlc as it's
// authenticated within the Sphinx packet itself as
// associated data in order to thwart attempts a replay
// attacks. In the case of a replay, an attacker is
// *forced* to use the same payment hash twice, thereby
// losing their money entirely.
onionReader := bytes.NewReader(onionBlob[:])
chanIterator, failureCode := l.cfg.DecodeHopIterator(
onionReader, pd.RHash[:], pd.Timeout,
)
chanIterator, failureCode := decodeResps[i].Result()
if failureCode != lnwire.CodeNone {
// If we're unable to process the onion blob
// then we should send the malformed htlc error
// to the payment sender.
l.sendMalformedHTLCError(pd.HtlcIndex, failureCode,
onionBlob[:])
onionBlob[:], pd.SourceRef)
needUpdate = true
log.Errorf("unable to decode onion hop "+
@ -1507,7 +1825,7 @@ func (l *channelLink) processLockedInHtlcs(
// then we should send the malformed htlc error
// to the payment sender.
l.sendMalformedHTLCError(pd.HtlcIndex, failureCode,
onionBlob[:])
onionBlob[:], pd.SourceRef)
needUpdate = true
log.Errorf("unable to decode onion "+
@ -1520,6 +1838,7 @@ func (l *channelLink) processLockedInHtlcs(
fwdInfo := chanIterator.ForwardingInstructions()
switch fwdInfo.NextHop {
case exitHop:
if l.cfg.DebugHTLC && l.cfg.HodlHTLC {
log.Warnf("hodl HTLC mode enabled, " +
"will not attempt to settle " +
@ -1538,7 +1857,8 @@ func (l *channelLink) processLockedInHtlcs(
pd.Timeout, heightNow)
failure := lnwire.FailFinalIncorrectCltvExpiry{}
l.sendHTLCError(pd.HtlcIndex, &failure, obfuscator)
l.sendHTLCError(pd.HtlcIndex, &failure,
obfuscator, pd.SourceRef)
needUpdate = true
continue
}
@ -1553,23 +1873,34 @@ func (l *channelLink) processLockedInHtlcs(
log.Errorf("unable to query invoice registry: "+
" %v", err)
failure := lnwire.FailUnknownPaymentHash{}
l.sendHTLCError(pd.HtlcIndex, failure, obfuscator)
l.sendHTLCError(pd.HtlcIndex, failure,
obfuscator, pd.SourceRef)
needUpdate = true
continue
}
// If this invoice has already been settled,
// then we'll reject it as we don't allow an
// invoice to be paid twice.
if invoice.Terms.Settled == true {
log.Warnf("Rejecting duplicate "+
// If the invoice is already settled, we choose
// to accept the payment to simplify failure
// recovery.
//
// NOTE: Though our recovery and forwarding logic is
// predominately batched, settling invoices
// happens iteratively. We may reject one of
// two payments for the same rhash at first, but
// then restart and reject both after seeing
// that the invoice has been settled. Without
// any record of which one settles first, it is
// ambiguous as to which one actually settled
// the invoice. Thus, by accepting all payments,
// we eliminate the race condition that can lead
// to this inconsistency.
//
// TODO(conner): track ownership of settlements
// to properly recover from failures? or add
// batch invoice settlement
if invoice.Terms.Settled {
log.Warnf("Accepting duplicate "+
"payment for hash=%x", pd.RHash[:])
failure := lnwire.FailUnknownPaymentHash{}
l.sendHTLCError(
pd.HtlcIndex, failure, obfuscator,
)
needUpdate = true
continue
}
// If we're not currently in debug mode, and
@ -1591,7 +1922,8 @@ func (l *channelLink) processLockedInHtlcs(
"amount: expected %v, received %v",
invoice.Terms.Value, pd.Amount)
failure := lnwire.FailIncorrectPaymentAmount{}
l.sendHTLCError(pd.HtlcIndex, failure, obfuscator)
l.sendHTLCError(pd.HtlcIndex, failure,
obfuscator, pd.SourceRef)
needUpdate = true
continue
}
@ -1618,7 +1950,8 @@ func (l *channelLink) processLockedInHtlcs(
fwdInfo.AmountToForward)
failure := lnwire.FailIncorrectPaymentAmount{}
l.sendHTLCError(pd.HtlcIndex, failure, obfuscator)
l.sendHTLCError(pd.HtlcIndex, failure,
obfuscator, pd.SourceRef)
needUpdate = true
continue
}
@ -1640,7 +1973,9 @@ func (l *channelLink) processLockedInHtlcs(
failure := lnwire.NewFinalIncorrectCltvExpiry(
fwdInfo.OutgoingCTLV,
)
l.sendHTLCError(pd.HtlcIndex, failure, obfuscator)
l.sendHTLCError(pd.HtlcIndex,
failure, obfuscator,
pd.SourceRef)
needUpdate = true
continue
case pd.Timeout != fwdInfo.OutgoingCTLV:
@ -1652,28 +1987,33 @@ func (l *channelLink) processLockedInHtlcs(
failure := lnwire.NewFinalIncorrectCltvExpiry(
fwdInfo.OutgoingCTLV,
)
l.sendHTLCError(pd.HtlcIndex, failure, obfuscator)
l.sendHTLCError(pd.HtlcIndex,
failure, obfuscator,
pd.SourceRef)
needUpdate = true
continue
}
}
preimage := invoice.Terms.PaymentPreimage
err = l.channel.SettleHTLC(preimage, pd.HtlcIndex, nil, nil, nil)
err = l.channel.SettleHTLC(preimage,
pd.HtlcIndex, pd.SourceRef, nil, nil)
if err != nil {
l.fail("unable to settle htlc: %v", err)
return nil
return false
}
// Notify the invoiceRegistry of the invoices
// we just settled with this latest commitment
// Notify the invoiceRegistry of the invoices we
// just settled with this latest commitment
// update.
err = l.cfg.Registry.SettleInvoice(invoiceHash)
if err != nil {
l.fail("unable to settle invoice: %v", err)
return nil
return false
}
l.infof("Settling %x as exit hop", pd.RHash)
// HTLC was successfully settled locally, send
// notification about it to the remote peer.
l.cfg.Peer.SendMessage(&lnwire.UpdateFulfillHTLC{
@ -1688,6 +2028,53 @@ func (l *channelLink) processLockedInHtlcs(
// constraints have been properly met by this
// incoming HTLC.
default:
switch fwdPkg.State {
case channeldb.FwdStateProcessed:
if !fwdPkg.FwdFilter.Contains(idx) {
// This add was not forwarded on
// the previous processing
// phase, run it through our
// validation pipeline to
// reproduce an error. This may
// trigger a different error due
// to expiring timelocks, but we
// expect that an error will be
// reproduced.
break
}
addMsg := &lnwire.UpdateAddHTLC{
Expiry: fwdInfo.OutgoingCTLV,
Amount: fwdInfo.AmountToForward,
PaymentHash: pd.RHash,
}
// Finally, we'll encode the onion packet for
// the _next_ hop using the hop iterator
// decoded for the current hop.
buf := bytes.NewBuffer(addMsg.OnionBlob[0:0])
// We know this cannot fail, as this ADD
// was marked forwarded in a previous
// round of processing.
chanIterator.EncodeNextHop(buf)
updatePacket := &htlcPacket{
incomingChanID: l.ShortChanID(),
incomingHTLCID: pd.HtlcIndex,
outgoingChanID: fwdInfo.NextHop,
sourceRef: pd.SourceRef,
incomingAmount: pd.Amount,
amount: addMsg.Amount,
htlc: addMsg,
obfuscator: obfuscator,
}
switchPackets = append(switchPackets,
updatePacket)
continue
}
// We want to avoid forwarding an HTLC which
// will expire in the near future, so we'll
// reject an HTLC if its expiration time is too
@ -1707,7 +2094,8 @@ func (l *channelLink) processLockedInHtlcs(
failure = lnwire.NewExpiryTooSoon(*update)
}
l.sendHTLCError(pd.HtlcIndex, failure, obfuscator)
l.sendHTLCError(pd.HtlcIndex, failure,
obfuscator, pd.SourceRef)
needUpdate = true
continue
}
@ -1734,7 +2122,8 @@ func (l *channelLink) processLockedInHtlcs(
pd.Amount, *update)
}
l.sendHTLCError(pd.HtlcIndex, failure, obfuscator)
l.sendHTLCError(pd.HtlcIndex, failure,
obfuscator, pd.SourceRef)
needUpdate = true
continue
}
@ -1779,7 +2168,8 @@ func (l *channelLink) processLockedInHtlcs(
*update)
}
l.sendHTLCError(pd.HtlcIndex, failure, obfuscator)
l.sendHTLCError(pd.HtlcIndex, failure,
obfuscator, pd.SourceRef)
needUpdate = true
continue
}
@ -1806,12 +2196,13 @@ func (l *channelLink) processLockedInHtlcs(
if err != nil {
l.fail("unable to create channel update "+
"while handling the error: %v", err)
return nil
return false
}
failure := lnwire.NewIncorrectCltvExpiry(
pd.Timeout, *update)
l.sendHTLCError(pd.HtlcIndex, failure, obfuscator)
l.sendHTLCError(pd.HtlcIndex, failure,
obfuscator, pd.SourceRef)
needUpdate = true
continue
}
@ -1838,42 +2229,107 @@ func (l *channelLink) processLockedInHtlcs(
"remaining route %v", err)
failure := lnwire.NewTemporaryChannelFailure(nil)
l.sendHTLCError(pd.HtlcIndex, failure, obfuscator)
l.sendHTLCError(pd.HtlcIndex, failure,
obfuscator, pd.SourceRef)
needUpdate = true
continue
}
// Now that this add has been reprocessed, only
// append it to our list of packets to forward
// to the switch if this is the first time
// processing the add. If the fwd pkg has
// already been processed, then we entered the
// above section to recreate a previous error.
// If the packet had previously been forwarded,
// it would have been added to switchPackets at
// the top of this section.
if fwdPkg.State == channeldb.FwdStateLockedIn {
updatePacket := &htlcPacket{
incomingChanID: l.ShortChanID(),
incomingHTLCID: pd.HtlcIndex,
outgoingChanID: fwdInfo.NextHop,
incomingHtlcAmt: pd.Amount,
sourceRef: pd.SourceRef,
incomingAmount: pd.Amount,
amount: addMsg.Amount,
htlc: addMsg,
obfuscator: obfuscator,
}
packetsToForward = append(packetsToForward, updatePacket)
fwdPkg.FwdFilter.Set(idx)
switchPackets = append(switchPackets,
updatePacket)
}
}
}
}
if needUpdate {
// With all the settle/cancel updates added to the local and
// remote HTLC logs, initiate a state transition by updating
// the remote commitment chain.
if err := l.updateCommitTx(); err != nil {
l.fail("unable to update commitment: %v", err)
return nil
// Commit the htlcs we are intending to forward if this package has not
// been fully processed.
if fwdPkg.State == channeldb.FwdStateLockedIn {
err := l.channel.SetFwdFilter(fwdPkg.Height, fwdPkg.FwdFilter)
if err != nil {
l.fail("unable to set fwd filter: %v", err)
return false
}
}
return packetsToForward
if len(switchPackets) == 0 {
return needUpdate
}
l.debugf("forwarding %d packets to switch", len(switchPackets))
go l.forwardBatch(switchPackets...)
return needUpdate
}
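processRemoteAdds leans on a batched decode hook rather than the old per-HTLC DecodeHopIterator. The shapes below are pieced together from the usage above (the request fields, the Result accessor, and the fwdPkg.ID() argument); treat the exact types, in particular the ID argument and the config field name, as assumptions.

	// Assumed request/response pair for batched onion decoding.
	type DecodeHopIteratorRequest struct {
		OnionReader  io.Reader // reader over the HTLC's onion blob
		RHash        []byte    // payment hash, bound as sphinx associated data
		IncomingCltv uint32    // incoming timeout, used for per-hop checks
	}

	type DecodeHopIteratorResponse struct {
		HopIterator HopIterator
		FailCode    lnwire.FailCode
	}

	// Result returns the decoded iterator and failure code; the iterator
	// should only be used when FailCode is lnwire.CodeNone.
	func (r *DecodeHopIteratorResponse) Result() (HopIterator, lnwire.FailCode) {
		return r.HopIterator, r.FailCode
	}

	// The corresponding config hook, replacing the per-HTLC closure:
	//
	//	DecodeHopIterators func(id []byte,
	//		reqs []DecodeHopIteratorRequest) ([]DecodeHopIteratorResponse, error)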
// forwardBatch forwards the given htlcPackets to the switch, and waits on the
// err chan for the individual responses. This method is intended to be spawned
// as a goroutine so the responses can be handled in the background.
func (l *channelLink) forwardBatch(packets ...*htlcPacket) {
// Don't forward packets for which we already have a response in our
// mailbox. This could happen if a packet fails and is buffered in the
// mailbox, and the incoming link flaps.
var filteredPkts = make([]*htlcPacket, 0, len(packets))
for _, pkt := range packets {
if l.mailBox.HasPacket(pkt.inKey()) {
continue
}
filteredPkts = append(filteredPkts, pkt)
}
errChan := l.cfg.ForwardPackets(filteredPkts...)
l.handleBatchFwdErrs(errChan)
}
// handleBatchFwdErrs waits on the given errChan until it is closed, logging the
// errors returned from any unsuccessful forwarding attempts.
func (l *channelLink) handleBatchFwdErrs(errChan chan error) {
for {
err, ok := <-errChan
if !ok {
// Err chan has been drained or switch is shutting down.
// Either way, return.
return
}
if err == nil {
continue
}
l.errorf("unhandled error while forwarding htlc packet over "+
"htlcswitch: %v", err)
}
}
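Because forwardBatch and handleBatchFwdErrs only observe the returned error channel, a stand-in for cfg.ForwardPackets in a link test can be very small. A sketch follows; the real switch implementation commits circuits and routes packets to outgoing mailboxes before reporting.

	// forwardStub is an illustrative ForwardPackets replacement for tests:
	// it records every packet it is handed and closes the error channel,
	// which handleBatchFwdErrs interprets as all forwards being handled.
	func forwardStub(received *[]*htlcPacket) func(...*htlcPacket) chan error {
		return func(packets ...*htlcPacket) chan error {
			errChan := make(chan error, len(packets))
			*received = append(*received, packets...)
			// Per-packet failures would be sent on errChan here
			// before closing; closing with no sends signals that
			// every packet was forwarded successfully.
			close(errChan)
			return errChan
		}
	}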
// sendHTLCError cancels the HTLC and sends a cancel message back to the
// peer from which the HTLC was received.
func (l *channelLink) sendHTLCError(htlcIndex uint64,
failure lnwire.FailureMessage, e ErrorEncrypter) {
failure lnwire.FailureMessage, e ErrorEncrypter,
sourceRef *channeldb.AddRef) {
reason, err := e.EncryptFirstHop(failure)
if err != nil {
@ -1881,7 +2337,7 @@ func (l *channelLink) sendHTLCError(htlcIndex uint64,
return
}
err = l.channel.FailHTLC(htlcIndex, reason, nil, nil, nil)
err = l.channel.FailHTLC(htlcIndex, reason, sourceRef, nil, nil)
if err != nil {
log.Errorf("unable cancel htlc: %v", err)
return
@ -1897,10 +2353,10 @@ func (l *channelLink) sendHTLCError(htlcIndex uint64,
// sendMalformedHTLCError is a helper function which sends the malformed HTLC update
// to the payment sender.
func (l *channelLink) sendMalformedHTLCError(htlcIndex uint64,
code lnwire.FailCode, onionBlob []byte) {
code lnwire.FailCode, onionBlob []byte, sourceRef *channeldb.AddRef) {
shaOnionBlob := sha256.Sum256(onionBlob)
err := l.channel.MalformedFailHTLC(htlcIndex, code, shaOnionBlob, nil)
err := l.channel.MalformedFailHTLC(htlcIndex, code, shaOnionBlob, sourceRef)
if err != nil {
log.Errorf("unable cancel htlc: %v", err)
return
@ -1921,3 +2377,33 @@ func (l *channelLink) fail(format string, a ...interface{}) {
log.Error(reason)
go l.cfg.Peer.Disconnect(reason)
}
// infof prefixes the channel's identifier before printing to info log.
func (l *channelLink) infof(format string, a ...interface{}) {
msg := fmt.Sprintf(format, a...)
log.Infof("ChannelLink(%s) %s", l.ShortChanID(), msg)
}
// debugf prefixes the channel's identifier before printing to debug log.
func (l *channelLink) debugf(format string, a ...interface{}) {
msg := fmt.Sprintf(format, a...)
log.Debugf("ChannelLink(%s) %s", l.ShortChanID(), msg)
}
// warnf prefixes the channel's identifier before printing to warn log.
func (l *channelLink) warnf(format string, a ...interface{}) {
msg := fmt.Sprintf(format, a...)
log.Warnf("ChannelLink(%s) %s", l.ShortChanID(), msg)
}
// errorf prefixes the channel's identifier before printing to error log.
func (l *channelLink) errorf(format string, a ...interface{}) {
msg := fmt.Sprintf(format, a...)
log.Errorf("ChannelLink(%s) %s", l.ShortChanID(), msg)
}
// tracef prefixes the channel's identifier before printing to trace log.
func (l *channelLink) tracef(format string, a ...interface{}) {
msg := fmt.Sprintf(format, a...)
log.Tracef("ChannelLink(%s) %s", l.ShortChanID(), msg)
}