htlcswicth: start use htlcswitch and channel link inside lnd

In current commit big shift have been made in direction of unit testable
payments scenarios. Previosly two additional structures have been added
which had been spreaded in the lnd package before, and now we apply
them in the lnd itself:

1. ChannelLink - is an interface which represents the subsystem for
managing the incoming htlc requests, applying the changes to the
channel, and also propagating/forwarding it to htlc switch.

2. Switch - is a central messaging bus for all incoming/outgoing htlc's.
The goal of the switch is forward the incoming/outgoing htlc messages
from one channel to another, and also propagate the settle/fail htlc
messages back to original requester.

With this abtractions the folowing schema becomes nearly complete:

abstraction
    ^
    |
    | - - - - - - - - - - - - Lightning - - - - - - - - - - - - -
    |
    | (Switch)		        (Switch)		  (Switch)
    |  Alice <-- channel link --> Bob <-- channel link --> Carol
    |
    | - - - - - - - - - - - - - TCP - - - - - - - - - - - - - - -
    |
    |  (Peer) 		        (Peer)	                  (Peer)
    |  Alice <----- tcp conn --> Bob <---- tcp conn -----> Carol
This commit is contained in:
Andrey Samokhvalov 2017-05-02 23:04:58 +03:00 committed by Olaoluwa Osuntokun
parent 79feebea80
commit c4955258f1
6 changed files with 142 additions and 1837 deletions

@ -7,6 +7,7 @@ import (
"github.com/davecgh/go-spew/spew" "github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwallet"
"github.com/roasbeef/btcd/chaincfg/chainhash" "github.com/roasbeef/btcd/chaincfg/chainhash"
"github.com/roasbeef/btcd/txscript" "github.com/roasbeef/btcd/txscript"
@ -27,8 +28,8 @@ type breachArbiter struct {
db *channeldb.DB db *channeldb.DB
notifier chainntnfs.ChainNotifier notifier chainntnfs.ChainNotifier
chainIO lnwallet.BlockChainIO chainIO lnwallet.BlockChainIO
htlcSwitch *htlcSwitch
estimator lnwallet.FeeEstimator estimator lnwallet.FeeEstimator
htlcSwitch *htlcswitch.Switch
// breachObservers is a map which tracks all the active breach // breachObservers is a map which tracks all the active breach
// observers we're currently managing. The key of the map is the // observers we're currently managing. The key of the map is the
@ -64,7 +65,7 @@ type breachArbiter struct {
// newBreachArbiter creates a new instance of a breachArbiter initialized with // newBreachArbiter creates a new instance of a breachArbiter initialized with
// its dependent objects. // its dependent objects.
func newBreachArbiter(wallet *lnwallet.LightningWallet, db *channeldb.DB, func newBreachArbiter(wallet *lnwallet.LightningWallet, db *channeldb.DB,
notifier chainntnfs.ChainNotifier, h *htlcSwitch, notifier chainntnfs.ChainNotifier, h *htlcswitch.Switch,
chain lnwallet.BlockChainIO, fe lnwallet.FeeEstimator) *breachArbiter { chain lnwallet.BlockChainIO, fe lnwallet.FeeEstimator) *breachArbiter {
return &breachArbiter{ return &breachArbiter{
@ -482,7 +483,7 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel,
// breached in order to ensure any incoming or outgoing // breached in order to ensure any incoming or outgoing
// multi-hop HTLCs aren't sent over this link, nor any other // multi-hop HTLCs aren't sent over this link, nor any other
// links associated with this peer. // links associated with this peer.
b.htlcSwitch.CloseLink(chanPoint, CloseBreach) b.htlcSwitch.CloseLink(chanPoint, htlcswitch.CloseBreach)
chanInfo := contract.StateSnapshot() chanInfo := contract.StateSnapshot()
closeInfo := &channeldb.ChannelCloseSummary{ closeInfo := &channeldb.ChannelCloseSummary{
ChanPoint: *chanPoint, ChanPoint: *chanPoint,

@ -1,952 +0,0 @@
package main
import (
"crypto/sha256"
"encoding/hex"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lightning-onion"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/roasbeef/btcd/btcec"
"github.com/roasbeef/btcd/chaincfg/chainhash"
"github.com/roasbeef/btcd/wire"
"github.com/roasbeef/btcutil"
"golang.org/x/crypto/ripemd160"
)
const (
// htlcQueueSize...
// buffer bloat ;)
htlcQueueSize = 50
)
var (
zeroBytes [32]byte
)
// boundedLinkChan is a simple wrapper around a link's communication channel
// that bounds the total flow into and through the channel. Channels attached
// the link have a value which defines the max number of pending HTLC's present
// within the commitment transaction. Using this struct we establish a
// synchronization primitive that ensure we don't send additional htlcPackets
// to a link if the max limit has een reached. Once HTLC's are cleared from the
// commitment transaction, slots are freed up and more can proceed.
type boundedLinkChan struct {
// slots is a buffered channel whose buffer is the total number of
// outstanding HTLC's we can add to a link's commitment transaction.
// This channel is essentially used as a semaphore.
slots chan struct{}
// linkChan is a channel that is connected to the channel state machine
// for a link. The switch will send adds, settles, and cancels over
// this channel.
linkChan chan *htlcPacket
}
// newBoundedChan makes a new boundedLinkChan that has numSlots free slots that
// are depleted on each send until a slot is re-stored. linkChan is the
// underlying channel that will be sent upon.
func newBoundedLinkChan(numSlots uint32,
linkChan chan *htlcPacket) *boundedLinkChan {
b := &boundedLinkChan{
slots: make(chan struct{}, numSlots),
linkChan: linkChan,
}
b.restoreSlots(numSlots)
return b
}
// sendAndConsume sends a packet to the linkChan and consumes a single token in
// the process.
//
// TODO(roasbeef): add error fall through case?
func (b *boundedLinkChan) sendAndConsume(pkt *htlcPacket) {
<-b.slots
b.linkChan <- pkt
}
// sendAndRestore sends a packet to the linkChan and consumes a single token in
// the process. This method is called when the switch sends either a cancel or
// settle HTLC message to the link.
func (b *boundedLinkChan) sendAndRestore(pkt *htlcPacket) {
b.linkChan <- pkt
b.slots <- struct{}{}
}
// consumeSlot consumes a single slot from the bounded channel. This method is
// called once the switch receives a new htlc add message from a link right
// before forwarding it to the next hop.
func (b *boundedLinkChan) consumeSlot() {
<-b.slots
}
// restoreSlot restores a single slots to the bounded channel. This method is
// called once the switch receives an HTLC cancel or settle from a link.
func (b *boundedLinkChan) restoreSlot() {
b.slots <- struct{}{}
}
// restoreSlots adds numSlots additional slots to the bounded channel.
func (b *boundedLinkChan) restoreSlots(numSlots uint32) {
for i := uint32(0); i < numSlots; i++ {
b.slots <- struct{}{}
}
}
// link represents an active channel capable of forwarding HTLCs. Each
// active channel registered with the htlc switch creates a new link which will
// be used for forwarding outgoing HTLCs. The link also has additional
// metadata such as the current available bandwidth of the link (in satoshis)
// which aid the switch in optimally forwarding HTLCs.
type link struct {
chanID lnwire.ChannelID
capacity btcutil.Amount
availableBandwidth int64 // atomic
peer *peer
*boundedLinkChan
}
// htlcPacket is a wrapper around an lnwire message which adds, times out, or
// settles an active HTLC. The dest field denotes the name of the interface to
// forward this htlcPacket on.
type htlcPacket struct {
sync.RWMutex
dest chainhash.Hash
srcLink lnwire.ChannelID
onion *sphinx.ProcessedPacket
msg lnwire.Message
// TODO(roasbeef): refactor and add type to pkt message
payHash [32]byte
amt btcutil.Amount
preImage chan [32]byte
err chan error
done chan struct{}
}
// circuitKey uniquely identifies an active Sphinx (onion routing) circuit
// between two open channels. Currently, the rHash of the HTLC which created
// the circuit is used to uniquely identify each circuit.
// TODO(roasbeef): need to also add in the settle/clear channel points in order
// to support fragmenting payments on the link layer: 1 to N, N to N, etc.
type circuitKey [32]byte
// paymentCircuit represents an active Sphinx (onion routing) circuit between
// two active links within the htlcSwitch. A payment circuit is created once a
// link forwards an HTLC add request which initiates the creation of the
// circuit. The onion routing information contained within this message is
// used to identify the settle/clear ends of the circuit. A circuit may be
// re-used (not torndown) in the case that multiple HTLCs with the send RHash
// are sent.
type paymentCircuit struct {
refCount uint32
// clear is the link the htlcSwitch will forward the HTLC add message
// that initiated the circuit to. Once the message is forwarded, the
// payment circuit is considered "active" from the POV of the switch as
// both the incoming/outgoing channels have the cleared HTLC within
// their latest state.
clear *link
// settle is the link the htlcSwitch will forward the HTLC settle it
// receives from the outgoing peer to. Once the switch forwards the
// settle message to this link, the payment circuit is considered
// complete unless the reference count on the circuit is greater than
// 1.
settle *link
}
// htlcSwitch is a central messaging bus for all incoming/outgoing HTLCs.
// Connected peers with active channels are treated as named interfaces which
// refer to active channels as links. A link is the switch's message
// communication point with the goroutine that manages an active channel. New
// links are registered each time a channel is created, and unregistered once
// the channel is closed. The switch manages the hand-off process for multi-hop
// HTLCs, forwarding HTLCs initiated from within the daemon, and additionally
// splitting up incoming/outgoing HTLCs to a particular interface amongst many
// links (payment fragmentation).
// TODO(roasbeef): active sphinx circuits need to be synced to disk
type htlcSwitch struct {
started int32 // atomic
shutdown int32 // atomic
// chanIndex maps a channel's ID to a link which contains additional
// information about the channel, and additionally houses a pointer to
// the peer managing the channel.
chanIndexMtx sync.RWMutex
chanIndex map[lnwire.ChannelID]*link
// interfaces maps a node's ID to the set of links (active channels) we
// currently have open with that peer.
// TODO(roasbeef): combine w/ onionIndex?
interfaceMtx sync.RWMutex
interfaces map[chainhash.Hash][]*link
// onionIndex is an index used to properly forward a message to the
// next hop within a Sphinx circuit. Within the sphinx packets, the
// "next-hop" destination is encoded as the hash160 of the node's
// public key serialized in compressed format.
onionMtx sync.RWMutex
onionIndex map[[ripemd160.Size]byte][]*link
// paymentCircuits maps a circuit key to an active payment circuit
// amongst two open channels. This map is used to properly clear/settle
// onion routed payments within the network.
paymentCircuits map[circuitKey]*paymentCircuit
// linkControl is a channel used by connected links to notify the
// switch of a non-multi-hop triggered link state update.
linkControl chan interface{}
// outgoingPayments is a channel that outgoing payments initiated by
// the RPC system.
outgoingPayments chan *htlcPacket
// htlcPlex is the channel which all connected links use to coordinate
// the setup/teardown of Sphinx (onion routing) payment circuits.
// Active links forward any add/settle messages over this channel each
// state transition, sending new adds/settles which are fully locked
// in.
htlcPlex chan *htlcPacket
// TODO(roasbeef): sampler to log sat/sec and tx/sec
wg sync.WaitGroup
quit chan struct{}
}
// newHtlcSwitch creates a new htlcSwitch.
func newHtlcSwitch() *htlcSwitch {
return &htlcSwitch{
chanIndex: make(map[lnwire.ChannelID]*link),
interfaces: make(map[chainhash.Hash][]*link),
onionIndex: make(map[[ripemd160.Size]byte][]*link),
paymentCircuits: make(map[circuitKey]*paymentCircuit),
linkControl: make(chan interface{}),
htlcPlex: make(chan *htlcPacket, htlcQueueSize),
outgoingPayments: make(chan *htlcPacket, htlcQueueSize),
quit: make(chan struct{}),
}
}
// Start starts all helper goroutines required for the operation of the switch.
func (h *htlcSwitch) Start() error {
if !atomic.CompareAndSwapInt32(&h.started, 0, 1) {
return nil
}
hswcLog.Tracef("Starting HTLC switch")
h.wg.Add(2)
go h.networkAdmin()
go h.htlcForwarder()
return nil
}
// Stop gracefully stops all active helper goroutines, then waits until they've
// exited.
func (h *htlcSwitch) Stop() error {
if !atomic.CompareAndSwapInt32(&h.shutdown, 0, 1) {
return nil
}
hswcLog.Infof("HLTC switch shutting down")
close(h.quit)
h.wg.Wait()
return nil
}
// SendHTLC queues a HTLC packet for forwarding over the designated interface.
// In the event that the interface has insufficient capacity for the payment,
// an error is returned. Additionally, if the interface cannot be found, an
// alternative error is returned.
func (h *htlcSwitch) SendHTLC(htlcPkt *htlcPacket) ([32]byte, error) {
htlcPkt.err = make(chan error, 1)
htlcPkt.done = make(chan struct{})
htlcPkt.preImage = make(chan [32]byte, 1)
h.outgoingPayments <- htlcPkt
return <-htlcPkt.preImage, <-htlcPkt.err
}
// htlcForwarder is responsible for optimally forwarding (and possibly
// fragmenting) incoming/outgoing HTLCs amongst all active interfaces and
// their links. The duties of the forwarder are similar to that of a network
// switch, in that it facilitates multi-hop payments by acting as a central
// messaging bus. The switch communicates will active links to create, manage,
// and tear down active onion routed payments. Each active channel is modeled
// as networked device with metadata such as the available payment bandwidth,
// and total link capacity.
func (h *htlcSwitch) htlcForwarder() {
// TODO(roasbeef): track pending payments here instead of within each peer?
// Examine settles/timeouts from htlcPlex. Add src to htlcPacket, key by
// (src, htlcKey).
// TODO(roasbeef): cleared vs settled distinction
var (
deltaNumUpdates, totalNumUpdates uint64
deltaSatSent, deltaSatRecv btcutil.Amount
totalSatSent, totalSatRecv btcutil.Amount
)
logTicker := time.NewTicker(10 * time.Second)
out:
for {
select {
case htlcPkt := <-h.outgoingPayments:
dest := htlcPkt.dest
h.interfaceMtx.RLock()
chanInterface, ok := h.interfaces[dest]
h.interfaceMtx.RUnlock()
if !ok {
err := fmt.Errorf("Unable to locate link %x",
dest[:])
hswcLog.Errorf(err.Error())
htlcPkt.preImage <- zeroBytes
htlcPkt.err <- err
continue
}
wireMsg := htlcPkt.msg.(*lnwire.UpdateAddHTLC)
amt := wireMsg.Amount
// Handle this send request in a distinct goroutine in
// order to avoid a possible deadlock between the htlc
// switch and channel's htlc manager.
for _, link := range chanInterface {
// TODO(roasbeef): implement HTLC fragmentation
// * avoid full channel depletion at higher
// level (here) instead of within state
// machine?
if atomic.LoadInt64(&link.availableBandwidth) < int64(amt) {
continue
}
hswcLog.Tracef("Sending %v to %x", amt, dest[:])
go func() {
link.sendAndConsume(htlcPkt)
<-htlcPkt.done
link.restoreSlot()
}()
n := atomic.AddInt64(&link.availableBandwidth,
-int64(amt))
hswcLog.Tracef("Decrementing link %v bandwidth to %v",
link.chanID, n)
continue out
}
hswcLog.Errorf("Unable to send payment, insufficient capacity")
htlcPkt.preImage <- zeroBytes
htlcPkt.err <- fmt.Errorf("Insufficient capacity")
case pkt := <-h.htlcPlex:
// TODO(roasbeef): properly account with cleared vs settled
deltaNumUpdates++
hswcLog.Tracef("plex packet: %v", newLogClosure(func() string {
if pkt.onion != nil {
pkt.onion.Packet.Header.EphemeralKey.Curve = nil
}
return spew.Sdump(pkt)
}))
switch wireMsg := pkt.msg.(type) {
// A link has just forwarded us a new HTLC, therefore
// we initiate the payment circuit within our internal
// state so we can properly forward the ultimate settle
// message.
case *lnwire.UpdateAddHTLC:
payHash := wireMsg.PaymentHash
// Create the two ends of the payment circuit
// required to ensure completion of this new
// payment.
nextHop := pkt.onion.NextHop
h.onionMtx.RLock()
clearLink, ok := h.onionIndex[nextHop]
h.onionMtx.RUnlock()
if !ok {
hswcLog.Errorf("unable to find dest end of "+
"circuit: %x", nextHop)
// We were unable to locate the
// next-hop as encoded within the
// Sphinx packet. Therefore, we send a
// cancellation message back to the
// source of the packet so they can
// propagate the message back to the
// origin.
cancelPkt := &htlcPacket{
payHash: payHash,
msg: &lnwire.UpdateFailHTLC{
Reason: []byte{uint8(lnwire.UnknownDestination)},
},
err: make(chan error, 1),
}
h.chanIndexMtx.RLock()
cancelLink := h.chanIndex[pkt.srcLink]
h.chanIndexMtx.RUnlock()
cancelLink.linkChan <- cancelPkt
continue
}
h.chanIndexMtx.RLock()
settleLink := h.chanIndex[pkt.srcLink]
h.chanIndexMtx.RUnlock()
// As the link now has a new HTLC that's been
// propagated to us, we'll consume a slot from
// it's bounded channel.
settleLink.consumeSlot()
// If the link we're attempting to forward the
// HTLC over has insufficient capacity, then
// we'll cancel the HTLC as the payment cannot
// succeed.
linkBandwidth := atomic.LoadInt64(&clearLink[0].availableBandwidth)
if linkBandwidth < int64(wireMsg.Amount) {
hswcLog.Errorf("unable to forward HTLC "+
"link %v has insufficient "+
"capacity, have %v need %v",
clearLink[0].chanID, linkBandwidth,
int64(wireMsg.Amount))
pkt := &htlcPacket{
payHash: payHash,
msg: &lnwire.UpdateFailHTLC{
Reason: []byte{uint8(lnwire.InsufficientCapacity)},
},
err: make(chan error, 1),
}
// Send the cancel message along the
// link, restoring a slot in the
// bounded channel in the process.
settleLink.sendAndRestore(pkt)
continue
}
// Examine the circuit map to see if this
// circuit is already in use or not. If so,
// then we'll simply increment the reference
// count. Otherwise, we'll create a new circuit
// from scratch.
//
// TODO(roasbeef): include dest+src+amt in key
cKey := circuitKey(wireMsg.PaymentHash)
circuit, ok := h.paymentCircuits[cKey]
if ok {
hswcLog.Debugf("Increasing ref_count "+
"of circuit: %x, from %v to %v",
wireMsg.PaymentHash,
circuit.refCount,
circuit.refCount+1)
circuit.refCount++
} else {
hswcLog.Debugf("Creating onion "+
"circuit for %x: %v<->%v",
cKey[:], clearLink[0].chanID,
settleLink.chanID)
circuit = &paymentCircuit{
clear: clearLink[0],
settle: settleLink,
refCount: 1,
}
h.paymentCircuits[cKey] = circuit
}
// With the circuit initiated, send the htlcPkt
// to the clearing link within the circuit to
// continue propagating the HTLC across the
// network.
circuit.clear.sendAndConsume(&htlcPacket{
msg: wireMsg,
preImage: make(chan [32]byte, 1),
err: make(chan error, 1),
done: make(chan struct{}),
})
// Reduce the available bandwidth for the link
// as it will clear the above HTLC, increasing
// the limbo balance within the channel.
n := atomic.AddInt64(&circuit.clear.availableBandwidth,
-int64(pkt.amt))
hswcLog.Tracef("Decrementing link %v bandwidth to %v",
circuit.clear.chanID, n)
deltaSatRecv += pkt.amt
// We've just received a settle message which means we
// can finalize the payment circuit by forwarding the
// settle msg to the link which initially created the
// circuit.
case *lnwire.UpdateFufillHTLC:
rHash := sha256.Sum256(wireMsg.PaymentPreimage[:])
var cKey circuitKey
copy(cKey[:], rHash[:])
// If we initiated the payment then there won't
// be an active circuit to continue propagating
// the settle over. Therefore, we exit early.
circuit, ok := h.paymentCircuits[cKey]
if !ok {
hswcLog.Debugf("No existing circuit "+
"for %x to settle", rHash[:])
deltaSatSent += pkt.amt
continue
}
circuit.clear.restoreSlot()
circuit.settle.sendAndRestore(&htlcPacket{
msg: wireMsg,
err: make(chan error, 1),
})
// Increase the available bandwidth for the
// link as it will settle the above HTLC,
// subtracting from the limbo balance and
// incrementing its local balance.
n := atomic.AddInt64(&circuit.settle.availableBandwidth,
int64(pkt.amt))
hswcLog.Tracef("Incrementing link %v bandwidth to %v",
circuit.settle.chanID, n)
deltaSatSent += pkt.amt
if circuit.refCount--; circuit.refCount == 0 {
hswcLog.Debugf("Closing completed onion "+
"circuit for %x: %v<->%v", rHash[:],
circuit.clear.chanID,
circuit.settle.chanID)
delete(h.paymentCircuits, cKey)
}
// We've just received an HTLC cancellation triggered
// by an upstream peer somewhere within the ultimate
// route. In response, we'll terminate the payment
// circuit and propagate the error backwards.
case *lnwire.UpdateFailHTLC:
// In order to properly handle the error, we'll
// need to look up the original circuit that
// the incoming HTLC created.
circuit, ok := h.paymentCircuits[pkt.payHash]
if !ok {
hswcLog.Debugf("No existing circuit "+
"for %x to cancel", pkt.payHash)
continue
}
circuit.clear.restoreSlot()
// Since an outgoing HTLC we sent on the clear
// link has been cancelled, we update the
// bandwidth of the clear link, restoring the
// value of the HTLC worth.
n := atomic.AddInt64(&circuit.clear.availableBandwidth,
int64(pkt.amt))
hswcLog.Debugf("HTLC %x has been cancelled, "+
"incrementing link %v bandwidth to %v", pkt.payHash,
circuit.clear.chanID, n)
// With our link info updated, we now continue
// the error propagation by sending the
// cancellation message over the link that sent
// us the incoming HTLC.
circuit.settle.sendAndRestore(&htlcPacket{
msg: wireMsg,
payHash: pkt.payHash,
err: make(chan error, 1),
})
if circuit.refCount--; circuit.refCount == 0 {
hswcLog.Debugf("Closing cancelled onion "+
"circuit for %x: %v<->%v", pkt.payHash,
circuit.clear.chanID,
circuit.settle.chanID)
delete(h.paymentCircuits, pkt.payHash)
}
}
case <-logTicker.C:
if deltaNumUpdates == 0 {
continue
}
oldSatSent := totalSatRecv
oldSatRecv := totalSatRecv
oldNumUpdates := totalNumUpdates
newSatSent := oldSatRecv + deltaSatSent
newSatRecv := totalSatRecv + deltaSatRecv
newNumUpdates := totalNumUpdates + deltaNumUpdates
satSent := newSatSent - oldSatSent
satRecv := newSatRecv - oldSatRecv
numUpdates := newNumUpdates - oldNumUpdates
hswcLog.Infof("Sent %v satoshis, received %v satoshis in "+
"the last 10 seconds (%v tx/sec)",
satSent.ToUnit(btcutil.AmountSatoshi),
satRecv.ToUnit(btcutil.AmountSatoshi),
numUpdates)
totalSatSent += deltaSatSent
deltaSatSent = 0
totalSatRecv += deltaSatRecv
deltaSatRecv = 0
totalNumUpdates += deltaNumUpdates
deltaNumUpdates = 0
case <-h.quit:
break out
}
}
h.wg.Done()
}
// networkAdmin is responsible for handling requests to register, unregister,
// and close any link. In the event that an unregister request leaves an
// interface with no active links, that interface is garbage collected.
func (h *htlcSwitch) networkAdmin() {
out:
for {
select {
case msg := <-h.linkControl:
switch req := msg.(type) {
case *closeLinkReq:
h.handleCloseLink(req)
case *registerLinkMsg:
h.handleRegisterLink(req)
case *unregisterLinkMsg:
h.handleUnregisterLink(req)
case *linkInfoUpdateMsg:
h.handleLinkUpdate(req)
}
case <-h.quit:
break out
}
}
h.wg.Done()
}
// handleRegisterLink registers a new link within the channel index, and also
// adds the link to the existing set of links for the target interface.
func (h *htlcSwitch) handleRegisterLink(req *registerLinkMsg) {
chanPoint := req.linkInfo.ChannelPoint
chanID := lnwire.NewChanIDFromOutPoint(chanPoint)
newLink := &link{
capacity: req.linkInfo.Capacity,
availableBandwidth: int64(req.linkInfo.LocalBalance),
peer: req.peer,
chanID: chanID,
}
// To ensure we never accidentally cause an HTLC overflow, we'll limit,
// we'll use this buffered channel as as semaphore in order to limit
// the number of outstanding HTLC's we extend to the target link.
//const numSlots = (lnwallet.MaxHTLCNumber / 2) - 1
const numSlots = lnwallet.MaxHTLCNumber - 5
newLink.boundedLinkChan = newBoundedLinkChan(numSlots, req.linkChan)
// First update the channel index with this new channel point. The
// channel index will be used to quickly lookup channels in order to:
// close them, update their link capacity, or possibly during multi-hop
// HTLC forwarding.
h.chanIndexMtx.Lock()
h.chanIndex[chanID] = newLink
h.chanIndexMtx.Unlock()
interfaceID := req.peer.lightningID
h.interfaceMtx.Lock()
h.interfaces[interfaceID] = append(h.interfaces[interfaceID], newLink)
h.interfaceMtx.Unlock()
// Next, update the onion index which is used to look up the
// settle/clear links during multi-hop payments and to dispatch
// outgoing payments initiated by a local subsystem.
var onionID [ripemd160.Size]byte
copy(onionID[:], btcutil.Hash160(req.peer.addr.IdentityKey.SerializeCompressed()))
h.onionMtx.Lock()
h.onionIndex[onionID] = h.interfaces[interfaceID]
h.onionMtx.Unlock()
hswcLog.Infof("registering new link, interface=%x, onion_link=%x, "+
"chan_point=%v, capacity=%v", interfaceID[:], onionID,
chanPoint, newLink.capacity)
if req.done != nil {
req.done <- struct{}{}
}
}
// handleUnregisterLink unregisters a currently active link. If the deletion of
// this link leaves the interface empty, then the interface entry itself is
// also deleted.
func (h *htlcSwitch) handleUnregisterLink(req *unregisterLinkMsg) {
hswcLog.Debugf("unregistering active link, interface=%v, chan_id=%v",
hex.EncodeToString(req.chanInterface[:]), req.chanID)
chanInterface := req.chanInterface
h.interfaceMtx.RLock()
links := h.interfaces[chanInterface]
h.interfaceMtx.RUnlock()
h.chanIndexMtx.Lock()
defer h.chanIndexMtx.Unlock()
h.onionMtx.Lock()
defer h.onionMtx.Unlock()
// A request with a nil channel point indicates that all the current
// links for this channel should be cleared.
if req.chanID == nil {
hswcLog.Debugf("purging all active links for interface %v",
hex.EncodeToString(chanInterface[:]))
for _, link := range links {
delete(h.chanIndex, link.chanID)
}
links = nil
} else {
delete(h.chanIndex, *req.chanID)
for i := 0; i < len(links); i++ {
chanLink := links[i]
if chanLink.chanID == *req.chanID {
// We perform an in-place delete by sliding
// every element down one, then slicing off the
// last element. Additionally, we update the
// slice reference within the source map to
// ensure full deletion.
copy(links[i:], links[i+1:])
links[len(links)-1] = nil
h.interfaceMtx.Lock()
h.interfaces[chanInterface] = links[:len(links)-1]
h.interfaceMtx.Unlock()
break
}
}
}
if len(links) == 0 {
hswcLog.Debugf("interface %v has no active links, destroying",
hex.EncodeToString(chanInterface[:]))
// Delete the peer from the onion index so that the
// htlcForwarder knows not to attempt to forward any further
// HTLCs in this direction.
var onionID [ripemd160.Size]byte
copy(onionID[:], btcutil.Hash160(req.remoteID))
delete(h.onionIndex, onionID)
// Finally, delete the interface itself so that outgoing
// payments don't select this path.
h.interfaceMtx.Lock()
delete(h.interfaces, chanInterface)
h.interfaceMtx.Unlock()
}
if req.done != nil {
req.done <- struct{}{}
}
}
// handleCloseLink sends a message to the peer responsible for the target
// channel point, instructing it to initiate a cooperative channel closure.
func (h *htlcSwitch) handleCloseLink(req *closeLinkReq) {
chanID := lnwire.NewChanIDFromOutPoint(req.chanPoint)
h.chanIndexMtx.RLock()
targetLink, ok := h.chanIndex[chanID]
h.chanIndexMtx.RUnlock()
if !ok {
req.err <- fmt.Errorf("channel %v not found, or peer "+
"offline", req.chanPoint)
return
}
hswcLog.Debugf("requesting interface %v to close link %v",
hex.EncodeToString(targetLink.peer.lightningID[:]), chanID)
targetLink.peer.localCloseChanReqs <- req
// TODO(roasbeef): if type was CloseBreach initiate force closure with
// all other channels (if any) we have with the remote peer.
}
// handleLinkUpdate processes the link info update message by adjusting the
// channel's available bandwidth by the delta specified within the message.
func (h *htlcSwitch) handleLinkUpdate(req *linkInfoUpdateMsg) {
h.chanIndexMtx.RLock()
link, ok := h.chanIndex[req.targetLink]
h.chanIndexMtx.RUnlock()
if !ok {
hswcLog.Errorf("received link update for non-existent link: %v",
req.targetLink)
return
}
atomic.AddInt64(&link.availableBandwidth, int64(req.bandwidthDelta))
hswcLog.Tracef("adjusting bandwidth of link %v by %v", req.targetLink,
req.bandwidthDelta)
}
// registerLinkMsg is message which requests a new link to be registered.
type registerLinkMsg struct {
peer *peer
linkInfo *channeldb.ChannelSnapshot
linkChan chan *htlcPacket
done chan struct{}
}
// RegisterLink requests the htlcSwitch to register a new active link. The new
// link encapsulates an active channel. The htlc plex channel is returned. The
// plex channel allows the switch to properly de-multiplex incoming/outgoing
// HTLC messages forwarding them to their proper destination in the multi-hop
// settings.
func (h *htlcSwitch) RegisterLink(p *peer, linkInfo *channeldb.ChannelSnapshot,
linkChan chan *htlcPacket) chan *htlcPacket {
done := make(chan struct{}, 1)
req := &registerLinkMsg{p, linkInfo, linkChan, done}
h.linkControl <- req
<-done
return h.htlcPlex
}
// unregisterLinkMsg is a message which requests the active link be unregistered.
type unregisterLinkMsg struct {
chanInterface [32]byte
chanID *lnwire.ChannelID
// remoteID is the identity public key of the node we're removing the
// link between. The public key is expected to be serialized in
// compressed form.
// TODO(roasbeef): redo interface map
remoteID []byte
done chan struct{}
}
// UnregisterLink requests the htlcSwitch to register the new active link. An
// unregistered link will no longer be considered a candidate to forward
// HTLCs.
func (h *htlcSwitch) UnregisterLink(remotePub *btcec.PublicKey,
chanID *lnwire.ChannelID) {
done := make(chan struct{}, 1)
rawPub := remotePub.SerializeCompressed()
h.linkControl <- &unregisterLinkMsg{
chanInterface: sha256.Sum256(rawPub),
chanID: chanID,
remoteID: rawPub,
done: done,
}
<-done
}
// LinkCloseType is an enum which signals the type of channel closure the switch
// should execute.
type LinkCloseType uint8
const (
// CloseRegular indicates a regular cooperative channel closure should
// be attempted.
CloseRegular LinkCloseType = iota
// CloseBreach indicates that a channel breach has been detected, and
// the link should immediately be marked as unavailable.
CloseBreach
)
// closeLinkReq represents a request to close a particular channel specified by
// its outpoint.
type closeLinkReq struct {
CloseType LinkCloseType
chanPoint *wire.OutPoint
updates chan *lnrpc.CloseStatusUpdate
err chan error
}
// CloseLink closes an active link targetted by its channel point. Closing the
// link initiates a cooperative channel closure iff forceClose is false. If
// forceClose is true, then a unilateral channel closure is executed.
// TODO(roasbeef): consolidate with UnregisterLink?
func (h *htlcSwitch) CloseLink(chanPoint *wire.OutPoint,
closeType LinkCloseType) (chan *lnrpc.CloseStatusUpdate, chan error) {
updateChan := make(chan *lnrpc.CloseStatusUpdate, 1)
errChan := make(chan error, 1)
h.linkControl <- &closeLinkReq{
CloseType: closeType,
chanPoint: chanPoint,
updates: updateChan,
err: errChan,
}
return updateChan, errChan
}
// linkInfoUpdateMsg encapsulates a request for the htlc switch to update the
// metadata related to the target link.
type linkInfoUpdateMsg struct {
targetLink lnwire.ChannelID
bandwidthDelta btcutil.Amount
}
// UpdateLink sends a message to the switch to update the available bandwidth
// within the link by the passed satoshi delta. This function may be used when
// re-anchoring to boost the capacity of a channel, or once a peer settles an
// HTLC invoice.
func (h *htlcSwitch) UpdateLink(chanID lnwire.ChannelID, delta btcutil.Amount) {
h.linkControl <- &linkInfoUpdateMsg{
targetLink: chanID,
bandwidthDelta: delta,
}
}

2
log.go

@ -9,6 +9,7 @@ import (
"github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/discovery" "github.com/lightningnetwork/lnd/discovery"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/routing" "github.com/lightningnetwork/lnd/routing"
"github.com/roasbeef/btcd/connmgr" "github.com/roasbeef/btcd/connmgr"
@ -96,6 +97,7 @@ func useLogger(subsystemID string, logger btclog.Logger) {
case "HSWC": case "HSWC":
hswcLog = logger hswcLog = logger
htlcswitch.UseLogger(logger)
case "UTXN": case "UTXN":
utxnLog = logger utxnLog = logger

960
peer.go

@ -1,7 +1,6 @@
package main package main
import ( import (
"bytes"
"container/list" "container/list"
"crypto/sha256" "crypto/sha256"
"fmt" "fmt"
@ -11,11 +10,16 @@ import (
"time" "time"
"github.com/davecgh/go-spew/spew" "github.com/davecgh/go-spew/spew"
"github.com/go-errors/errors"
"github.com/lightningnetwork/lightning-onion"
"github.com/lightningnetwork/lnd/brontide" "github.com/lightningnetwork/lnd/brontide"
"github.com/btcsuite/fastsha256"
"bytes"
"github.com/go-errors/errors"
"github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
@ -24,7 +28,6 @@ import (
"github.com/roasbeef/btcd/connmgr" "github.com/roasbeef/btcd/connmgr"
"github.com/roasbeef/btcd/txscript" "github.com/roasbeef/btcd/txscript"
"github.com/roasbeef/btcd/wire" "github.com/roasbeef/btcd/wire"
"github.com/roasbeef/btcutil"
) )
var ( var (
@ -121,16 +124,13 @@ type peer struct {
activeChannels map[lnwire.ChannelID]*lnwallet.LightningChannel activeChannels map[lnwire.ChannelID]*lnwallet.LightningChannel
chanSnapshotReqs chan *chanSnapshotReq chanSnapshotReqs chan *chanSnapshotReq
htlcManMtx sync.RWMutex
htlcManagers map[lnwire.ChannelID]chan lnwire.Message
// newChannels is used by the fundingManager to send fully opened // newChannels is used by the fundingManager to send fully opened
// channels to the source peer which handled the funding workflow. // channels to the source peer which handled the funding workflow.
newChannels chan *newChannelMsg newChannels chan *newChannelMsg
// localCloseChanReqs is a channel in which any local requests to close // localCloseChanReqs is a channel in which any local requests to close
// a particular channel are sent over. // a particular channel are sent over.
localCloseChanReqs chan *closeLinkReq localCloseChanReqs chan *htlcswitch.ChanClose
// shutdownChanReqs is used to send the Shutdown messages that initiate // shutdownChanReqs is used to send the Shutdown messages that initiate
// the cooperative close workflow. // the cooperative close workflow.
@ -180,11 +180,10 @@ func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server,
outgoingQueue: make(chan outgoinMsg, outgoingQueueLen), outgoingQueue: make(chan outgoinMsg, outgoingQueueLen),
activeChannels: make(map[lnwire.ChannelID]*lnwallet.LightningChannel), activeChannels: make(map[lnwire.ChannelID]*lnwallet.LightningChannel),
htlcManagers: make(map[lnwire.ChannelID]chan lnwire.Message),
chanSnapshotReqs: make(chan *chanSnapshotReq), chanSnapshotReqs: make(chan *chanSnapshotReq),
newChannels: make(chan *newChannelMsg, 1), newChannels: make(chan *newChannelMsg, 1),
localCloseChanReqs: make(chan *closeLinkReq), localCloseChanReqs: make(chan *htlcswitch.ChanClose),
shutdownChanReqs: make(chan *lnwire.Shutdown), shutdownChanReqs: make(chan *lnwire.Shutdown),
closingSignedChanReqs: make(chan *lnwire.ClosingSigned), closingSignedChanReqs: make(chan *lnwire.ClosingSigned),
@ -310,17 +309,20 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
// Register this new channel link with the HTLC Switch. This is // Register this new channel link with the HTLC Switch. This is
// necessary to properly route multi-hop payments, and forward // necessary to properly route multi-hop payments, and forward
// new payments triggered by RPC clients. // new payments triggered by RPC clients.
downstreamLink := make(chan *htlcPacket, 10) sphinxDecoder := htlcswitch.NewSphinxDecoder(p.server.sphinx)
plexChan := p.server.htlcSwitch.RegisterLink(p, link := htlcswitch.NewChannelLink(
dbChan.Snapshot(), downstreamLink) &htlcswitch.ChannelLinkConfig{
Peer: p,
DecodeOnion: sphinxDecoder.Decode,
SettledContracts: p.server.breachArbiter.settledContracts,
DebugHTLC: cfg.DebugHTLC,
Registry: p.server.invoices,
Switch: p.server.htlcSwitch,
}, lnChan)
upstreamLink := make(chan lnwire.Message, 10) if err := p.server.htlcSwitch.AddLink(link); err != nil {
p.htlcManMtx.Lock() return err
p.htlcManagers[chanID] = upstreamLink }
p.htlcManMtx.Unlock()
p.wg.Add(1)
go p.htlcManager(lnChan, plexChan, downstreamLink, upstreamLink)
} }
return nil return nil
@ -488,19 +490,15 @@ out:
if isChanUpdate { if isChanUpdate {
sendUpdate := func() { sendUpdate := func() {
// Dispatch the commitment update message to // Dispatch the commitment update message to the proper
// the proper active goroutine dedicated to // active goroutine dedicated to this channel.
// this channel. link, err := p.server.htlcSwitch.GetLink(targetChan)
p.htlcManMtx.RLock() if err != nil {
channel, ok := p.htlcManagers[targetChan]
p.htlcManMtx.RUnlock()
if !ok {
peerLog.Errorf("recv'd update for unknown "+ peerLog.Errorf("recv'd update for unknown "+
"channel %v from %v", targetChan, p) "channel %v from %v", targetChan, p)
return return
} }
link.HandleChannelUpdate(nextMsg)
channel <- nextMsg
} }
// Check the map of active channel streams, if this map // Check the map of active channel streams, if this map
@ -756,7 +754,7 @@ func (p *peer) channelManager() {
// a cooperative channel close. When an lnwire.Shutdown is received, // a cooperative channel close. When an lnwire.Shutdown is received,
// this allows the node to determine the next step to be taken in the // this allows the node to determine the next step to be taken in the
// workflow. // workflow.
chanShutdowns := make(map[lnwire.ChannelID]*closeLinkReq) chanShutdowns := make(map[lnwire.ChannelID]*htlcswitch.ChanClose)
// shutdownSigs is a map of signatures maintained by the responder in a // shutdownSigs is a map of signatures maintained by the responder in a
// cooperative channel close. This map enables us to respond to // cooperative channel close. This map enables us to respond to
@ -788,26 +786,22 @@ out:
peerLog.Infof("New channel active ChannelPoint(%v) "+ peerLog.Infof("New channel active ChannelPoint(%v) "+
"with peerId(%v)", chanPoint, p.id) "with peerId(%v)", chanPoint, p.id)
// Now that the channel is open, notify the Htlc decoder := htlcswitch.NewSphinxDecoder(p.server.sphinx)
// Switch of a new active link. link := htlcswitch.NewChannelLink(
// TODO(roasbeef): register needs to account for &htlcswitch.ChannelLinkConfig{
// in-flight htlc's on restart Peer: p,
chanSnapShot := newChanReq.channel.StateSnapshot() DecodeOnion: decoder.Decode,
downstreamLink := make(chan *htlcPacket, 10) SettledContracts: p.server.breachArbiter.settledContracts,
plexChan := p.server.htlcSwitch.RegisterLink(p, DebugHTLC: cfg.DebugHTLC,
chanSnapShot, downstreamLink) Registry: p.server.invoices,
Switch: p.server.htlcSwitch,
}, newChanReq.channel)
// With the channel registered to the HtlcSwitch spawn err := p.server.htlcSwitch.AddLink(link)
// a goroutine to handle commitment updates for this if err != nil {
// new channel. peerLog.Errorf("can't register new channel "+
upstreamLink := make(chan lnwire.Message, 10) "link(%v) with peerId(%v)", chanPoint, p.id)
p.htlcManMtx.Lock() }
p.htlcManagers[chanID] = upstreamLink
p.htlcManMtx.Unlock()
p.wg.Add(1)
go p.htlcManager(newChanReq.channel, plexChan,
downstreamLink, upstreamLink)
close(newChanReq.done) close(newChanReq.done)
@ -816,12 +810,12 @@ out:
case req := <-p.localCloseChanReqs: case req := <-p.localCloseChanReqs:
// So we'll first transition the channel to a state of // So we'll first transition the channel to a state of
// pending shutdown. // pending shutdown.
chanID := lnwire.NewChanIDFromOutPoint(req.chanPoint) chanID := lnwire.NewChanIDFromOutPoint(req.ChanPoint)
// We'll only track this shutdown request if this is a // We'll only track this shutdown request if this is a
// regular close request, and not in response to a // regular close request, and not in response to a
// channel breach. // channel breach.
if req.CloseType == CloseRegular { if req.CloseType == htlcswitch.CloseRegular {
chanShutdowns[chanID] = req chanShutdowns[chanID] = req
} }
@ -899,8 +893,8 @@ out:
// //
// TODO(roasbeef): if no more active channels with peer call Remove on connMgr // TODO(roasbeef): if no more active channels with peer call Remove on connMgr
// with peerID // with peerID
func (p *peer) handleLocalClose(req *closeLinkReq) { func (p *peer) handleLocalClose(req *htlcswitch.ChanClose) {
chanID := lnwire.NewChanIDFromOutPoint(req.chanPoint) chanID := lnwire.NewChanIDFromOutPoint(req.ChanPoint)
p.activeChanMtx.RLock() p.activeChanMtx.RLock()
channel, ok := p.activeChannels[chanID] channel, ok := p.activeChannels[chanID]
@ -909,7 +903,7 @@ func (p *peer) handleLocalClose(req *closeLinkReq) {
err := fmt.Errorf("unable to close channel, ChannelID(%v) is "+ err := fmt.Errorf("unable to close channel, ChannelID(%v) is "+
"unknown", chanID) "unknown", chanID)
peerLog.Errorf(err.Error()) peerLog.Errorf(err.Error())
req.err <- err req.Err <- err
return return
} }
@ -917,22 +911,22 @@ func (p *peer) handleLocalClose(req *closeLinkReq) {
// A type of CloseRegular indicates that the user has opted to close // A type of CloseRegular indicates that the user has opted to close
// out this channel on-chain, so we execute the cooperative channel // out this channel on-chain, so we execute the cooperative channel
// closure workflow. // closure workflow.
case CloseRegular: case htlcswitch.CloseRegular:
err := p.sendShutdown(channel) err := p.sendShutdown(channel)
if err != nil { if err != nil {
req.err <- err req.Err <- err
return return
} }
// A type of CloseBreach indicates that the counterparty has breached // A type of CloseBreach indicates that the counterparty has breached
// the channel therefore we need to clean up our local state. // the channel therefore we need to clean up our local state.
case CloseBreach: case htlcswitch.CloseBreach:
peerLog.Infof("ChannelPoint(%v) has been breached, wiping "+ peerLog.Infof("ChannelPoint(%v) has been breached, wiping "+
"channel", req.chanPoint) "channel", req.ChanPoint)
if err := wipeChannel(p, channel); err != nil { if err := p.WipeChannel(channel); err != nil {
peerLog.Infof("Unable to wipe channel after detected "+ peerLog.Infof("Unable to wipe channel after detected "+
"breach: %v", err) "breach: %v", err)
req.err <- err req.Err <- err
return return
} }
return return
@ -1003,8 +997,8 @@ func (p *peer) handleShutdownResponse(msg *lnwire.Shutdown) []byte {
// of an unresponsive remote party, the initiator can either choose to execute // of an unresponsive remote party, the initiator can either choose to execute
// a force closure, or backoff for a period of time, and retry the cooperative // a force closure, or backoff for a period of time, and retry the cooperative
// closure. // closure.
func (p *peer) handleInitClosingSigned(req *closeLinkReq, msg *lnwire.ClosingSigned) { func (p *peer) handleInitClosingSigned(req *htlcswitch.ChanClose, msg *lnwire.ClosingSigned) {
chanID := lnwire.NewChanIDFromOutPoint(req.chanPoint) chanID := lnwire.NewChanIDFromOutPoint(req.ChanPoint)
p.activeChanMtx.RLock() p.activeChanMtx.RLock()
channel, ok := p.activeChannels[chanID] channel, ok := p.activeChannels[chanID]
p.activeChanMtx.RUnlock() p.activeChanMtx.RUnlock()
@ -1012,7 +1006,7 @@ func (p *peer) handleInitClosingSigned(req *closeLinkReq, msg *lnwire.ClosingSig
err := fmt.Errorf("unable to close channel, ChannelID(%v) is "+ err := fmt.Errorf("unable to close channel, ChannelID(%v) is "+
"unknown", chanID) "unknown", chanID)
peerLog.Errorf(err.Error()) peerLog.Errorf(err.Error())
req.err <- err req.Err <- err
return return
} }
@ -1027,7 +1021,7 @@ func (p *peer) handleInitClosingSigned(req *closeLinkReq, msg *lnwire.ClosingSig
// so generate our signature. // so generate our signature.
initiatorSig, proposedFee, err := channel.CreateCloseProposal(feeRate) initiatorSig, proposedFee, err := channel.CreateCloseProposal(feeRate)
if err != nil { if err != nil {
req.err <- err req.Err <- err
return return
} }
initSig := append(initiatorSig, byte(txscript.SigHashAll)) initSig := append(initiatorSig, byte(txscript.SigHashAll))
@ -1039,7 +1033,7 @@ func (p *peer) handleInitClosingSigned(req *closeLinkReq, msg *lnwire.ClosingSig
closeTx, err := channel.CompleteCooperativeClose(initSig, respSig, closeTx, err := channel.CompleteCooperativeClose(initSig, respSig,
feeRate) feeRate)
if err != nil { if err != nil {
req.err <- err req.Err <- err
// TODO(roasbeef): send ErrorGeneric to other side // TODO(roasbeef): send ErrorGeneric to other side
return return
} }
@ -1048,7 +1042,7 @@ func (p *peer) handleInitClosingSigned(req *closeLinkReq, msg *lnwire.ClosingSig
// create a mirrored close signed message with our completed signature. // create a mirrored close signed message with our completed signature.
parsedSig, err := btcec.ParseSignature(initSig, btcec.S256()) parsedSig, err := btcec.ParseSignature(initSig, btcec.S256())
if err != nil { if err != nil {
req.err <- err req.Err <- err
return return
} }
closingSigned := lnwire.NewClosingSigned(chanID, proposedFee, parsedSig) closingSigned := lnwire.NewClosingSigned(chanID, proposedFee, parsedSig)
@ -1062,7 +1056,7 @@ func (p *peer) handleInitClosingSigned(req *closeLinkReq, msg *lnwire.ClosingSig
if err := p.server.lnwallet.PublishTransaction(closeTx); err != nil { if err := p.server.lnwallet.PublishTransaction(closeTx); err != nil {
peerLog.Errorf("channel close tx from "+ peerLog.Errorf("channel close tx from "+
"ChannelPoint(%v) rejected: %v", "ChannelPoint(%v) rejected: %v",
req.chanPoint, err) req.ChanPoint, err)
// TODO(roasbeef): send ErrorGeneric to other side // TODO(roasbeef): send ErrorGeneric to other side
return return
} }
@ -1070,9 +1064,9 @@ func (p *peer) handleInitClosingSigned(req *closeLinkReq, msg *lnwire.ClosingSig
// Once we've completed the cooperative channel closure, we'll wipe the // Once we've completed the cooperative channel closure, we'll wipe the
// channel so we reject any incoming forward or payment requests via // channel so we reject any incoming forward or payment requests via
// this channel. // this channel.
p.server.breachArbiter.settledContracts <- req.chanPoint p.server.breachArbiter.settledContracts <- req.ChanPoint
if err := wipeChannel(p, channel); err != nil { if err := p.WipeChannel(channel); err != nil {
req.err <- err req.Err <- err
return return
} }
@ -1081,7 +1075,7 @@ func (p *peer) handleInitClosingSigned(req *closeLinkReq, msg *lnwire.ClosingSig
closingTxid := closeTx.TxHash() closingTxid := closeTx.TxHash()
chanInfo := channel.StateSnapshot() chanInfo := channel.StateSnapshot()
closeSummary := &channeldb.ChannelCloseSummary{ closeSummary := &channeldb.ChannelCloseSummary{
ChanPoint: *req.chanPoint, ChanPoint: *req.ChanPoint,
ClosingTXID: closingTxid, ClosingTXID: closingTxid,
RemotePub: &chanInfo.RemoteIdentity, RemotePub: &chanInfo.RemoteIdentity,
Capacity: chanInfo.Capacity, Capacity: chanInfo.Capacity,
@ -1090,13 +1084,13 @@ func (p *peer) handleInitClosingSigned(req *closeLinkReq, msg *lnwire.ClosingSig
IsPending: true, IsPending: true,
} }
if err := channel.DeleteState(closeSummary); err != nil { if err := channel.DeleteState(closeSummary); err != nil {
req.err <- err req.Err <- err
return return
} }
// Update the caller with a new event detailing the current pending // Update the caller with a new event detailing the current pending
// state of this request. // state of this request.
req.updates <- &lnrpc.CloseStatusUpdate{ req.Updates <- &lnrpc.CloseStatusUpdate{
Update: &lnrpc.CloseStatusUpdate_ClosePending{ Update: &lnrpc.CloseStatusUpdate_ClosePending{
ClosePending: &lnrpc.PendingUpdate{ ClosePending: &lnrpc.PendingUpdate{
Txid: closingTxid[:], Txid: closingTxid[:],
@ -1106,7 +1100,7 @@ func (p *peer) handleInitClosingSigned(req *closeLinkReq, msg *lnwire.ClosingSig
_, bestHeight, err := p.server.bio.GetBestBlock() _, bestHeight, err := p.server.bio.GetBestBlock()
if err != nil { if err != nil {
req.err <- err req.Err <- err
return return
} }
@ -1114,21 +1108,21 @@ func (p *peer) handleInitClosingSigned(req *closeLinkReq, msg *lnwire.ClosingSig
// ChainNotifier once the closure transaction obtains a single // ChainNotifier once the closure transaction obtains a single
// confirmation. // confirmation.
notifier := p.server.chainNotifier notifier := p.server.chainNotifier
go waitForChanToClose(uint32(bestHeight), notifier, req.err, go waitForChanToClose(uint32(bestHeight), notifier, req.Err,
req.chanPoint, &closingTxid, func() { req.ChanPoint, &closingTxid, func() {
// First, we'll mark the database as being fully closed // First, we'll mark the database as being fully closed
// so we'll no longer watch for its ultimate closure // so we'll no longer watch for its ultimate closure
// upon startup. // upon startup.
err := p.server.chanDB.MarkChanFullyClosed(req.chanPoint) err := p.server.chanDB.MarkChanFullyClosed(req.ChanPoint)
if err != nil { if err != nil {
req.err <- err req.Err <- err
return return
} }
// Respond to the local subsystem which requested the // Respond to the local subsystem which requested the
// channel closure. // channel closure.
req.updates <- &lnrpc.CloseStatusUpdate{ req.Updates <- &lnrpc.CloseStatusUpdate{
Update: &lnrpc.CloseStatusUpdate_ChanClose{ Update: &lnrpc.CloseStatusUpdate_ChanClose{
ChanClose: &lnrpc.ChannelCloseUpdate{ ChanClose: &lnrpc.ChannelCloseUpdate{
ClosingTxid: closingTxid[:], ClosingTxid: closingTxid[:],
@ -1188,7 +1182,7 @@ func (p *peer) handleResponseClosingSigned(msg *lnwire.ClosingSigned,
// we'll wipe the channel from all our local indexes and also signal to // we'll wipe the channel from all our local indexes and also signal to
// the switch that this channel is now closed. // the switch that this channel is now closed.
peerLog.Infof("ChannelPoint(%v) is now closed", chanPoint) peerLog.Infof("ChannelPoint(%v) is now closed", chanPoint)
if err := wipeChannel(p, channel); err != nil { if err := p.WipeChannel(channel); err != nil {
peerLog.Errorf("unable to wipe channel: %v", err) peerLog.Errorf("unable to wipe channel: %v", err)
} }
@ -1288,14 +1282,14 @@ func (p *peer) sendShutdown(channel *lnwallet.LightningChannel) error {
// Finally, we'll unregister the link from the switch in order to // Finally, we'll unregister the link from the switch in order to
// Prevent the HTLC switch from receiving additional HTLCs for this // Prevent the HTLC switch from receiving additional HTLCs for this
// channel. // channel.
p.server.htlcSwitch.UnregisterLink(p.addr.IdentityKey, &chanID) p.server.htlcSwitch.RemoveLink(chanID)
return nil return nil
} }
// wipeChannel removes the passed channel from all indexes associated with the // WipeChannel removes the passed channel from all indexes associated with the
// peer, and deletes the channel from the database. // peer, and deletes the channel from the database.
func wipeChannel(p *peer, channel *lnwallet.LightningChannel) error { func (p *peer) WipeChannel(channel *lnwallet.LightningChannel) error {
channel.Stop() channel.Stop()
chanID := lnwire.NewChanIDFromOutPoint(channel.ChannelPoint()) chanID := lnwire.NewChanIDFromOutPoint(channel.ChannelPoint())
@ -1306,247 +1300,19 @@ func wipeChannel(p *peer, channel *lnwallet.LightningChannel) error {
// Instruct the Htlc Switch to close this link as the channel is no // Instruct the Htlc Switch to close this link as the channel is no
// longer active. // longer active.
p.server.htlcSwitch.UnregisterLink(p.addr.IdentityKey, &chanID)
// Additionally, close up "down stream" link for the htlcManager which if err := p.server.htlcSwitch.RemoveLink(chanID); err != nil {
// has been assigned to this channel. This servers the link between the if err == htlcswitch.ErrChannelLinkNotFound {
// htlcManager and the switch, signalling that the channel is no longer peerLog.Warnf("unable remove channel link with "+
// active. "ChannelPoint(%v): %v", chanID, err)
p.htlcManMtx.RLock() return nil
}
// If the channel can't be found in the map, then this channel has return err
// already been wiped.
htlcWireLink, ok := p.htlcManagers[chanID]
if !ok {
p.htlcManMtx.RUnlock()
return nil
} }
close(htlcWireLink)
p.htlcManMtx.RUnlock()
// Next, we remove the htlcManager from our internal map as the
// goroutine should have exited gracefully due to the channel closure
// above.
p.htlcManMtx.RLock()
delete(p.htlcManagers, chanID)
p.htlcManMtx.RUnlock()
return nil return nil
} }
// pendingPayment represents a pending HTLC which has yet to be settled by the
// upstream peer. A pending payment encapsulates the initial HTLC add request
// additionally coupling the index of the HTLC within the log, and an error
// channel to signal the payment requester once the payment has been fully
// fufilled.
type pendingPayment struct {
htlc *lnwire.UpdateAddHTLC
index uint64
preImage chan [32]byte
err chan error
done chan struct{}
}
// commitmentState is the volatile+persistent state of an active channel's
// commitment update state-machine. This struct is used by htlcManager's to
// save meta-state required for proper functioning.
type commitmentState struct {
// htlcsToSettle is a list of preimages which allow us to settle one or
// many of the pending HTLCs we've received from the upstream peer.
htlcsToSettle map[uint64]*channeldb.Invoice
// htlcsToCancel is a set of HTLCs identified by their log index which
// are to be cancelled upon the next state transition.
htlcsToCancel map[uint64]lnwire.FailCode
// cancelReasons stores the reason why a particular HTLC was cancelled.
// The index of the HTLC within the log is mapped to the cancellation
// reason. This value is used to thread the proper error through to the
// htlcSwitch, or subsystem that initiated the HTLC.
cancelReasons map[uint64]lnwire.FailCode
// pendingBatch is slice of payments which have been added to the
// channel update log, but not yet committed to latest commitment.
pendingBatch []*pendingPayment
// pendingSettle is counter which tracks the current number of settles
// that have been sent, but not yet committed to the commitment.
pendingSettle uint32
// clearedHTCLs is a map of outgoing HTLCs we've committed to in our
// chain which have not yet been settled by the upstream peer.
clearedHTCLs map[uint64]*pendingPayment
// switchChan is a channel used to send packets to the htlc switch for
// forwarding.
switchChan chan<- *htlcPacket
// sphinx is an instance of the Sphinx onion Router for this node. The
// router will be used to process all incoming Sphinx packets embedded
// within HTLC add messages.
sphinx *sphinx.Router
// pendingCircuits tracks the remote log index of the incoming HTLCs,
// mapped to the processed Sphinx packet contained within the HTLC.
// This map is used as a staging area between when an HTLC is added to
// the log, and when it's locked into the commitment state of both
// chains. Once locked in, the processed packet is sent to the switch
// along with the HTLC to forward the packet to the next hop.
pendingCircuits map[uint64]*sphinx.ProcessedPacket
// logCommitTimer is a timer which is sent upon if we go an interval
// without receiving/sending a commitment update. It's role is to
// ensure both chains converge to identical state in a timely manner.
// TODO(roasbeef): timer should be >> then RTT
logCommitTimer *time.Timer
logCommitTick <-chan time.Time
channel *lnwallet.LightningChannel
chanPoint *wire.OutPoint
chanID lnwire.ChannelID
sync.RWMutex
}
// htlcManager is the primary goroutine which drives a channel's commitment
// update state-machine in response to messages received via several channels.
// The htlcManager reads messages from the upstream (remote) peer, and also
// from several possible downstream channels managed by the htlcSwitch. In the
// event that an htlc needs to be forwarded, then send-only htlcPlex chan is
// used which sends htlc packets to the switch for forwarding. Additionally,
// the htlcManager handles acting upon all timeouts for any active HTLCs,
// manages the channel's revocation window, and also the htlc trickle
// queue+timer for this active channels.
func (p *peer) htlcManager(channel *lnwallet.LightningChannel,
htlcPlex chan<- *htlcPacket, downstreamLink <-chan *htlcPacket,
upstreamLink <-chan lnwire.Message) {
chanStats := channel.StateSnapshot()
peerLog.Infof("HTLC manager for ChannelPoint(%v) started, "+
"our_balance=%v, their_balance=%v, chain_height=%v",
channel.ChannelPoint(), chanStats.LocalBalance,
chanStats.RemoteBalance, chanStats.NumUpdates)
// A new session for this active channel has just started, therefore we
// need to send our initial revocation window to the remote peer.
for i := 0; i < lnwallet.InitialRevocationWindow; i++ {
rev, err := channel.ExtendRevocationWindow()
if err != nil {
peerLog.Errorf("unable to expand revocation window: %v", err)
continue
}
p.queueMsg(rev, nil)
}
chanPoint := channel.ChannelPoint()
state := &commitmentState{
channel: channel,
chanPoint: chanPoint,
chanID: lnwire.NewChanIDFromOutPoint(chanPoint),
clearedHTCLs: make(map[uint64]*pendingPayment),
htlcsToSettle: make(map[uint64]*channeldb.Invoice),
htlcsToCancel: make(map[uint64]lnwire.FailCode),
cancelReasons: make(map[uint64]lnwire.FailCode),
pendingCircuits: make(map[uint64]*sphinx.ProcessedPacket),
sphinx: p.server.sphinx,
logCommitTimer: time.NewTimer(300 * time.Millisecond),
switchChan: htlcPlex,
}
// TODO(roasbeef): check to see if able to settle any currently pending
// HTLCs
// * also need signals when new invoices are added by the
// invoiceRegistry
batchTimer := time.NewTicker(50 * time.Millisecond)
defer batchTimer.Stop()
out:
for {
select {
case <-channel.UnilateralCloseSignal:
// TODO(roasbeef): need to send HTLC outputs to nursery
peerLog.Warnf("Remote peer has closed ChannelPoint(%v) on-chain",
state.chanPoint)
if err := wipeChannel(p, channel); err != nil {
peerLog.Errorf("unable to wipe channel %v", err)
}
p.server.breachArbiter.settledContracts <- state.chanPoint
break out
case <-channel.ForceCloseSignal:
// TODO(roasbeef): path never taken now that server
// force closes's directly?
peerLog.Warnf("ChannelPoint(%v) has been force "+
"closed, disconnecting from peerID(%x)",
state.chanPoint, p.id)
break out
case <-state.logCommitTick:
// If we haven't sent or received a new commitment
// update in some time, check to see if we have any
// pending updates we need to commit due to our
// commitment chains being desynchronized.
if state.channel.FullySynced() &&
len(state.htlcsToSettle) == 0 {
continue
}
if err := p.updateCommitTx(state); err != nil {
peerLog.Errorf("unable to update commitment: %v",
err)
p.Disconnect()
break out
}
case <-batchTimer.C:
// If the either batch is empty, then we have no work
// here.
//
// TODO(roasbeef): should be combined, will be fixed by
// andrew's PR
if len(state.pendingBatch) == 0 && state.pendingSettle == 0 {
continue
}
// Otherwise, attempt to extend the remote commitment
// chain including all the currently pending entries.
// If the send was unsuccessful, then abandon the
// update, waiting for the revocation window to open
// up.
if err := p.updateCommitTx(state); err != nil {
peerLog.Errorf("unable to update "+
"commitment: %v", err)
p.Disconnect()
break out
}
case pkt := <-downstreamLink:
p.handleDownStreamPkt(state, pkt)
case msg, ok := <-upstreamLink:
// If the upstream message link is closed, this signals
// that the channel itself is being closed, therefore
// we exit.
if !ok {
break out
}
p.handleUpstreamMsg(state, msg)
case <-p.quit:
break out
}
}
p.wg.Done()
peerLog.Tracef("htlcManager for peer %v done", p)
}
// handleInitMsg handles the incoming init message which contains global and // handleInitMsg handles the incoming init message which contains global and
// local features vectors. If feature vectors are incompatible then disconnect. // local features vectors. If feature vectors are incompatible then disconnect.
func (p *peer) handleInitMsg(msg *lnwire.Init) error { func (p *peer) handleInitMsg(msg *lnwire.Init) error {
@ -1582,557 +1348,21 @@ func (p *peer) sendInitMsg() error {
return p.writeMessage(msg) return p.writeMessage(msg)
} }
// handleDownStreamPkt processes an HTLC packet sent from the downstream HTLC // SendMessage sends message to the remote peer which represented by
// Switch. Possible messages sent by the switch include requests to forward new // this peer.
// HTLCs, timeout previously cleared HTLCs, and finally to settle currently func (p *peer) SendMessage(msg lnwire.Message) error {
// cleared HTLCs with the upstream peer. p.queueMsg(msg, nil)
func (p *peer) handleDownStreamPkt(state *commitmentState, pkt *htlcPacket) {
var isSettle bool
switch htlc := pkt.msg.(type) {
case *lnwire.UpdateAddHTLC:
// A new payment has been initiated via the
// downstream channel, so we add the new HTLC
// to our local log, then update the commitment
// chains.
htlc.ChanID = state.chanID
index, err := state.channel.AddHTLC(htlc)
if err != nil {
// TODO: possibly perform fallback/retry logic
// depending on type of error
peerLog.Errorf("Adding HTLC rejected: %v", err)
pkt.err <- err
close(pkt.done)
// The HTLC was unable to be added to the state
// machine, as a result, we'll signal the switch to
// cancel the pending payment.
// TODO(roasbeef): need to update link as well if local
// HTLC?
state.switchChan <- &htlcPacket{
amt: htlc.Amount,
msg: &lnwire.UpdateFailHTLC{
Reason: []byte{byte(0)},
},
srcLink: state.chanID,
}
return
}
p.queueMsg(htlc, nil)
state.pendingBatch = append(state.pendingBatch, &pendingPayment{
htlc: htlc,
index: index,
preImage: pkt.preImage,
err: pkt.err,
done: pkt.done,
})
case *lnwire.UpdateFufillHTLC:
// An HTLC we forward to the switch has just settled somewhere
// upstream. Therefore we settle the HTLC within the our local
// state machine.
pre := htlc.PaymentPreimage
logIndex, err := state.channel.SettleHTLC(pre)
if err != nil {
// TODO(roasbeef): broadcast on-chain
peerLog.Errorf("settle for incoming HTLC rejected: %v", err)
p.Disconnect()
return
}
// With the HTLC settled, we'll need to populate the wire
// message to target the specific channel and HTLC to be
// cancelled.
htlc.ChanID = state.chanID
htlc.ID = logIndex
// Then we send the HTLC settle message to the connected peer
// so we can continue the propagation of the settle message.
p.queueMsg(htlc, nil)
isSettle = true
state.pendingSettle++
case *lnwire.UpdateFailHTLC:
// An HTLC cancellation has been triggered somewhere upstream,
// we'll remove then HTLC from our local state machine.
logIndex, err := state.channel.FailHTLC(pkt.payHash)
if err != nil {
peerLog.Errorf("unable to cancel HTLC: %v", err)
return
}
// With the HTLC removed, we'll need to populate the wire
// message to target the specific channel and HTLC to be
// cancelled. The "Reason" field will have already been set
// within the switch.
htlc.ChanID = state.chanID
htlc.ID = logIndex
// Finally, we send the HTLC message to the peer which
// initially created the HTLC.
p.queueMsg(htlc, nil)
isSettle = true
state.pendingSettle++
}
// If this newly added update exceeds the min batch size for adds, or
// this is a settle request, then initiate an update.
// TODO(roasbeef): enforce max HTLCs in flight limit
if len(state.pendingBatch) >= 10 || isSettle {
if err := p.updateCommitTx(state); err != nil {
peerLog.Errorf("unable to update "+
"commitment: %v", err)
p.Disconnect()
return
}
}
}
// handleUpstreamMsg processes wire messages related to commitment state
// updates from the upstream peer. The upstream peer is the peer whom we have a
// direct channel with, updating our respective commitment chains.
func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) {
switch htlcPkt := msg.(type) {
// TODO(roasbeef): timeouts
// * fail if can't parse sphinx mix-header
case *lnwire.UpdateAddHTLC:
// Before adding the new HTLC to the state machine, parse the
// onion object in order to obtain the routing information.
blobReader := bytes.NewReader(htlcPkt.OnionBlob[:])
onionPkt := &sphinx.OnionPacket{}
if err := onionPkt.Decode(blobReader); err != nil {
peerLog.Errorf("unable to decode onion pkt: %v", err)
p.Disconnect()
return
}
// We just received an add request from an upstream peer, so we
// add it to our state machine, then add the HTLC to our
// "settle" list in the event that we know the preimage
index, err := state.channel.ReceiveHTLC(htlcPkt)
if err != nil {
peerLog.Errorf("Receiving HTLC rejected: %v", err)
p.Disconnect()
return
}
// TODO(roasbeef): perform sanity checks on per-hop payload
// * time-lock is sane, fee, chain, etc
// Attempt to process the Sphinx packet. We include the payment
// hash of the HTLC as it's authenticated within the Sphinx
// packet itself as associated data in order to thwart attempts
// a replay attacks. In the case of a replay, an attacker is
// *forced* to use the same payment hash twice, thereby losing
// their money entirely.
rHash := htlcPkt.PaymentHash[:]
sphinxPacket, err := state.sphinx.ProcessOnionPacket(onionPkt, rHash)
if err != nil {
// If we're unable to parse the Sphinx packet, then
// we'll cancel the HTLC after the current commitment
// transition.
peerLog.Errorf("unable to process onion pkt: %v", err)
state.htlcsToCancel[index] = lnwire.SphinxParseError
return
}
switch sphinxPacket.Action {
// We're the designated payment destination. Therefore we
// attempt to see if we have an invoice locally which'll allow
// us to settle this HTLC.
case sphinx.ExitNode:
rHash := htlcPkt.PaymentHash
invoice, err := p.server.invoices.LookupInvoice(rHash)
if err != nil {
// If we're the exit node, but don't recognize
// the payment hash, then we'll fail the HTLC
// on the next state transition.
peerLog.Errorf("unable to settle HTLC, "+
"payment hash (%x) unrecognized", rHash[:])
state.htlcsToCancel[index] = lnwire.UnknownPaymentHash
return
}
// If we're not currently in debug mode, and the
// extended HTLC doesn't meet the value requested, then
// we'll fail the HTLC.
if !cfg.DebugHTLC && htlcPkt.Amount < invoice.Terms.Value {
peerLog.Errorf("rejecting HTLC due to incorrect "+
"amount: expected %v, received %v",
invoice.Terms.Value, htlcPkt.Amount)
state.htlcsToCancel[index] = lnwire.IncorrectValue
} else {
// Otherwise, everything is in order and we'll
// settle the HTLC after the current state
// transition.
state.htlcsToSettle[index] = invoice
}
// There are additional hops left within this route, so we
// track the next hop according to the index of this HTLC
// within their log. When forwarding locked-in HLTC's to the
// switch, we'll attach the routing information so the switch
// can finalize the circuit.
case sphinx.MoreHops:
state.Lock()
state.pendingCircuits[index] = sphinxPacket
state.Unlock()
default:
peerLog.Errorf("mal formed onion packet")
state.htlcsToCancel[index] = lnwire.SphinxParseError
}
case *lnwire.UpdateFufillHTLC:
pre := htlcPkt.PaymentPreimage
idx := htlcPkt.ID
if err := state.channel.ReceiveHTLCSettle(pre, idx); err != nil {
// TODO(roasbeef): broadcast on-chain
peerLog.Errorf("settle for outgoing HTLC rejected: %v", err)
p.Disconnect()
return
}
// TODO(roasbeef): add preimage to DB in order to swipe
// repeated r-values
case *lnwire.UpdateFailHTLC:
idx := htlcPkt.ID
if err := state.channel.ReceiveFailHTLC(idx); err != nil {
peerLog.Errorf("unable to recv HTLC cancel: %v", err)
p.Disconnect()
return
}
state.Lock()
state.cancelReasons[idx] = lnwire.FailCode(htlcPkt.Reason[0])
state.Unlock()
case *lnwire.CommitSig:
// We just received a new update to our local commitment chain,
// validate this new commitment, closing the link if invalid.
// TODO(roasbeef): redundant re-serialization
sig := htlcPkt.CommitSig.Serialize()
if err := state.channel.ReceiveNewCommitment(sig); err != nil {
peerLog.Errorf("unable to accept new commitment: %v", err)
p.Disconnect()
return
}
// As we've just just accepted a new state, we'll now
// immediately send the remote peer a revocation for our prior
// state.
nextRevocation, err := state.channel.RevokeCurrentCommitment()
if err != nil {
peerLog.Errorf("unable to revoke commitment: %v", err)
return
}
p.queueMsg(nextRevocation, nil)
if !state.logCommitTimer.Stop() {
select {
case <-state.logCommitTimer.C:
default:
}
}
state.logCommitTimer.Reset(300 * time.Millisecond)
state.logCommitTick = state.logCommitTimer.C
// If both commitment chains are fully synced from our PoV,
// then we don't need to reply with a signature as both sides
// already have a commitment with the latest accepted state.
if state.channel.FullySynced() {
return
}
// Otherwise, the remote party initiated the state transition,
// so we'll reply with a signature to provide them with their
// version of the latest commitment state.
if err := p.updateCommitTx(state); err != nil {
peerLog.Errorf("unable to update commitment: %v", err)
p.Disconnect()
return
}
case *lnwire.RevokeAndAck:
// We've received a revocation from the remote chain, if valid,
// this moves the remote chain forward, and expands our
// revocation window.
htlcsToForward, err := state.channel.ReceiveRevocation(htlcPkt)
if err != nil {
peerLog.Errorf("unable to accept revocation: %v", err)
p.Disconnect()
return
}
// If any of the HTLCs eligible for forwarding are pending
// settling or timing out previous outgoing payments, then we
// can them from the pending set, and signal the requester (if
// existing) that the payment has been fully fulfilled.
var bandwidthUpdate btcutil.Amount
settledPayments := make(map[lnwallet.PaymentHash]struct{})
cancelledHtlcs := make(map[uint64]struct{})
for _, htlc := range htlcsToForward {
parentIndex := htlc.ParentIndex
if p, ok := state.clearedHTCLs[parentIndex]; ok {
switch htlc.EntryType {
// If the HTLC was settled successfully, then
// we return a nil error as well as the payment
// preimage back to the possible caller.
case lnwallet.Settle:
p.preImage <- htlc.RPreimage
p.err <- nil
// Otherwise, the HTLC failed, so we propagate
// the error back to the potential caller.
case lnwallet.Fail:
state.Lock()
errMsg := state.cancelReasons[parentIndex]
state.Unlock()
p.preImage <- [32]byte{}
p.err <- errors.New(errMsg.String())
}
close(p.done)
delete(state.clearedHTCLs, htlc.ParentIndex)
}
// TODO(roasbeef): rework log entries to a shared
// interface.
if htlc.EntryType != lnwallet.Add {
continue
}
// If we can settle this HTLC within our local state
// update log, then send the update entry to the remote
// party.
invoice, ok := state.htlcsToSettle[htlc.Index]
if ok {
preimage := invoice.Terms.PaymentPreimage
logIndex, err := state.channel.SettleHTLC(preimage)
if err != nil {
peerLog.Errorf("unable to settle htlc: %v", err)
p.Disconnect()
continue
}
settleMsg := &lnwire.UpdateFufillHTLC{
ChanID: state.chanID,
ID: logIndex,
PaymentPreimage: preimage,
}
p.queueMsg(settleMsg, nil)
delete(state.htlcsToSettle, htlc.Index)
settledPayments[htlc.RHash] = struct{}{}
bandwidthUpdate += htlc.Amount
continue
}
// Alternatively, if we marked this HTLC for
// cancellation, then immediately cancel the HTLC as
// it's now locked in within both commitment
// transactions.
reason, ok := state.htlcsToCancel[htlc.Index]
if !ok {
continue
}
logIndex, err := state.channel.FailHTLC(htlc.RHash)
if err != nil {
peerLog.Errorf("unable to cancel htlc: %v", err)
p.Disconnect()
continue
}
cancelMsg := &lnwire.UpdateFailHTLC{
ChanID: state.chanID,
ID: logIndex,
Reason: []byte{byte(reason)},
}
p.queueMsg(cancelMsg, nil)
delete(state.htlcsToCancel, htlc.Index)
cancelledHtlcs[htlc.Index] = struct{}{}
}
go func() {
for _, htlc := range htlcsToForward {
// We don't need to forward any HTLCs that we
// just settled or cancelled above.
// TODO(roasbeef): key by index instead?
state.RLock()
if _, ok := settledPayments[htlc.RHash]; ok {
state.RUnlock()
continue
}
if _, ok := cancelledHtlcs[htlc.Index]; ok {
state.RUnlock()
continue
}
state.RUnlock()
state.Lock()
onionPkt := state.pendingCircuits[htlc.Index]
delete(state.pendingCircuits, htlc.Index)
reason := state.cancelReasons[htlc.ParentIndex]
delete(state.cancelReasons, htlc.ParentIndex)
state.Unlock()
// Send this fully activated HTLC to the htlc
// switch to continue the chained clear/settle.
pkt, err := logEntryToHtlcPkt(state.chanID,
htlc, onionPkt, reason)
if err != nil {
peerLog.Errorf("unable to make htlc pkt: %v",
err)
continue
}
state.switchChan <- pkt
}
}()
if len(settledPayments) == 0 && len(cancelledHtlcs) == 0 {
return
}
// Send an update to the htlc switch of our newly available
// payment bandwidth.
// TODO(roasbeef): ideally should wait for next state update.
if bandwidthUpdate != 0 {
p.server.htlcSwitch.UpdateLink(state.chanID,
bandwidthUpdate)
}
// With all the settle updates added to the local and remote
// HTLC logs, initiate a state transition by updating the
// remote commitment chain.
if err := p.updateCommitTx(state); err != nil {
peerLog.Errorf("unable to update commitment: %v", err)
p.Disconnect()
return
}
// Notify the invoiceRegistry of the invoices we just settled
// with this latest commitment update.
// TODO(roasbeef): wait until next transition?
for invoice := range settledPayments {
err := p.server.invoices.SettleInvoice(chainhash.Hash(invoice))
if err != nil {
peerLog.Errorf("unable to settle invoice: %v", err)
}
}
}
}
// updateCommitTx signs, then sends an update to the remote peer adding a new
// commitment to their commitment chain which includes all the latest updates
// we've received+processed up to this point.
func (p *peer) updateCommitTx(state *commitmentState) error {
sigTheirs, err := state.channel.SignNextCommitment()
if err == lnwallet.ErrNoWindow {
peerLog.Tracef("ChannelPoint(%v): revocation window exhausted, unable to send %v",
state.chanPoint, len(state.pendingBatch))
return nil
} else if err != nil {
return err
}
parsedSig, err := btcec.ParseSignature(sigTheirs, btcec.S256())
if err != nil {
return fmt.Errorf("unable to parse sig: %v", err)
}
commitSig := &lnwire.CommitSig{
ChanID: state.chanID,
CommitSig: parsedSig,
}
p.queueMsg(commitSig, nil)
// As we've just cleared out a batch, move all pending updates to the
// map of cleared HTLCs, clearing out the set of pending updates.
for _, update := range state.pendingBatch {
state.clearedHTCLs[update.index] = update
}
// We've just initiated a state transition, attempt to stop the
// logCommitTimer. If the timer already ticked, then we'll consume the
// value, dropping
if state.logCommitTimer != nil && !state.logCommitTimer.Stop() {
select {
case <-state.logCommitTimer.C:
default:
}
}
state.logCommitTick = nil
// Finally, clear our the current batch, and flip the pendingUpdate
// bool to indicate were waiting for a commitment signature.
// TODO(roasbeef): re-slice instead to avoid GC?
state.pendingBatch = nil
state.pendingSettle = 0
return nil return nil
} }
// logEntryToHtlcPkt converts a particular Lightning Commitment Protocol (LCP) // ID returns the lightning network peer id.
// log entry the corresponding htlcPacket with src/dest set along with the func (p *peer) ID() [sha256.Size]byte {
// proper wire message. This helper method is provided in order to aid an return fastsha256.Sum256(p.PubKey())
// htlcManager in forwarding packets to the htlcSwitch. }
func logEntryToHtlcPkt(chanID lnwire.ChannelID, pd *lnwallet.PaymentDescriptor,
onionPkt *sphinx.ProcessedPacket,
reason lnwire.FailCode) (*htlcPacket, error) {
pkt := &htlcPacket{} // PubKey returns the peer public key.
func (p *peer) PubKey() []byte {
// TODO(roasbeef): alter after switch to log entry interface return p.addr.IdentityKey.SerializeCompressed()
var msg lnwire.Message
switch pd.EntryType {
case lnwallet.Add:
// TODO(roasbeef): timeout, onion blob, etc
var b bytes.Buffer
if err := onionPkt.Packet.Encode(&b); err != nil {
return nil, err
}
htlc := &lnwire.UpdateAddHTLC{
Amount: pd.Amount,
PaymentHash: pd.RHash,
}
copy(htlc.OnionBlob[:], b.Bytes())
msg = htlc
case lnwallet.Settle:
msg = &lnwire.UpdateFufillHTLC{
PaymentPreimage: pd.RPreimage,
}
case lnwallet.Fail:
// For cancellation messages, we'll also need to set the rHash
// within the htlcPacket so the switch knows on which outbound
// link to forward the cancellation message
msg = &lnwire.UpdateFailHTLC{
Reason: []byte{byte(reason)},
}
pkt.payHash = pd.RHash
}
pkt.amt = pd.Amount
pkt.msg = msg
pkt.srcLink = chanID
pkt.onion = onionPkt
return pkt, nil
} }
// TODO(roasbeef): make all start/stop mutexes a CAS // TODO(roasbeef): make all start/stop mutexes a CAS

@ -19,6 +19,7 @@ import (
"github.com/boltdb/bolt" "github.com/boltdb/bolt"
"github.com/davecgh/go-spew/spew" "github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
@ -527,7 +528,7 @@ func (r *rpcServer) OpenChannelSync(ctx context.Context,
} }
} }
// CloseChannel attempts to close an active channel identified by its channel // CloseLink attempts to close an active channel identified by its channel
// point. The actions of this method can additionally be augmented to attempt // point. The actions of this method can additionally be augmented to attempt
// a force close after a timeout period in the case of an inactive peer. // a force close after a timeout period in the case of an inactive peer.
func (r *rpcServer) CloseChannel(in *lnrpc.CloseChannelRequest, func (r *rpcServer) CloseChannel(in *lnrpc.CloseChannelRequest,
@ -577,10 +578,10 @@ func (r *rpcServer) CloseChannel(in *lnrpc.CloseChannelRequest,
// TODO(roasbeef): actually get the active channel // TODO(roasbeef): actually get the active channel
// instead too? // instead too?
// * so only need to grab from database // * so only need to grab from database
wipeChannel(peer, channel) peer.WipeChannel(channel)
} else { } else {
chanID := lnwire.NewChanIDFromOutPoint(channel.ChannelPoint()) chanID := lnwire.NewChanIDFromOutPoint(channel.ChannelPoint())
r.server.htlcSwitch.UnregisterLink(remotePub, &chanID) r.server.htlcSwitch.RemoveLink(chanID)
} }
r.server.breachArbiter.settledContracts <- chanPoint r.server.breachArbiter.settledContracts <- chanPoint
@ -628,7 +629,7 @@ func (r *rpcServer) CloseChannel(in *lnrpc.CloseChannelRequest,
// the htlc switch which will handle the negotiation and // the htlc switch which will handle the negotiation and
// broadcast details. // broadcast details.
updateChan, errChan = r.server.htlcSwitch.CloseLink(chanPoint, updateChan, errChan = r.server.htlcSwitch.CloseLink(chanPoint,
CloseRegular) htlcswitch.CloseRegular)
} }
out: out:
for { for {

@ -5,7 +5,6 @@ import (
"crypto/rand" "crypto/rand"
"crypto/sha256" "crypto/sha256"
"encoding/hex" "encoding/hex"
"errors"
"fmt" "fmt"
"net" "net"
"strconv" "strconv"
@ -25,9 +24,11 @@ import (
"github.com/lightningnetwork/lnd/routing" "github.com/lightningnetwork/lnd/routing"
"github.com/lightningnetwork/lnd/routing/chainview" "github.com/lightningnetwork/lnd/routing/chainview"
"github.com/roasbeef/btcd/btcec" "github.com/roasbeef/btcd/btcec"
"github.com/roasbeef/btcd/chaincfg/chainhash"
"github.com/roasbeef/btcd/connmgr" "github.com/roasbeef/btcd/connmgr"
"github.com/roasbeef/btcutil" "github.com/roasbeef/btcutil"
"github.com/go-errors/errors"
"github.com/lightningnetwork/lnd/htlcswitch"
) )
// server is the main server of the Lightning Network Daemon. The server houses // server is the main server of the Lightning Network Daemon. The server houses
@ -69,7 +70,7 @@ type server struct {
fundingMgr *fundingManager fundingMgr *fundingManager
chanDB *channeldb.DB chanDB *channeldb.DB
htlcSwitch *htlcSwitch htlcSwitch *htlcswitch.Switch
invoices *invoiceRegistry invoices *invoiceRegistry
breachArbiter *breachArbiter breachArbiter *breachArbiter
@ -136,7 +137,6 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier,
invoices: newInvoiceRegistry(chanDB), invoices: newInvoiceRegistry(chanDB),
utxoNursery: newUtxoNursery(chanDB, notifier, wallet), utxoNursery: newUtxoNursery(chanDB, notifier, wallet),
htlcSwitch: newHtlcSwitch(),
identityPriv: privKey, identityPriv: privKey,
nodeSigner: newNodeSigner(privKey), nodeSigner: newNodeSigner(privKey),
@ -177,6 +177,23 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier,
debugPre[:], debugHash[:]) debugPre[:], debugHash[:])
} }
s.htlcSwitch = htlcswitch.New(htlcswitch.Config{
LocalChannelClose: func(pubKey []byte,
request *htlcswitch.ChanClose) {
s.peersMtx.RLock()
peer, ok := s.peersByPub[string(pubKey)]
s.peersMtx.RUnlock()
if !ok {
srvrLog.Error("unable to close channel, peer"+
" with %v id can't be found", pubKey)
return
}
peer.localCloseChanReqs <- request
},
})
// If external IP addresses have been specified, add those to the list // If external IP addresses have been specified, add those to the list
// of this server's addresses. // of this server's addresses.
selfAddrs := make([]net.Addr, 0, len(cfg.ExternalIPs)) selfAddrs := make([]net.Addr, 0, len(cfg.ExternalIPs))
@ -237,14 +254,8 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier,
ChainView: chainView, ChainView: chainView,
SendToSwitch: func(firstHop *btcec.PublicKey, SendToSwitch: func(firstHop *btcec.PublicKey,
htlcAdd *lnwire.UpdateAddHTLC) ([32]byte, error) { htlcAdd *lnwire.UpdateAddHTLC) ([32]byte, error) {
firstHopPub := firstHop.SerializeCompressed() firstHopPub := firstHop.SerializeCompressed()
destInterface := chainhash.Hash(sha256.Sum256(firstHopPub)) return s.htlcSwitch.SendHTLC(firstHopPub, htlcAdd)
return s.htlcSwitch.SendHTLC(&htlcPacket{
dest: destInterface,
msg: htlcAdd,
})
}, },
}) })
if err != nil { if err != nil {
@ -657,10 +668,22 @@ func (s *server) peerTerminationWatcher(p *peer) {
srvrLog.Debugf("Peer %v has been disconnected", p) srvrLog.Debugf("Peer %v has been disconnected", p)
// Tell the switch to unregister all links associated with this peer. // Tell the switch to remove all links associated with this peer.
// Passing nil as the target link indicates that all links associated // Passing nil as the target link indicates that all links associated
// with this interface should be closed. // with this interface should be closed.
p.server.htlcSwitch.UnregisterLink(p.addr.IdentityKey, nil) hop := htlcswitch.NewHopID(p.addr.IdentityKey.SerializeCompressed())
links, err := p.server.htlcSwitch.GetLinks(hop)
if err != nil {
srvrLog.Errorf("unable to get channel links: %v", err)
}
for _, link := range links {
err := p.server.htlcSwitch.RemoveLink(link.ChanID())
if err != nil {
srvrLog.Errorf("unable to remove channel link: %v",
err)
}
}
// Send the peer to be garbage collected by the server. // Send the peer to be garbage collected by the server.
p.server.donePeers <- p p.server.donePeers <- p