diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 7d1e28c9..aee12639 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -16,7 +16,6 @@ import ( "github.com/roasbeef/btcd/chaincfg/chainhash" "github.com/roasbeef/btcd/wire" "github.com/roasbeef/btcutil" - "github.com/Masterminds/glide/cfg" ) // ChannelLinkConfig defines the configuration for the channel link. ALL @@ -156,48 +155,30 @@ func (l *channelLink) Stop() { l.wg.Wait() } -// htlcManager is the primary goroutine which drives a channel's commitment +// htlcHandler is the primary goroutine which drives a channel's commitment // update state-machine in response to messages received via several channels. -// The htlcManager reads messages from the upstream (remote) peer, and also -// from several possible downstream channels managed by the htlcSwitch. In the -// event that an htlc needs to be forwarded, then send-only htlcPlex chan is -// used which sends htlc packets to the switch for forwarding. Additionally, -// the htlcManager handles acting upon all timeouts for any active HTLCs, -// manages the channel's revocation window, and also the htlc trickle -// queue+timer for this active channels. -func (p *peer) htlcManager(channel *lnwallet.LightningChannel, - htlcPlex chan<- *htlcPacket, downstreamLink <-chan *htlcPacket, - upstreamLink <-chan lnwire.Message) { +// This goroutine reads messages from the upstream (remote) peer, and also from +// downstream channel managed by the channel link. In the event that an htlc +// needs to be forwarded, then send-only forward handler is used which sends +// htlc packets to the switch. Additionally, the this goroutine handles acting +// upon all timeouts for any active HTLCs, manages the channel's revocation +// window, and also the htlc trickle queue+timer for this active channels. +// NOTE: Should be started as goroutine. +func (l *channelLink) htlcHandler() { + defer l.wg.Done() - chanStats := channel.StateSnapshot() - peerLog.Infof("HTLC manager for ChannelPoint(%v) started, "+ - "our_balance=%v, their_balance=%v, chain_height=%v", - channel.ChannelPoint(), chanStats.LocalBalance, - chanStats.RemoteBalance, chanStats.NumUpdates) + log.Infof("HTLC manager for ChannelPoint(%v) started, "+ + "bandwidth=%v", l.channel.ChannelPoint(), l.getBandwidth()) // A new session for this active channel has just started, therefore we // need to send our initial revocation window to the remote peer. for i := 0; i < lnwallet.InitialRevocationWindow; i++ { - rev, err := channel.ExtendRevocationWindow() + rev, err := l.channel.ExtendRevocationWindow() if err != nil { - peerLog.Errorf("unable to expand revocation window: %v", err) + log.Errorf("unable to expand revocation window: %v", err) continue } - p.queueMsg(rev, nil) - } - - chanPoint := channel.ChannelPoint() - state := &commitmentState{ - channel: channel, - chanPoint: chanPoint, - chanID: lnwire.NewChanIDFromOutPoint(chanPoint), - clearedHTCLs: make(map[uint64]*pendingPayment), - htlcsToSettle: make(map[uint64]*channeldb.Invoice), - htlcsToCancel: make(map[uint64]lnwire.FailCode), - cancelReasons: make(map[uint64]lnwire.FailCode), - pendingCircuits: make(map[uint64]*sphinx.ProcessedPacket), - sphinx: p.server.sphinx, - switchChan: htlcPlex, + l.cfg.Peer.SendMessage(rev) } // TODO(roasbeef): check to see if able to settle any currently pending @@ -213,24 +194,23 @@ func (p *peer) htlcManager(channel *lnwallet.LightningChannel, out: for { select { - case <-channel.UnilateralCloseSignal: + case <-l.channel.UnilateralCloseSignal: // TODO(roasbeef): need to send HTLC outputs to nursery - peerLog.Warnf("Remote peer has closed ChannelPoint(%v) on-chain", - state.chanPoint) - if err := wipeChannel(p, channel); err != nil { - peerLog.Errorf("unable to wipe channel %v", err) + log.Warnf("Remote peer has closed ChannelPoint(%v) on-chain", + l.channel.ChannelPoint()) + if err := l.cfg.Peer.WipeChannel(l.channel); err != nil { + log.Errorf("unable to wipe channel %v", err) } - p.server.breachArbiter.settledContracts <- state.chanPoint - + l.cfg.SettledContracts <- l.channel.ChannelPoint() break out - case <-channel.ForceCloseSignal: + case <-l.channel.ForceCloseSignal: // TODO(roasbeef): path never taken now that server // force closes's directly? - peerLog.Warnf("ChannelPoint(%v) has been force "+ + log.Warnf("ChannelPoint(%v) has been force "+ "closed, disconnecting from peerID(%x)", - state.chanPoint, p.id) + l.channel.ChannelPoint(), l.cfg.Peer.ID()) break out case <-logCommitTimer.C: @@ -238,22 +218,22 @@ out: // update in some time, check to see if we have any // pending updates we need to commit due to our // commitment chains being desynchronized. - if state.channel.FullySynced() && - len(state.htlcsToSettle) == 0 { + if l.channel.FullySynced() && + len(l.htlcsToSettle) == 0 { continue } - if err := p.updateCommitTx(state); err != nil { - peerLog.Errorf("unable to update commitment: %v", + if err := l.updateCommitTx(); err != nil { + log.Errorf("unable to update commitment: %v", err) - p.Disconnect() + l.cfg.Peer.Disconnect() break out } case <-batchTimer.C: // If the current batch is empty, then we have no work // here. - if len(state.pendingBatch) == 0 { + if len(l.pendingBatch) == 0 { continue } @@ -262,73 +242,68 @@ out: // If the send was unsuccessful, then abandon the // update, waiting for the revocation window to open // up. - if err := p.updateCommitTx(state); err != nil { - peerLog.Errorf("unable to update "+ + if err := l.updateCommitTx(); err != nil { + log.Errorf("unable to update "+ "commitment: %v", err) - p.Disconnect() + l.cfg.Peer.Disconnect() break out } - case pkt := <-downstreamLink: - p.handleDownStreamPkt(state, pkt) + case pkt := <-l.downstream: + l.handleDownStreamPkt(pkt) - case msg, ok := <-upstreamLink: - // If the upstream message link is closed, this signals - // that the channel itself is being closed, therefore - // we exit. - if !ok { - break out + case msg := <-l.upstream: + l.handleUpstreamMsg(msg) + + case cmd := <-l.control: + switch cmd := cmd.(type) { + case *getBandwidthCmd: + cmd.done <- l.getBandwidth() } - p.handleUpstreamMsg(state, msg) - case <-p.quit: + case <-l.quit: break out } } - p.wg.Done() - peerLog.Tracef("htlcManager for peer %v done", p) + log.Infof("channel link(%v): htlc handler closed", l) } // handleDownStreamPkt processes an HTLC packet sent from the downstream HTLC // Switch. Possible messages sent by the switch include requests to forward new // HTLCs, timeout previously cleared HTLCs, and finally to settle currently // cleared HTLCs with the upstream peer. -func (p *peer) handleDownStreamPkt(state *commitmentState, pkt *htlcPacket) { +func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket) { var isSettle bool - switch htlc := pkt.msg.(type) { + switch htlc := pkt.htlc.(type) { case *lnwire.UpdateAddHTLC: // A new payment has been initiated via the // downstream channel, so we add the new HTLC // to our local log, then update the commitment // chains. - htlc.ChanID = state.chanID - index, err := state.channel.AddHTLC(htlc) + htlc.ChanID = l.ChanID() + index, err := l.channel.AddHTLC(htlc) if err != nil { // TODO: possibly perform fallback/retry logic // depending on type of error - peerLog.Errorf("Adding HTLC rejected: %v", err) - pkt.err <- err - close(pkt.done) // The HTLC was unable to be added to the state // machine, as a result, we'll signal the switch to // cancel the pending payment. - // TODO(roasbeef): need to update link as well if local - // HTLC? - state.switchChan <- &htlcPacket{ - amt: htlc.Amount, - msg: &lnwire.UpdateFailHTLC{ + go l.cfg.Switch.forward(newFailPacket(l.ChanID(), + &lnwire.UpdateFailHTLC{ Reason: []byte{byte(0)}, - }, - srcLink: state.chanID, - } + }, htlc.PaymentHash, htlc.Amount)) + + log.Errorf("unable to handle downstream add HTLC: %v", + err) return } + htlc.ID = index - p.queueMsg(htlc, nil) + l.cfg.Peer.SendMessage(htlc) - state.pendingBatch = append(state.pendingBatch, &pendingPayment{ + l.pendingBatch = append(l.pendingBatch, &pendingPayment{ htlc: htlc, index: index, preImage: pkt.preImage, @@ -341,31 +316,32 @@ func (p *peer) handleDownStreamPkt(state *commitmentState, pkt *htlcPacket) { // upstream. Therefore we settle the HTLC within the our local // state machine. pre := htlc.PaymentPreimage - logIndex, err := state.channel.SettleHTLC(pre) + logIndex, err := l.channel.SettleHTLC(pre) if err != nil { // TODO(roasbeef): broadcast on-chain - peerLog.Errorf("settle for incoming HTLC rejected: %v", err) - p.Disconnect() + log.Errorf("settle for incoming HTLC "+ + "rejected: %v", err) + l.cfg.Peer.Disconnect() return } // With the HTLC settled, we'll need to populate the wire // message to target the specific channel and HTLC to be // cancelled. - htlc.ChanID = state.chanID + htlc.ChanID = l.ChanID() htlc.ID = logIndex // Then we send the HTLC settle message to the connected peer // so we can continue the propagation of the settle message. - p.queueMsg(htlc, nil) + l.cfg.Peer.SendMessage(htlc) isSettle = true case *lnwire.UpdateFailHTLC: // An HTLC cancellation has been triggered somewhere upstream, // we'll remove then HTLC from our local state machine. - logIndex, err := state.channel.FailHTLC(pkt.payHash) + logIndex, err := l.channel.FailHTLC(pkt.payHash) if err != nil { - peerLog.Errorf("unable to cancel HTLC: %v", err) + log.Errorf("unable to cancel HTLC: %v", err) return } @@ -373,23 +349,23 @@ func (p *peer) handleDownStreamPkt(state *commitmentState, pkt *htlcPacket) { // message to target the specific channel and HTLC to be // cancelled. The "Reason" field will have already been set // within the switch. - htlc.ChanID = state.chanID + htlc.ChanID = l.ChanID() htlc.ID = logIndex // Finally, we send the HTLC message to the peer which // initially created the HTLC. - p.queueMsg(htlc, nil) + l.cfg.Peer.SendMessage(htlc) isSettle = true } // If this newly added update exceeds the min batch size for adds, or // this is a settle request, then initiate an update. // TODO(roasbeef): enforce max HTLCs in flight limit - if len(state.pendingBatch) >= 10 || isSettle { - if err := p.updateCommitTx(state); err != nil { - peerLog.Errorf("unable to update "+ + if len(l.pendingBatch) >= 10 || isSettle { + if err := l.updateCommitTx(); err != nil { + log.Errorf("unable to update "+ "commitment: %v", err) - p.Disconnect() + l.cfg.Peer.Disconnect() return } } @@ -398,28 +374,29 @@ func (p *peer) handleDownStreamPkt(state *commitmentState, pkt *htlcPacket) { // handleUpstreamMsg processes wire messages related to commitment state // updates from the upstream peer. The upstream peer is the peer whom we have a // direct channel with, updating our respective commitment chains. -func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) { - switch htlcPkt := msg.(type) { +func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { + switch msg := msg.(type) { // TODO(roasbeef): timeouts // * fail if can't parse sphinx mix-header case *lnwire.UpdateAddHTLC: // Before adding the new HTLC to the state machine, parse the // onion object in order to obtain the routing information. - blobReader := bytes.NewReader(htlcPkt.OnionBlob[:]) + blobReader := bytes.NewReader(msg.OnionBlob[:]) onionPkt := &sphinx.OnionPacket{} if err := onionPkt.Decode(blobReader); err != nil { - peerLog.Errorf("unable to decode onion pkt: %v", err) - p.Disconnect() + log.Errorf("unable to decode onion pkt: %v", err) + l.cfg.Peer.Disconnect() return } // We just received an add request from an upstream peer, so we // add it to our state machine, then add the HTLC to our // "settle" list in the event that we know the preimage - index, err := state.channel.ReceiveHTLC(htlcPkt) + index, err := l.channel.ReceiveHTLC(msg) if err != nil { - peerLog.Errorf("Receiving HTLC rejected: %v", err) - p.Disconnect() + log.Errorf("unable to handle upstream add HTLC: %v", + err) + l.cfg.Peer.Disconnect() return } @@ -432,14 +409,15 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) { // a replay attacks. In the case of a replay, an attacker is // *forced* to use the same payment hash twice, thereby losing // their money entirely. - rHash := htlcPkt.PaymentHash[:] - sphinxPacket, err := state.sphinx.ProcessOnionPacket(onionPkt, rHash) + rHash := msg.PaymentHash[:] + sphinxPacket, err := l.sphinx.ProcessOnionPacket(onionPkt, rHash) if err != nil { // If we're unable to parse the Sphinx packet, then // we'll cancel the HTLC after the current commitment // transition. - peerLog.Errorf("unable to process onion pkt: %v", err) - state.htlcsToCancel[index] = lnwire.SphinxParseError + log.Errorf("unable to process onion pkt: %v", + err) + l.htlcsToCancel[index] = lnwire.SphinxParseError return } @@ -448,31 +426,31 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) { // attempt to see if we have an invoice locally which'll allow // us to settle this HTLC. case sphinx.ExitNode: - rHash := htlcPkt.PaymentHash - invoice, err := p.server.invoices.LookupInvoice(rHash) + rHash := msg.PaymentHash + invoice, err := l.cfg.Registry.LookupInvoice(rHash) if err != nil { // If we're the exit node, but don't recognize // the payment hash, then we'll fail the HTLC // on the next state transition. - peerLog.Errorf("unable to settle HTLC, "+ + log.Errorf("unable to settle HTLC, "+ "payment hash (%x) unrecognized", rHash[:]) - state.htlcsToCancel[index] = lnwire.UnknownPaymentHash + l.htlcsToCancel[index] = lnwire.UnknownPaymentHash return } // If we're not currently in debug mode, and the // extended HTLC doesn't meet the value requested, then // we'll fail the HTLC. - if !cfg.DebugHTLC && htlcPkt.Amount < invoice.Terms.Value { - peerLog.Errorf("rejecting HTLC due to incorrect "+ + if !l.cfg.DebugHTLC && msg.Amount < invoice.Terms.Value { + log.Errorf("rejecting HTLC due to incorrect "+ "amount: expected %v, received %v", - invoice.Terms.Value, htlcPkt.Amount) - state.htlcsToCancel[index] = lnwire.IncorrectValue + invoice.Terms.Value, msg.Amount) + l.htlcsToCancel[index] = lnwire.IncorrectValue } else { // Otherwise, everything is in order and we'll // settle the HTLC after the current state // transition. - state.htlcsToSettle[index] = invoice + l.htlcsToSettle[index] = invoice } // There are additional hops left within this route, so we @@ -481,68 +459,70 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) { // switch, we'll attach the routing information so the switch // can finalize the circuit. case sphinx.MoreHops: - state.pendingCircuits[index] = sphinxPacket + l.pendingCircuits[index] = sphinxPacket default: - peerLog.Errorf("mal formed onion packet") - state.htlcsToCancel[index] = lnwire.SphinxParseError + log.Errorf("malformed onion packet") + l.htlcsToCancel[index] = lnwire.SphinxParseError } case *lnwire.UpdateFufillHTLC: - pre := htlcPkt.PaymentPreimage - idx := htlcPkt.ID - if err := state.channel.ReceiveHTLCSettle(pre, idx); err != nil { + pre := msg.PaymentPreimage + idx := msg.ID + if err := l.channel.ReceiveHTLCSettle(pre, idx); err != nil { // TODO(roasbeef): broadcast on-chain - peerLog.Errorf("settle for outgoing HTLC rejected: %v", err) - p.Disconnect() + log.Errorf("unable to handle upstream settle "+ + "HTLC: %v", err) + l.cfg.Peer.Disconnect() return } // TODO(roasbeef): add preimage to DB in order to swipe // repeated r-values case *lnwire.UpdateFailHTLC: - idx := htlcPkt.ID - if err := state.channel.ReceiveFailHTLC(idx); err != nil { - peerLog.Errorf("unable to recv HTLC cancel: %v", err) - p.Disconnect() + idx := msg.ID + if err := l.channel.ReceiveFailHTLC(idx); err != nil { + log.Errorf("unable to handle upstream fail HTLC: "+ + "%v", err) + l.cfg.Peer.Disconnect() return } - state.cancelReasons[idx] = lnwire.FailCode(htlcPkt.Reason[0]) + l.cancelReasons[idx] = lnwire.FailCode(msg.Reason[0]) case *lnwire.CommitSig: // We just received a new update to our local commitment chain, // validate this new commitment, closing the link if invalid. // TODO(roasbeef): redundant re-serialization - sig := htlcPkt.CommitSig.Serialize() - if err := state.channel.ReceiveNewCommitment(sig); err != nil { - peerLog.Errorf("unable to accept new commitment: %v", err) - p.Disconnect() + sig := msg.CommitSig.Serialize() + if err := l.channel.ReceiveNewCommitment(sig); err != nil { + log.Errorf("unable to accept new commitment: %v", err) + l.cfg.Peer.Disconnect() return } // As we've just just accepted a new state, we'll now // immediately send the remote peer a revocation for our prior // state. - nextRevocation, err := state.channel.RevokeCurrentCommitment() + nextRevocation, err := l.channel.RevokeCurrentCommitment() if err != nil { - peerLog.Errorf("unable to revoke commitment: %v", err) + log.Errorf("unable to revoke commitment: %v", err) return } - p.queueMsg(nextRevocation, nil) + l.cfg.Peer.SendMessage(nextRevocation) // If both commitment chains are fully synced from our PoV, // then we don't need to reply with a signature as both sides - // already have a commitment with the latest accepted state. - if state.channel.FullySynced() { + // already have a commitment with the latest accepted l. + if l.channel.FullySynced() { return } // Otherwise, the remote party initiated the state transition, // so we'll reply with a signature to provide them with their - // version of the latest commitment state. - if err := p.updateCommitTx(state); err != nil { - peerLog.Errorf("unable to update commitment: %v", err) - p.Disconnect() + // version of the latest commitment l. + if err := l.updateCommitTx(); err != nil { + log.Errorf("unable to update commitment: %v", err) + l.cfg.Peer.Disconnect() return } @@ -550,10 +530,10 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) { // We've received a revocation from the remote chain, if valid, // this moves the remote chain forward, and expands our // revocation window. - htlcsToForward, err := state.channel.ReceiveRevocation(htlcPkt) + htlcsToForward, err := l.channel.ReceiveRevocation(msg) if err != nil { - peerLog.Errorf("unable to accept revocation: %v", err) - p.Disconnect() + log.Errorf("unable to accept revocation: %v", err) + l.cfg.Peer.Disconnect() return } @@ -566,7 +546,7 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) { cancelledHtlcs := make(map[uint64]struct{}) for _, htlc := range htlcsToForward { parentIndex := htlc.ParentIndex - if p, ok := state.clearedHTCLs[parentIndex]; ok { + if p, ok := l.clearedHTCLs[parentIndex]; ok { switch htlc.EntryType { // If the HTLC was settled successfully, then // we return a nil error as well as the payment @@ -575,17 +555,17 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) { p.preImage <- htlc.RPreimage p.err <- nil - // Otherwise, the HTLC failed, so we propagate - // the error back to the potential caller. + // Otherwise, the HTLC failed, so we propagate + // the error back to the potential caller. case lnwallet.Fail: - errMsg := state.cancelReasons[parentIndex] + errMsg := l.cancelReasons[parentIndex] p.preImage <- [32]byte{} p.err <- errors.New(errMsg.String()) } close(p.done) - delete(state.clearedHTCLs, htlc.ParentIndex) + delete(l.clearedHTCLs, htlc.ParentIndex) } // TODO(roasbeef): rework log entries to a shared @@ -597,24 +577,24 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) { // If we can settle this HTLC within our local state // update log, then send the update entry to the remote // party. - invoice, ok := state.htlcsToSettle[htlc.Index] + invoice, ok := l.htlcsToSettle[htlc.Index] if ok { preimage := invoice.Terms.PaymentPreimage - logIndex, err := state.channel.SettleHTLC(preimage) + logIndex, err := l.channel.SettleHTLC(preimage) if err != nil { - peerLog.Errorf("unable to settle htlc: %v", err) - p.Disconnect() + log.Errorf("unable to settle htlc: %v", err) + l.cfg.Peer.Disconnect() continue } settleMsg := &lnwire.UpdateFufillHTLC{ - ChanID: state.chanID, + ChanID: l.chanID, ID: logIndex, PaymentPreimage: preimage, } - p.queueMsg(settleMsg, nil) + l.cfg.Peer.SendMessage(settleMsg) - delete(state.htlcsToSettle, htlc.Index) + delete(l.htlcsToSettle, htlc.Index) settledPayments[htlc.RHash] = struct{}{} bandwidthUpdate += htlc.Amount @@ -625,25 +605,25 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) { // cancellation, then immediately cancel the HTLC as // it's now locked in within both commitment // transactions. - reason, ok := state.htlcsToCancel[htlc.Index] + reason, ok := l.htlcsToCancel[htlc.Index] if !ok { continue } - logIndex, err := state.channel.FailHTLC(htlc.RHash) + logIndex, err := l.channel.FailHTLC(htlc.RHash) if err != nil { - peerLog.Errorf("unable to cancel htlc: %v", err) - p.Disconnect() + log.Errorf("unable to cancel htlc: %v", err) + l.cfg.Peer.Disconnect() continue } cancelMsg := &lnwire.UpdateFailHTLC{ - ChanID: state.chanID, + ChanID: l.chanID, ID: logIndex, Reason: []byte{byte(reason)}, } - p.queueMsg(cancelMsg, nil) - delete(state.htlcsToCancel, htlc.Index) + l.cfg.Peer.SendMessage(cancelMsg) + delete(l.htlcsToCancel, htlc.Index) cancelledHtlcs[htlc.Index] = struct{}{} } @@ -660,23 +640,23 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) { continue } - onionPkt := state.pendingCircuits[htlc.Index] - delete(state.pendingCircuits, htlc.Index) + onionPkt := l.pendingCircuits[htlc.Index] + delete(l.pendingCircuits, htlc.Index) - reason := state.cancelReasons[htlc.ParentIndex] - delete(state.cancelReasons, htlc.ParentIndex) + reason := l.cancelReasons[htlc.ParentIndex] + delete(l.cancelReasons, htlc.ParentIndex) // Send this fully activated HTLC to the htlc // switch to continue the chained clear/settle. - pkt, err := logEntryToHtlcPkt(state.chanID, + pkt, err := logEntryToHtlcPkt(l.chanID, htlc, onionPkt, reason) if err != nil { - peerLog.Errorf("unable to make htlc pkt: %v", + log.Errorf("unable to make htlc pkt: %v", err) continue } - state.switchChan <- pkt + l.switchChan <- pkt } }() @@ -696,9 +676,9 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) { // With all the settle updates added to the local and remote // HTLC logs, initiate a state transition by updating the // remote commitment chain. - if err := p.updateCommitTx(state); err != nil { - peerLog.Errorf("unable to update commitment: %v", err) - p.Disconnect() + if err := l.updateCommitTx(); err != nil { + log.Errorf("unable to update commitment: %v", err) + l.cfg.Peer.Disconnect() return } @@ -706,9 +686,9 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) { // with this latest commitment update. // TODO(roasbeef): wait until next transition? for invoice := range settledPayments { - err := p.server.invoices.SettleInvoice(chainhash.Hash(invoice)) + err := l.cfg.Registry.SettleInvoice(chainhash.Hash(invoice)) if err != nil { - peerLog.Errorf("unable to settle invoice: %v", err) + log.Errorf("unable to settle invoice: %v", err) } } } @@ -717,11 +697,11 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) { // updateCommitTx signs, then sends an update to the remote peer adding a new // commitment to their commitment chain which includes all the latest updates // we've received+processed up to this point. -func (p *peer) updateCommitTx(state *commitmentState) error { - sigTheirs, err := state.channel.SignNextCommitment() +func (l *channelLink) updateCommitTx() error { + sigTheirs, err := l.channel.SignNextCommitment() if err == lnwallet.ErrNoWindow { - peerLog.Tracef("revocation window exhausted, unable to send %v", - len(state.pendingBatch)) + log.Tracef("revocation window exhausted, unable to send %v", + len(l.pendingBatch)) return nil } else if err != nil { return err @@ -733,21 +713,21 @@ func (p *peer) updateCommitTx(state *commitmentState) error { } commitSig := &lnwire.CommitSig{ - ChanID: state.chanID, + ChanID: l.ChanID(), CommitSig: parsedSig, } - p.queueMsg(commitSig, nil) + l.cfg.Peer.SendMessage(commitSig) // As we've just cleared out a batch, move all pending updates to the // map of cleared HTLCs, clearing out the set of pending updates. - for _, update := range state.pendingBatch { - state.clearedHTCLs[update.index] = update + for _, update := range l.pendingBatch { + l.clearedHTCLs[update.index] = update } // Finally, clear our the current batch, and flip the pendingUpdate // bool to indicate were waiting for a commitment signature. // TODO(roasbeef): re-slice instead to avoid GC? - state.pendingBatch = nil + l.pendingBatch = nil return nil } @@ -795,11 +775,8 @@ func logEntryToHtlcPkt(chanID lnwire.ChannelID, pd *lnwallet.PaymentDescriptor, pkt.payHash = pd.RHash } - pkt.amt = pd.Amount - pkt.msg = msg - - pkt.srcLink = chanID - pkt.onion = onionPkt + pkt.htlc = msg + pkt.src = chanID return pkt, nil }