htlcswitch: start using config in channel link

Step №3 in making htlcManager (aka channelLink) testable:
Apply the channel link config inside the channel link itself.
This commit is contained in:
Andrey Samokhvalov 2017-05-04 00:03:47 +03:00 committed by Olaoluwa Osuntokun
parent 2eea76375c
commit db30571efe

@ -16,7 +16,6 @@ import (
"github.com/roasbeef/btcd/chaincfg/chainhash" "github.com/roasbeef/btcd/chaincfg/chainhash"
"github.com/roasbeef/btcd/wire" "github.com/roasbeef/btcd/wire"
"github.com/roasbeef/btcutil" "github.com/roasbeef/btcutil"
"github.com/Masterminds/glide/cfg"
) )
// ChannelLinkConfig defines the configuration for the channel link. ALL // ChannelLinkConfig defines the configuration for the channel link. ALL
@ -156,48 +155,30 @@ func (l *channelLink) Stop() {
l.wg.Wait() l.wg.Wait()
} }
// htlcManager is the primary goroutine which drives a channel's commitment // htlcHandler is the primary goroutine which drives a channel's commitment
// update state-machine in response to messages received via several channels. // update state-machine in response to messages received via several channels.
// The htlcManager reads messages from the upstream (remote) peer, and also // This goroutine reads messages from the upstream (remote) peer, and also from
// from several possible downstream channels managed by the htlcSwitch. In the // downstream channel managed by the channel link. In the event that an htlc
// event that an htlc needs to be forwarded, then send-only htlcPlex chan is // needs to be forwarded, then send-only forward handler is used which sends
// used which sends htlc packets to the switch for forwarding. Additionally, // htlc packets to the switch. Additionally, the this goroutine handles acting
// the htlcManager handles acting upon all timeouts for any active HTLCs, // upon all timeouts for any active HTLCs, manages the channel's revocation
// manages the channel's revocation window, and also the htlc trickle // window, and also the htlc trickle queue+timer for this active channels.
// queue+timer for this active channels. // NOTE: Should be started as goroutine.
func (p *peer) htlcManager(channel *lnwallet.LightningChannel, func (l *channelLink) htlcHandler() {
htlcPlex chan<- *htlcPacket, downstreamLink <-chan *htlcPacket, defer l.wg.Done()
upstreamLink <-chan lnwire.Message) {
chanStats := channel.StateSnapshot() log.Infof("HTLC manager for ChannelPoint(%v) started, "+
peerLog.Infof("HTLC manager for ChannelPoint(%v) started, "+ "bandwidth=%v", l.channel.ChannelPoint(), l.getBandwidth())
"our_balance=%v, their_balance=%v, chain_height=%v",
channel.ChannelPoint(), chanStats.LocalBalance,
chanStats.RemoteBalance, chanStats.NumUpdates)
// A new session for this active channel has just started, therefore we // A new session for this active channel has just started, therefore we
// need to send our initial revocation window to the remote peer. // need to send our initial revocation window to the remote peer.
for i := 0; i < lnwallet.InitialRevocationWindow; i++ { for i := 0; i < lnwallet.InitialRevocationWindow; i++ {
rev, err := channel.ExtendRevocationWindow() rev, err := l.channel.ExtendRevocationWindow()
if err != nil { if err != nil {
peerLog.Errorf("unable to expand revocation window: %v", err) log.Errorf("unable to expand revocation window: %v", err)
continue continue
} }
p.queueMsg(rev, nil) l.cfg.Peer.SendMessage(rev)
}
chanPoint := channel.ChannelPoint()
state := &commitmentState{
channel: channel,
chanPoint: chanPoint,
chanID: lnwire.NewChanIDFromOutPoint(chanPoint),
clearedHTCLs: make(map[uint64]*pendingPayment),
htlcsToSettle: make(map[uint64]*channeldb.Invoice),
htlcsToCancel: make(map[uint64]lnwire.FailCode),
cancelReasons: make(map[uint64]lnwire.FailCode),
pendingCircuits: make(map[uint64]*sphinx.ProcessedPacket),
sphinx: p.server.sphinx,
switchChan: htlcPlex,
} }
// TODO(roasbeef): check to see if able to settle any currently pending // TODO(roasbeef): check to see if able to settle any currently pending
@ -213,24 +194,23 @@ func (p *peer) htlcManager(channel *lnwallet.LightningChannel,
out: out:
for { for {
select { select {
case <-channel.UnilateralCloseSignal: case <-l.channel.UnilateralCloseSignal:
// TODO(roasbeef): need to send HTLC outputs to nursery // TODO(roasbeef): need to send HTLC outputs to nursery
peerLog.Warnf("Remote peer has closed ChannelPoint(%v) on-chain", log.Warnf("Remote peer has closed ChannelPoint(%v) on-chain",
state.chanPoint) l.channel.ChannelPoint())
if err := wipeChannel(p, channel); err != nil { if err := l.cfg.Peer.WipeChannel(l.channel); err != nil {
peerLog.Errorf("unable to wipe channel %v", err) log.Errorf("unable to wipe channel %v", err)
} }
p.server.breachArbiter.settledContracts <- state.chanPoint l.cfg.SettledContracts <- l.channel.ChannelPoint()
break out break out
case <-channel.ForceCloseSignal: case <-l.channel.ForceCloseSignal:
// TODO(roasbeef): path never taken now that server // TODO(roasbeef): path never taken now that server
// force closes's directly? // force closes's directly?
peerLog.Warnf("ChannelPoint(%v) has been force "+ log.Warnf("ChannelPoint(%v) has been force "+
"closed, disconnecting from peerID(%x)", "closed, disconnecting from peerID(%x)",
state.chanPoint, p.id) l.channel.ChannelPoint(), l.cfg.Peer.ID())
break out break out
case <-logCommitTimer.C: case <-logCommitTimer.C:
@ -238,22 +218,22 @@ out:
// update in some time, check to see if we have any // update in some time, check to see if we have any
// pending updates we need to commit due to our // pending updates we need to commit due to our
// commitment chains being desynchronized. // commitment chains being desynchronized.
if state.channel.FullySynced() && if l.channel.FullySynced() &&
len(state.htlcsToSettle) == 0 { len(l.htlcsToSettle) == 0 {
continue continue
} }
if err := p.updateCommitTx(state); err != nil { if err := l.updateCommitTx(); err != nil {
peerLog.Errorf("unable to update commitment: %v", log.Errorf("unable to update commitment: %v",
err) err)
p.Disconnect() l.cfg.Peer.Disconnect()
break out break out
} }
case <-batchTimer.C: case <-batchTimer.C:
// If the current batch is empty, then we have no work // If the current batch is empty, then we have no work
// here. // here.
if len(state.pendingBatch) == 0 { if len(l.pendingBatch) == 0 {
continue continue
} }
@ -262,73 +242,68 @@ out:
// If the send was unsuccessful, then abandon the // If the send was unsuccessful, then abandon the
// update, waiting for the revocation window to open // update, waiting for the revocation window to open
// up. // up.
if err := p.updateCommitTx(state); err != nil { if err := l.updateCommitTx(); err != nil {
peerLog.Errorf("unable to update "+ log.Errorf("unable to update "+
"commitment: %v", err) "commitment: %v", err)
p.Disconnect() l.cfg.Peer.Disconnect()
break out break out
} }
case pkt := <-downstreamLink: case pkt := <-l.downstream:
p.handleDownStreamPkt(state, pkt) l.handleDownStreamPkt(pkt)
case msg, ok := <-upstreamLink: case msg := <-l.upstream:
// If the upstream message link is closed, this signals l.handleUpstreamMsg(msg)
// that the channel itself is being closed, therefore
// we exit. case cmd := <-l.control:
if !ok { switch cmd := cmd.(type) {
break out case *getBandwidthCmd:
cmd.done <- l.getBandwidth()
} }
p.handleUpstreamMsg(state, msg) case <-l.quit:
case <-p.quit:
break out break out
} }
} }
p.wg.Done() log.Infof("channel link(%v): htlc handler closed", l)
peerLog.Tracef("htlcManager for peer %v done", p)
} }
// handleDownStreamPkt processes an HTLC packet sent from the downstream HTLC // handleDownStreamPkt processes an HTLC packet sent from the downstream HTLC
// Switch. Possible messages sent by the switch include requests to forward new // Switch. Possible messages sent by the switch include requests to forward new
// HTLCs, timeout previously cleared HTLCs, and finally to settle currently // HTLCs, timeout previously cleared HTLCs, and finally to settle currently
// cleared HTLCs with the upstream peer. // cleared HTLCs with the upstream peer.
func (p *peer) handleDownStreamPkt(state *commitmentState, pkt *htlcPacket) { func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket) {
var isSettle bool var isSettle bool
switch htlc := pkt.msg.(type) { switch htlc := pkt.htlc.(type) {
case *lnwire.UpdateAddHTLC: case *lnwire.UpdateAddHTLC:
// A new payment has been initiated via the // A new payment has been initiated via the
// downstream channel, so we add the new HTLC // downstream channel, so we add the new HTLC
// to our local log, then update the commitment // to our local log, then update the commitment
// chains. // chains.
htlc.ChanID = state.chanID htlc.ChanID = l.ChanID()
index, err := state.channel.AddHTLC(htlc) index, err := l.channel.AddHTLC(htlc)
if err != nil { if err != nil {
// TODO: possibly perform fallback/retry logic // TODO: possibly perform fallback/retry logic
// depending on type of error // depending on type of error
peerLog.Errorf("Adding HTLC rejected: %v", err)
pkt.err <- err
close(pkt.done)
// The HTLC was unable to be added to the state // The HTLC was unable to be added to the state
// machine, as a result, we'll signal the switch to // machine, as a result, we'll signal the switch to
// cancel the pending payment. // cancel the pending payment.
// TODO(roasbeef): need to update link as well if local go l.cfg.Switch.forward(newFailPacket(l.ChanID(),
// HTLC? &lnwire.UpdateFailHTLC{
state.switchChan <- &htlcPacket{
amt: htlc.Amount,
msg: &lnwire.UpdateFailHTLC{
Reason: []byte{byte(0)}, Reason: []byte{byte(0)},
}, }, htlc.PaymentHash, htlc.Amount))
srcLink: state.chanID,
} log.Errorf("unable to handle downstream add HTLC: %v",
err)
return return
} }
htlc.ID = index
p.queueMsg(htlc, nil) l.cfg.Peer.SendMessage(htlc)
state.pendingBatch = append(state.pendingBatch, &pendingPayment{ l.pendingBatch = append(l.pendingBatch, &pendingPayment{
htlc: htlc, htlc: htlc,
index: index, index: index,
preImage: pkt.preImage, preImage: pkt.preImage,
@ -341,31 +316,32 @@ func (p *peer) handleDownStreamPkt(state *commitmentState, pkt *htlcPacket) {
// upstream. Therefore we settle the HTLC within the our local // upstream. Therefore we settle the HTLC within the our local
// state machine. // state machine.
pre := htlc.PaymentPreimage pre := htlc.PaymentPreimage
logIndex, err := state.channel.SettleHTLC(pre) logIndex, err := l.channel.SettleHTLC(pre)
if err != nil { if err != nil {
// TODO(roasbeef): broadcast on-chain // TODO(roasbeef): broadcast on-chain
peerLog.Errorf("settle for incoming HTLC rejected: %v", err) log.Errorf("settle for incoming HTLC "+
p.Disconnect() "rejected: %v", err)
l.cfg.Peer.Disconnect()
return return
} }
// With the HTLC settled, we'll need to populate the wire // With the HTLC settled, we'll need to populate the wire
// message to target the specific channel and HTLC to be // message to target the specific channel and HTLC to be
// cancelled. // cancelled.
htlc.ChanID = state.chanID htlc.ChanID = l.ChanID()
htlc.ID = logIndex htlc.ID = logIndex
// Then we send the HTLC settle message to the connected peer // Then we send the HTLC settle message to the connected peer
// so we can continue the propagation of the settle message. // so we can continue the propagation of the settle message.
p.queueMsg(htlc, nil) l.cfg.Peer.SendMessage(htlc)
isSettle = true isSettle = true
case *lnwire.UpdateFailHTLC: case *lnwire.UpdateFailHTLC:
// An HTLC cancellation has been triggered somewhere upstream, // An HTLC cancellation has been triggered somewhere upstream,
// we'll remove then HTLC from our local state machine. // we'll remove then HTLC from our local state machine.
logIndex, err := state.channel.FailHTLC(pkt.payHash) logIndex, err := l.channel.FailHTLC(pkt.payHash)
if err != nil { if err != nil {
peerLog.Errorf("unable to cancel HTLC: %v", err) log.Errorf("unable to cancel HTLC: %v", err)
return return
} }
@ -373,23 +349,23 @@ func (p *peer) handleDownStreamPkt(state *commitmentState, pkt *htlcPacket) {
// message to target the specific channel and HTLC to be // message to target the specific channel and HTLC to be
// cancelled. The "Reason" field will have already been set // cancelled. The "Reason" field will have already been set
// within the switch. // within the switch.
htlc.ChanID = state.chanID htlc.ChanID = l.ChanID()
htlc.ID = logIndex htlc.ID = logIndex
// Finally, we send the HTLC message to the peer which // Finally, we send the HTLC message to the peer which
// initially created the HTLC. // initially created the HTLC.
p.queueMsg(htlc, nil) l.cfg.Peer.SendMessage(htlc)
isSettle = true isSettle = true
} }
// If this newly added update exceeds the min batch size for adds, or // If this newly added update exceeds the min batch size for adds, or
// this is a settle request, then initiate an update. // this is a settle request, then initiate an update.
// TODO(roasbeef): enforce max HTLCs in flight limit // TODO(roasbeef): enforce max HTLCs in flight limit
if len(state.pendingBatch) >= 10 || isSettle { if len(l.pendingBatch) >= 10 || isSettle {
if err := p.updateCommitTx(state); err != nil { if err := l.updateCommitTx(); err != nil {
peerLog.Errorf("unable to update "+ log.Errorf("unable to update "+
"commitment: %v", err) "commitment: %v", err)
p.Disconnect() l.cfg.Peer.Disconnect()
return return
} }
} }
@ -398,28 +374,29 @@ func (p *peer) handleDownStreamPkt(state *commitmentState, pkt *htlcPacket) {
// handleUpstreamMsg processes wire messages related to commitment state // handleUpstreamMsg processes wire messages related to commitment state
// updates from the upstream peer. The upstream peer is the peer whom we have a // updates from the upstream peer. The upstream peer is the peer whom we have a
// direct channel with, updating our respective commitment chains. // direct channel with, updating our respective commitment chains.
func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) { func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
switch htlcPkt := msg.(type) { switch msg := msg.(type) {
// TODO(roasbeef): timeouts // TODO(roasbeef): timeouts
// * fail if can't parse sphinx mix-header // * fail if can't parse sphinx mix-header
case *lnwire.UpdateAddHTLC: case *lnwire.UpdateAddHTLC:
// Before adding the new HTLC to the state machine, parse the // Before adding the new HTLC to the state machine, parse the
// onion object in order to obtain the routing information. // onion object in order to obtain the routing information.
blobReader := bytes.NewReader(htlcPkt.OnionBlob[:]) blobReader := bytes.NewReader(msg.OnionBlob[:])
onionPkt := &sphinx.OnionPacket{} onionPkt := &sphinx.OnionPacket{}
if err := onionPkt.Decode(blobReader); err != nil { if err := onionPkt.Decode(blobReader); err != nil {
peerLog.Errorf("unable to decode onion pkt: %v", err) log.Errorf("unable to decode onion pkt: %v", err)
p.Disconnect() l.cfg.Peer.Disconnect()
return return
} }
// We just received an add request from an upstream peer, so we // We just received an add request from an upstream peer, so we
// add it to our state machine, then add the HTLC to our // add it to our state machine, then add the HTLC to our
// "settle" list in the event that we know the preimage // "settle" list in the event that we know the preimage
index, err := state.channel.ReceiveHTLC(htlcPkt) index, err := l.channel.ReceiveHTLC(msg)
if err != nil { if err != nil {
peerLog.Errorf("Receiving HTLC rejected: %v", err) log.Errorf("unable to handle upstream add HTLC: %v",
p.Disconnect() err)
l.cfg.Peer.Disconnect()
return return
} }
@ -432,14 +409,15 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) {
// a replay attacks. In the case of a replay, an attacker is // a replay attacks. In the case of a replay, an attacker is
// *forced* to use the same payment hash twice, thereby losing // *forced* to use the same payment hash twice, thereby losing
// their money entirely. // their money entirely.
rHash := htlcPkt.PaymentHash[:] rHash := msg.PaymentHash[:]
sphinxPacket, err := state.sphinx.ProcessOnionPacket(onionPkt, rHash) sphinxPacket, err := l.sphinx.ProcessOnionPacket(onionPkt, rHash)
if err != nil { if err != nil {
// If we're unable to parse the Sphinx packet, then // If we're unable to parse the Sphinx packet, then
// we'll cancel the HTLC after the current commitment // we'll cancel the HTLC after the current commitment
// transition. // transition.
peerLog.Errorf("unable to process onion pkt: %v", err) log.Errorf("unable to process onion pkt: %v",
state.htlcsToCancel[index] = lnwire.SphinxParseError err)
l.htlcsToCancel[index] = lnwire.SphinxParseError
return return
} }
@ -448,31 +426,31 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) {
// attempt to see if we have an invoice locally which'll allow // attempt to see if we have an invoice locally which'll allow
// us to settle this HTLC. // us to settle this HTLC.
case sphinx.ExitNode: case sphinx.ExitNode:
rHash := htlcPkt.PaymentHash rHash := msg.PaymentHash
invoice, err := p.server.invoices.LookupInvoice(rHash) invoice, err := l.cfg.Registry.LookupInvoice(rHash)
if err != nil { if err != nil {
// If we're the exit node, but don't recognize // If we're the exit node, but don't recognize
// the payment hash, then we'll fail the HTLC // the payment hash, then we'll fail the HTLC
// on the next state transition. // on the next state transition.
peerLog.Errorf("unable to settle HTLC, "+ log.Errorf("unable to settle HTLC, "+
"payment hash (%x) unrecognized", rHash[:]) "payment hash (%x) unrecognized", rHash[:])
state.htlcsToCancel[index] = lnwire.UnknownPaymentHash l.htlcsToCancel[index] = lnwire.UnknownPaymentHash
return return
} }
// If we're not currently in debug mode, and the // If we're not currently in debug mode, and the
// extended HTLC doesn't meet the value requested, then // extended HTLC doesn't meet the value requested, then
// we'll fail the HTLC. // we'll fail the HTLC.
if !cfg.DebugHTLC && htlcPkt.Amount < invoice.Terms.Value { if !l.cfg.DebugHTLC && msg.Amount < invoice.Terms.Value {
peerLog.Errorf("rejecting HTLC due to incorrect "+ log.Errorf("rejecting HTLC due to incorrect "+
"amount: expected %v, received %v", "amount: expected %v, received %v",
invoice.Terms.Value, htlcPkt.Amount) invoice.Terms.Value, msg.Amount)
state.htlcsToCancel[index] = lnwire.IncorrectValue l.htlcsToCancel[index] = lnwire.IncorrectValue
} else { } else {
// Otherwise, everything is in order and we'll // Otherwise, everything is in order and we'll
// settle the HTLC after the current state // settle the HTLC after the current state
// transition. // transition.
state.htlcsToSettle[index] = invoice l.htlcsToSettle[index] = invoice
} }
// There are additional hops left within this route, so we // There are additional hops left within this route, so we
@ -481,68 +459,70 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) {
// switch, we'll attach the routing information so the switch // switch, we'll attach the routing information so the switch
// can finalize the circuit. // can finalize the circuit.
case sphinx.MoreHops: case sphinx.MoreHops:
state.pendingCircuits[index] = sphinxPacket l.pendingCircuits[index] = sphinxPacket
default: default:
peerLog.Errorf("mal formed onion packet") log.Errorf("malformed onion packet")
state.htlcsToCancel[index] = lnwire.SphinxParseError l.htlcsToCancel[index] = lnwire.SphinxParseError
} }
case *lnwire.UpdateFufillHTLC: case *lnwire.UpdateFufillHTLC:
pre := htlcPkt.PaymentPreimage pre := msg.PaymentPreimage
idx := htlcPkt.ID idx := msg.ID
if err := state.channel.ReceiveHTLCSettle(pre, idx); err != nil { if err := l.channel.ReceiveHTLCSettle(pre, idx); err != nil {
// TODO(roasbeef): broadcast on-chain // TODO(roasbeef): broadcast on-chain
peerLog.Errorf("settle for outgoing HTLC rejected: %v", err) log.Errorf("unable to handle upstream settle "+
p.Disconnect() "HTLC: %v", err)
l.cfg.Peer.Disconnect()
return return
} }
// TODO(roasbeef): add preimage to DB in order to swipe // TODO(roasbeef): add preimage to DB in order to swipe
// repeated r-values // repeated r-values
case *lnwire.UpdateFailHTLC: case *lnwire.UpdateFailHTLC:
idx := htlcPkt.ID idx := msg.ID
if err := state.channel.ReceiveFailHTLC(idx); err != nil { if err := l.channel.ReceiveFailHTLC(idx); err != nil {
peerLog.Errorf("unable to recv HTLC cancel: %v", err) log.Errorf("unable to handle upstream fail HTLC: "+
p.Disconnect() "%v", err)
l.cfg.Peer.Disconnect()
return return
} }
state.cancelReasons[idx] = lnwire.FailCode(htlcPkt.Reason[0]) l.cancelReasons[idx] = lnwire.FailCode(msg.Reason[0])
case *lnwire.CommitSig: case *lnwire.CommitSig:
// We just received a new update to our local commitment chain, // We just received a new update to our local commitment chain,
// validate this new commitment, closing the link if invalid. // validate this new commitment, closing the link if invalid.
// TODO(roasbeef): redundant re-serialization // TODO(roasbeef): redundant re-serialization
sig := htlcPkt.CommitSig.Serialize() sig := msg.CommitSig.Serialize()
if err := state.channel.ReceiveNewCommitment(sig); err != nil { if err := l.channel.ReceiveNewCommitment(sig); err != nil {
peerLog.Errorf("unable to accept new commitment: %v", err) log.Errorf("unable to accept new commitment: %v", err)
p.Disconnect() l.cfg.Peer.Disconnect()
return return
} }
// As we've just just accepted a new state, we'll now // As we've just just accepted a new state, we'll now
// immediately send the remote peer a revocation for our prior // immediately send the remote peer a revocation for our prior
// state. // state.
nextRevocation, err := state.channel.RevokeCurrentCommitment() nextRevocation, err := l.channel.RevokeCurrentCommitment()
if err != nil { if err != nil {
peerLog.Errorf("unable to revoke commitment: %v", err) log.Errorf("unable to revoke commitment: %v", err)
return return
} }
p.queueMsg(nextRevocation, nil) l.cfg.Peer.SendMessage(nextRevocation)
// If both commitment chains are fully synced from our PoV, // If both commitment chains are fully synced from our PoV,
// then we don't need to reply with a signature as both sides // then we don't need to reply with a signature as both sides
// already have a commitment with the latest accepted state. // already have a commitment with the latest accepted l.
if state.channel.FullySynced() { if l.channel.FullySynced() {
return return
} }
// Otherwise, the remote party initiated the state transition, // Otherwise, the remote party initiated the state transition,
// so we'll reply with a signature to provide them with their // so we'll reply with a signature to provide them with their
// version of the latest commitment state. // version of the latest commitment l.
if err := p.updateCommitTx(state); err != nil { if err := l.updateCommitTx(); err != nil {
peerLog.Errorf("unable to update commitment: %v", err) log.Errorf("unable to update commitment: %v", err)
p.Disconnect() l.cfg.Peer.Disconnect()
return return
} }
@ -550,10 +530,10 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) {
// We've received a revocation from the remote chain, if valid, // We've received a revocation from the remote chain, if valid,
// this moves the remote chain forward, and expands our // this moves the remote chain forward, and expands our
// revocation window. // revocation window.
htlcsToForward, err := state.channel.ReceiveRevocation(htlcPkt) htlcsToForward, err := l.channel.ReceiveRevocation(msg)
if err != nil { if err != nil {
peerLog.Errorf("unable to accept revocation: %v", err) log.Errorf("unable to accept revocation: %v", err)
p.Disconnect() l.cfg.Peer.Disconnect()
return return
} }
@ -566,7 +546,7 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) {
cancelledHtlcs := make(map[uint64]struct{}) cancelledHtlcs := make(map[uint64]struct{})
for _, htlc := range htlcsToForward { for _, htlc := range htlcsToForward {
parentIndex := htlc.ParentIndex parentIndex := htlc.ParentIndex
if p, ok := state.clearedHTCLs[parentIndex]; ok { if p, ok := l.clearedHTCLs[parentIndex]; ok {
switch htlc.EntryType { switch htlc.EntryType {
// If the HTLC was settled successfully, then // If the HTLC was settled successfully, then
// we return a nil error as well as the payment // we return a nil error as well as the payment
@ -575,17 +555,17 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) {
p.preImage <- htlc.RPreimage p.preImage <- htlc.RPreimage
p.err <- nil p.err <- nil
// Otherwise, the HTLC failed, so we propagate // Otherwise, the HTLC failed, so we propagate
// the error back to the potential caller. // the error back to the potential caller.
case lnwallet.Fail: case lnwallet.Fail:
errMsg := state.cancelReasons[parentIndex] errMsg := l.cancelReasons[parentIndex]
p.preImage <- [32]byte{} p.preImage <- [32]byte{}
p.err <- errors.New(errMsg.String()) p.err <- errors.New(errMsg.String())
} }
close(p.done) close(p.done)
delete(state.clearedHTCLs, htlc.ParentIndex) delete(l.clearedHTCLs, htlc.ParentIndex)
} }
// TODO(roasbeef): rework log entries to a shared // TODO(roasbeef): rework log entries to a shared
@ -597,24 +577,24 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) {
// If we can settle this HTLC within our local state // If we can settle this HTLC within our local state
// update log, then send the update entry to the remote // update log, then send the update entry to the remote
// party. // party.
invoice, ok := state.htlcsToSettle[htlc.Index] invoice, ok := l.htlcsToSettle[htlc.Index]
if ok { if ok {
preimage := invoice.Terms.PaymentPreimage preimage := invoice.Terms.PaymentPreimage
logIndex, err := state.channel.SettleHTLC(preimage) logIndex, err := l.channel.SettleHTLC(preimage)
if err != nil { if err != nil {
peerLog.Errorf("unable to settle htlc: %v", err) log.Errorf("unable to settle htlc: %v", err)
p.Disconnect() l.cfg.Peer.Disconnect()
continue continue
} }
settleMsg := &lnwire.UpdateFufillHTLC{ settleMsg := &lnwire.UpdateFufillHTLC{
ChanID: state.chanID, ChanID: l.chanID,
ID: logIndex, ID: logIndex,
PaymentPreimage: preimage, PaymentPreimage: preimage,
} }
p.queueMsg(settleMsg, nil) l.cfg.Peer.SendMessage(settleMsg)
delete(state.htlcsToSettle, htlc.Index) delete(l.htlcsToSettle, htlc.Index)
settledPayments[htlc.RHash] = struct{}{} settledPayments[htlc.RHash] = struct{}{}
bandwidthUpdate += htlc.Amount bandwidthUpdate += htlc.Amount
@ -625,25 +605,25 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) {
// cancellation, then immediately cancel the HTLC as // cancellation, then immediately cancel the HTLC as
// it's now locked in within both commitment // it's now locked in within both commitment
// transactions. // transactions.
reason, ok := state.htlcsToCancel[htlc.Index] reason, ok := l.htlcsToCancel[htlc.Index]
if !ok { if !ok {
continue continue
} }
logIndex, err := state.channel.FailHTLC(htlc.RHash) logIndex, err := l.channel.FailHTLC(htlc.RHash)
if err != nil { if err != nil {
peerLog.Errorf("unable to cancel htlc: %v", err) log.Errorf("unable to cancel htlc: %v", err)
p.Disconnect() l.cfg.Peer.Disconnect()
continue continue
} }
cancelMsg := &lnwire.UpdateFailHTLC{ cancelMsg := &lnwire.UpdateFailHTLC{
ChanID: state.chanID, ChanID: l.chanID,
ID: logIndex, ID: logIndex,
Reason: []byte{byte(reason)}, Reason: []byte{byte(reason)},
} }
p.queueMsg(cancelMsg, nil) l.cfg.Peer.SendMessage(cancelMsg)
delete(state.htlcsToCancel, htlc.Index) delete(l.htlcsToCancel, htlc.Index)
cancelledHtlcs[htlc.Index] = struct{}{} cancelledHtlcs[htlc.Index] = struct{}{}
} }
@ -660,23 +640,23 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) {
continue continue
} }
onionPkt := state.pendingCircuits[htlc.Index] onionPkt := l.pendingCircuits[htlc.Index]
delete(state.pendingCircuits, htlc.Index) delete(l.pendingCircuits, htlc.Index)
reason := state.cancelReasons[htlc.ParentIndex] reason := l.cancelReasons[htlc.ParentIndex]
delete(state.cancelReasons, htlc.ParentIndex) delete(l.cancelReasons, htlc.ParentIndex)
// Send this fully activated HTLC to the htlc // Send this fully activated HTLC to the htlc
// switch to continue the chained clear/settle. // switch to continue the chained clear/settle.
pkt, err := logEntryToHtlcPkt(state.chanID, pkt, err := logEntryToHtlcPkt(l.chanID,
htlc, onionPkt, reason) htlc, onionPkt, reason)
if err != nil { if err != nil {
peerLog.Errorf("unable to make htlc pkt: %v", log.Errorf("unable to make htlc pkt: %v",
err) err)
continue continue
} }
state.switchChan <- pkt l.switchChan <- pkt
} }
}() }()
@ -696,9 +676,9 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) {
// With all the settle updates added to the local and remote // With all the settle updates added to the local and remote
// HTLC logs, initiate a state transition by updating the // HTLC logs, initiate a state transition by updating the
// remote commitment chain. // remote commitment chain.
if err := p.updateCommitTx(state); err != nil { if err := l.updateCommitTx(); err != nil {
peerLog.Errorf("unable to update commitment: %v", err) log.Errorf("unable to update commitment: %v", err)
p.Disconnect() l.cfg.Peer.Disconnect()
return return
} }
@ -706,9 +686,9 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) {
// with this latest commitment update. // with this latest commitment update.
// TODO(roasbeef): wait until next transition? // TODO(roasbeef): wait until next transition?
for invoice := range settledPayments { for invoice := range settledPayments {
err := p.server.invoices.SettleInvoice(chainhash.Hash(invoice)) err := l.cfg.Registry.SettleInvoice(chainhash.Hash(invoice))
if err != nil { if err != nil {
peerLog.Errorf("unable to settle invoice: %v", err) log.Errorf("unable to settle invoice: %v", err)
} }
} }
} }
@ -717,11 +697,11 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) {
// updateCommitTx signs, then sends an update to the remote peer adding a new // updateCommitTx signs, then sends an update to the remote peer adding a new
// commitment to their commitment chain which includes all the latest updates // commitment to their commitment chain which includes all the latest updates
// we've received+processed up to this point. // we've received+processed up to this point.
func (p *peer) updateCommitTx(state *commitmentState) error { func (l *channelLink) updateCommitTx() error {
sigTheirs, err := state.channel.SignNextCommitment() sigTheirs, err := l.channel.SignNextCommitment()
if err == lnwallet.ErrNoWindow { if err == lnwallet.ErrNoWindow {
peerLog.Tracef("revocation window exhausted, unable to send %v", log.Tracef("revocation window exhausted, unable to send %v",
len(state.pendingBatch)) len(l.pendingBatch))
return nil return nil
} else if err != nil { } else if err != nil {
return err return err
@ -733,21 +713,21 @@ func (p *peer) updateCommitTx(state *commitmentState) error {
} }
commitSig := &lnwire.CommitSig{ commitSig := &lnwire.CommitSig{
ChanID: state.chanID, ChanID: l.ChanID(),
CommitSig: parsedSig, CommitSig: parsedSig,
} }
p.queueMsg(commitSig, nil) l.cfg.Peer.SendMessage(commitSig)
// As we've just cleared out a batch, move all pending updates to the // As we've just cleared out a batch, move all pending updates to the
// map of cleared HTLCs, clearing out the set of pending updates. // map of cleared HTLCs, clearing out the set of pending updates.
for _, update := range state.pendingBatch { for _, update := range l.pendingBatch {
state.clearedHTCLs[update.index] = update l.clearedHTCLs[update.index] = update
} }
// Finally, clear our the current batch, and flip the pendingUpdate // Finally, clear our the current batch, and flip the pendingUpdate
// bool to indicate were waiting for a commitment signature. // bool to indicate were waiting for a commitment signature.
// TODO(roasbeef): re-slice instead to avoid GC? // TODO(roasbeef): re-slice instead to avoid GC?
state.pendingBatch = nil l.pendingBatch = nil
return nil return nil
} }
@ -795,11 +775,8 @@ func logEntryToHtlcPkt(chanID lnwire.ChannelID, pd *lnwallet.PaymentDescriptor,
pkt.payHash = pd.RHash pkt.payHash = pd.RHash
} }
pkt.amt = pd.Amount pkt.htlc = msg
pkt.msg = msg pkt.src = chanID
pkt.srcLink = chanID
pkt.onion = onionPkt
return pkt, nil return pkt, nil
} }