Merge pull request #2411 from cfromknecht/chan-status-manager

netann: channel status manager
Olaoluwa Osuntokun committed 2019-02-18 17:16:30 -08:00 (commit c8b5e1f55e)
11 changed files with 2122 additions and 359 deletions

@@ -31,30 +31,32 @@ import (
)
const (
defaultConfigFilename = "lnd.conf"
defaultDataDirname = "data"
defaultChainSubDirname = "chain"
defaultGraphSubDirname = "graph"
defaultTLSCertFilename = "tls.cert"
defaultTLSKeyFilename = "tls.key"
defaultAdminMacFilename = "admin.macaroon"
defaultReadMacFilename = "readonly.macaroon"
defaultInvoiceMacFilename = "invoice.macaroon"
defaultLogLevel = "info"
defaultLogDirname = "logs"
defaultLogFilename = "lnd.log"
defaultRPCPort = 10009
defaultRESTPort = 8080
defaultPeerPort = 9735
defaultRPCHost = "localhost"
defaultMaxPendingChannels = 1
defaultNoSeedBackup = false
defaultTrickleDelay = 90 * 1000
defaultInactiveChanTimeout = 20 * time.Minute
defaultMaxLogFiles = 3
defaultMaxLogFileSize = 10
defaultMinBackoff = time.Second
defaultMaxBackoff = time.Hour
defaultConfigFilename = "lnd.conf"
defaultDataDirname = "data"
defaultChainSubDirname = "chain"
defaultGraphSubDirname = "graph"
defaultTLSCertFilename = "tls.cert"
defaultTLSKeyFilename = "tls.key"
defaultAdminMacFilename = "admin.macaroon"
defaultReadMacFilename = "readonly.macaroon"
defaultInvoiceMacFilename = "invoice.macaroon"
defaultLogLevel = "info"
defaultLogDirname = "logs"
defaultLogFilename = "lnd.log"
defaultRPCPort = 10009
defaultRESTPort = 8080
defaultPeerPort = 9735
defaultRPCHost = "localhost"
defaultMaxPendingChannels = 1
defaultNoSeedBackup = false
defaultTrickleDelay = 90 * 1000
defaultChanStatusSampleInterval = time.Minute
defaultChanEnableTimeout = 19 * time.Minute
defaultChanDisableTimeout = 20 * time.Minute
defaultMaxLogFiles = 3
defaultMaxLogFileSize = 10
defaultMinBackoff = time.Second
defaultMaxBackoff = time.Hour
defaultTorSOCKSPort = 9050
defaultTorDNSHost = "soa.nodes.lightning.directory"
@@ -234,8 +236,10 @@ type config struct {
NoSeedBackup bool `long:"noseedbackup" description:"If true, NO SEED WILL BE EXPOSED AND THE WALLET WILL BE ENCRYPTED USING THE DEFAULT PASSPHRASE -- EVER. THIS FLAG IS ONLY FOR TESTING AND IS BEING DEPRECATED."`
TrickleDelay int `long:"trickledelay" description:"Time in milliseconds between each release of announcements to the network"`
InactiveChanTimeout time.Duration `long:"inactivechantimeout" description:"If a channel has been inactive for the set time, send a ChannelUpdate disabling it."`
TrickleDelay int `long:"trickledelay" description:"Time in milliseconds between each release of announcements to the network"`
ChanEnableTimeout time.Duration `long:"chan-enable-timeout" description:"The duration that a peer connection must be stable before attempting to send a channel update to reenable or cancel pending disables of the peer's channels on the network (default: 19m)."`
ChanDisableTimeout time.Duration `long:"chan-disable-timeout" description:"The duration that must elapse after first detecting that an already active channel is actually inactive before sending a channel update disabling it to the network. The pending disable can be canceled if the peer reconnects and becomes stable for chan-enable-timeout before the disable update is sent. (default: 20m)"`
ChanStatusSampleInterval time.Duration `long:"chan-status-sample-interval" description:"The polling interval between attempts to detect if an active channel has become inactive due to its peer going offline. (default: 1m)"`
Alias string `long:"alias" description:"The node alias. Used as a moniker by peers and intelligence services"`
Color string `long:"color" description:"The color of the node in hex format (i.e. '#3399FF'). Used to customize node appearance in intelligence services"`
@@ -317,11 +321,13 @@ func loadConfig() (*config, error) {
"preferential": 1.0,
},
},
TrickleDelay: defaultTrickleDelay,
InactiveChanTimeout: defaultInactiveChanTimeout,
Alias: defaultAlias,
Color: defaultColor,
MinChanSize: int64(minChanFundingSize),
TrickleDelay: defaultTrickleDelay,
ChanStatusSampleInterval: defaultChanStatusSampleInterval,
ChanEnableTimeout: defaultChanEnableTimeout,
ChanDisableTimeout: defaultChanDisableTimeout,
Alias: defaultAlias,
Color: defaultColor,
MinChanSize: int64(minChanFundingSize),
Tor: &torConfig{
SOCKS: defaultTorSOCKS,
DNS: defaultTorDNS,

@@ -1051,6 +1051,32 @@ out:
}
}
// assertNoChannelUpdates ensures that no ChannelUpdates are sent via the
// graphSubscription. This method will block for the provided duration before
// returning to the caller if successful.
func assertNoChannelUpdates(t *harnessTest, subscription graphSubscription,
duration time.Duration) {
timeout := time.After(duration)
for {
select {
case graphUpdate := <-subscription.updateChan:
if len(graphUpdate.ChannelUpdates) > 0 {
t.Fatalf("received %d channel updates when "+
"none were expected",
len(graphUpdate.ChannelUpdates))
}
case err := <-subscription.errChan:
t.Fatalf("graph subscription failure: %v", err)
case <-timeout:
// No updates received, success.
return
}
}
}
// assertChannelPolicy asserts that the passed node's known channel policy for
// the passed chanPoint is consistent with the expected policy values.
func assertChannelPolicy(t *harnessTest, node *lntest.HarnessNode,
@@ -12734,7 +12760,13 @@ func testSendUpdateDisableChannel(net *lntest.NetworkHarness, t *harnessTest) {
},
)
carol, err := net.NewNode("Carol", nil)
carol, err := net.NewNode("Carol", []string{
"--minbackoff=10s",
"--unsafe-disconnect",
"--chan-enable-timeout=1.5s",
"--chan-disable-timeout=3s",
"--chan-status-sample-interval=.5s",
})
if err != nil {
t.Fatalf("unable to create carol's node: %v", err)
}
@@ -12755,7 +12787,12 @@ func testSendUpdateDisableChannel(net *lntest.NetworkHarness, t *harnessTest) {
// We create a new node Eve with a channel disable timeout of just 3
// seconds (down from the default 20m). It will be used to test channel
// updates for channels going inactive.
eve, err := net.NewNode("Eve", []string{"--inactivechantimeout=2s"})
eve, err := net.NewNode("Eve", []string{
"--minbackoff=10s",
"--chan-enable-timeout=1.5s",
"--chan-disable-timeout=3s",
"--chan-status-sample-interval=.5s",
})
if err != nil {
t.Fatalf("unable to create eve's node: %v", err)
}
@@ -12840,6 +12877,60 @@ func testSendUpdateDisableChannel(net *lntest.NetworkHarness, t *harnessTest) {
},
)
// Now we'll test a long disconnection. Disconnect Carol and Eve and
// ensure they both detect each other as disabled. Their min backoffs
// are high enough to not interfere with disabling logic.
ctxt, _ = context.WithTimeout(ctxb, defaultTimeout)
if err := net.DisconnectNodes(ctxt, carol, eve); err != nil {
t.Fatalf("unable to disconnect Carol from Eve: %v", err)
}
// Wait for a disable from both Carol and Eve to come through.
expectedPolicy.Disabled = true
waitForChannelUpdate(
t, daveSub,
[]expectedChanUpdate{
{eve.PubKeyStr, expectedPolicy, chanPointEveCarol},
{carol.PubKeyStr, expectedPolicy, chanPointEveCarol},
},
)
// Reconnect Carol and Eve, this should cause them to reenable the
// channel from both ends after a short delay.
ctxt, _ = context.WithTimeout(ctxb, defaultTimeout)
if err := net.EnsureConnected(ctxt, carol, eve); err != nil {
t.Fatalf("unable to reconnect Carol to Eve: %v", err)
}
expectedPolicy.Disabled = false
waitForChannelUpdate(
t, daveSub,
[]expectedChanUpdate{
{eve.PubKeyStr, expectedPolicy, chanPointEveCarol},
{carol.PubKeyStr, expectedPolicy, chanPointEveCarol},
},
)
// Now we'll test a short disconnection. Disconnect Carol and Eve, then
// reconnect them after one second so that their scheduled disables are
// aborted. One second is twice the status sample interval, so this
// should allow for the disconnect to be detected, but still leave time
// to cancel the announcement before the 3 second inactive timeout is
// hit.
ctxt, _ = context.WithTimeout(ctxb, defaultTimeout)
if err := net.DisconnectNodes(ctxt, carol, eve); err != nil {
t.Fatalf("unable to disconnect Carol from Eve: %v", err)
}
time.Sleep(time.Second)
ctxt, _ = context.WithTimeout(ctxb, defaultTimeout)
if err := net.EnsureConnected(ctxt, eve, carol); err != nil {
t.Fatalf("unable to reconnect Carol to Eve: %v", err)
}
// Since the disable should have been canceled by both Carol and Eve, we
// expect no channel updates to appear on the network.
assertNoChannelUpdates(t, daveSub, 4*time.Second)
// Close Alice's channels with Bob and Carol cooperatively and
// unilaterally respectively.
ctxt, _ = context.WithTimeout(ctxb, channelCloseTimeout)

@@ -0,0 +1,595 @@
package netann
import (
"errors"
"sync"
"sync/atomic"
"time"
"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
)
var (
// ErrChanStatusManagerExiting signals that a shutdown of the
// ChanStatusManager has already been requested.
ErrChanStatusManagerExiting = errors.New("chan status manager exiting")
// ErrInvalidTimeoutConstraints signals that the ChanStatusManager could
// not be initialized because the timeouts and sample intervals were
// malformed.
ErrInvalidTimeoutConstraints = errors.New("active_timeout + " +
"sample_interval must be less than or equal to " +
"inactive_timeout and be positive integers")
// ErrEnableInactiveChan signals that a request to enable a channel
// could not be completed because the channel isn't actually active at
// the time of the request.
ErrEnableInactiveChan = errors.New("unable to enable channel which " +
"is not currently active")
)
// ChanStatusConfig holds parameters and resources required by the
// ChanStatusManager to perform its duty.
type ChanStatusConfig struct {
// OurPubKey is the public key identifying this node on the network.
OurPubKey *btcec.PublicKey
// MessageSigner signs messages that validate under OurPubKey.
MessageSigner lnwallet.MessageSigner
// IsChannelActive checks whether the channel identified by the provided
// ChannelID is considered active. This should only return true if the
// channel has been sufficiently confirmed, the channel has received
// FundingLocked, and the remote peer is online.
IsChannelActive func(lnwire.ChannelID) bool
// ApplyChannelUpdate processes new ChannelUpdates signed by our node by
// updating our local routing table and broadcasting the update to our
// peers.
ApplyChannelUpdate func(*lnwire.ChannelUpdate) error
// DB stores the set of channels that are to be monitored.
DB DB
// Graph stores the channel info and policies for channels in DB.
Graph ChannelGraph
// ChanEnableTimeout is the duration a peer's connection must remain stable
// before attempting to reenable the channel.
//
// NOTE: This value is only used to verify that the relation between
// itself, ChanDisableTimeout, and ChanStatusSampleInterval is correct.
// The user is still responsible for ensuring that the same duration
// elapses before attempting to reenable a channel.
ChanEnableTimeout time.Duration
// ChanDisableTimeout is the duration the manager will wait after
// detecting that a channel has become inactive before broadcasting an
// update to disable the channel.
ChanDisableTimeout time.Duration
// ChanStatusSampleInterval is the long-polling interval used by the
// manager to check if the channels being monitored have become
// inactive.
ChanStatusSampleInterval time.Duration
}
// ChanStatusManager facilitates requests to enable or disable a channel via a
// network announcement that sets the disable bit on the ChannelUpdate
// accordingly. The manager will periodically sample to detect cases where a
// link has become inactive, and facilitate the process of disabling the channel
// passively. The ChanStatusManager state machine is designed to reduce the
// likelihood of spamming the network with updates for flapping peers.
type ChanStatusManager struct {
started uint32 // to be used atomically
stopped uint32 // to be used atomically
cfg *ChanStatusConfig
// ourPubKeyBytes is the serialized compressed pubkey of our node.
ourPubKeyBytes []byte
// chanStates contains the set of channels being monitored for status
// updates. Access to the map is serialized by the statusManager's event
// loop.
chanStates channelStates
// enableRequests pipes external requests to enable a channel into the
// primary event loop.
enableRequests chan statusRequest
// disableRequests pipes external requests to disable a channel into the
// primary event loop.
disableRequests chan statusRequest
// statusSampleTicker fires at the interval prescribed by
// ChanStatusSampleInterval to check if channels in chanStates have
// become inactive.
statusSampleTicker *time.Ticker
wg sync.WaitGroup
quit chan struct{}
}
// NewChanStatusManager initializes a new ChanStatusManager using the given
// configuration. An error is returned if the timeouts and sample interval do
// not satisfy the constraint:
// ChanEnableTimeout + ChanStatusSampleInterval <= ChanDisableTimeout.
func NewChanStatusManager(cfg *ChanStatusConfig) (*ChanStatusManager, error) {
// Assert that the config timeouts are properly formed. We require the
// enable_timeout + sample_interval to be less than or equal to the
// disable_timeout and that all are positive values. A peer that
// disconnects and reconnects quickly may cause a disable update to be
// sent, shortly followed by a reenable. Ensuring a healthy separation
// helps dampen the possibility of spamming updates that toggle the
// disable bit for such events.
if cfg.ChanStatusSampleInterval <= 0 {
return nil, ErrInvalidTimeoutConstraints
}
if cfg.ChanEnableTimeout <= 0 {
return nil, ErrInvalidTimeoutConstraints
}
if cfg.ChanDisableTimeout <= 0 {
return nil, ErrInvalidTimeoutConstraints
}
if cfg.ChanEnableTimeout+cfg.ChanStatusSampleInterval >
cfg.ChanDisableTimeout {
return nil, ErrInvalidTimeoutConstraints
}
return &ChanStatusManager{
cfg: cfg,
ourPubKeyBytes: cfg.OurPubKey.SerializeCompressed(),
chanStates: make(channelStates),
statusSampleTicker: time.NewTicker(cfg.ChanStatusSampleInterval),
enableRequests: make(chan statusRequest),
disableRequests: make(chan statusRequest),
quit: make(chan struct{}),
}, nil
}
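As a rough usage sketch (not part of the diff), this constructs a manager with the default timeouts; the remaining config fields are elided and would need to be populated before Start is called:

package main

import (
	"log"
	"time"

	"github.com/btcsuite/btcd/btcec"
	"github.com/lightningnetwork/lnd/netann"
)

func main() {
	privKey, err := btcec.NewPrivateKey(btcec.S256())
	if err != nil {
		log.Fatal(err)
	}

	cfg := &netann.ChanStatusConfig{
		OurPubKey:                privKey.PubKey(),
		ChanEnableTimeout:        19 * time.Minute,
		ChanStatusSampleInterval: time.Minute,
		// 19m + 1m <= 20m, so the constraint above is satisfied.
		ChanDisableTimeout: 20 * time.Minute,
		// MessageSigner, IsChannelActive, ApplyChannelUpdate, DB, and
		// Graph must also be set before Start is called.
	}

	if _, err := netann.NewChanStatusManager(cfg); err != nil {
		// Only ErrInvalidTimeoutConstraints is returned here.
		log.Fatal(err)
	}
}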
// Start safely starts the ChanStatusManager.
func (m *ChanStatusManager) Start() error {
if !atomic.CompareAndSwapUint32(&m.started, 0, 1) {
return nil
}
channels, err := m.fetchChannels()
if err != nil {
return err
}
// Populate the initial states of all confirmed, public channels.
for _, c := range channels {
_, err := m.getOrInitChanStatus(c.FundingOutpoint)
switch {
// If we can't retrieve the edge info for this channel, it may
// have been pruned from the channel graph but not yet from our
// set of channels. We'll skip it as we can't determine its
// initial state.
case err == channeldb.ErrEdgeNotFound:
log.Warnf("Unable to find channel policies for %v, "+
"skipping. This is typical if the channel is "+
"in the process of closing.", c.FundingOutpoint)
continue
case err != nil:
return err
}
}
m.wg.Add(1)
go m.statusManager()
return nil
}
// Stop safely shuts down the ChanStatusManager.
func (m *ChanStatusManager) Stop() error {
if !atomic.CompareAndSwapUint32(&m.stopped, 0, 1) {
return nil
}
close(m.quit)
m.wg.Wait()
return nil
}
// RequestEnable submits a request to immediately enable a channel identified by
// the provided outpoint. If the channel is already enabled, no action will be
// taken. If the channel is marked pending-disable, it will be returned to an
// active status, as the scheduled disable was never sent. Otherwise, if the
// channel is found to be disabled, a new announcement will be signed with the
// disabled bit cleared and broadcast to the network.
//
// NOTE: RequestEnable should only be called after a stable connection with the
// channel's peer has lasted at least the ChanEnableTimeout. Failure to do so
// may result in behavior that deviates from the expected behavior of the state
// machine.
func (m *ChanStatusManager) RequestEnable(outpoint wire.OutPoint) error {
return m.submitRequest(m.enableRequests, outpoint)
}
// RequestDisable submits a request to immediately disable a channel identified
// by the provided outpoint. If the channel is already disabled, no action will
// be taken. Otherwise, a new announcement will be signed with the disabled bit
// set and broadcast to the network.
func (m *ChanStatusManager) RequestDisable(outpoint wire.OutPoint) error {
return m.submitRequest(m.disableRequests, outpoint)
}
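A hedged sketch of a caller follows; in lnd proper this wiring lives in the server's peer online/offline handling, and the helper below is hypothetical:

package chanstatus

import (
	"log"

	"github.com/btcsuite/btcd/wire"
	"github.com/lightningnetwork/lnd/netann"
)

// reenableChannels is a hypothetical caller: per the note on RequestEnable,
// it should only run once the peer's connection has been stable for at
// least ChanEnableTimeout.
func reenableChannels(mgr *netann.ChanStatusManager, chanPoints []wire.OutPoint) {
	for _, op := range chanPoints {
		if err := mgr.RequestEnable(op); err != nil {
			// ErrEnableInactiveChan means the link dropped again
			// before the request was processed.
			log.Printf("unable to enable channel %v: %v", op, err)
		}
	}
}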
// statusRequest is passed to the statusManager to request a change in status
// for a particular channel point. The exact action is governed by passing the
// request through one of the enableRequests or disableRequests channels.
type statusRequest struct {
outpoint wire.OutPoint
errChan chan error
}
// submitRequest sends a request for either enabling or disabling a particular
// outpoint and awaits an error response. The request type is dictated by the
// reqChan passed in, which can be either of the enableRequests or
// disableRequests channels.
func (m *ChanStatusManager) submitRequest(reqChan chan statusRequest,
outpoint wire.OutPoint) error {
req := statusRequest{
outpoint: outpoint,
errChan: make(chan error, 1),
}
select {
case reqChan <- req:
case <-m.quit:
return ErrChanStatusManagerExiting
}
select {
case err := <-req.errChan:
return err
case <-m.quit:
return ErrChanStatusManagerExiting
}
}
// statusManager is the primary event loop for the ChanStatusManager, providing
// the necessary synchronization primitive to protect access to the chanStates
// map. All requests to explicitly enable or disable a channel are processed
// within this method. The statusManager will also periodically poll the active
// status of channels within the htlcswitch to see if a disable announcement
// should be scheduled or broadcast.
//
// NOTE: This method MUST be run as a goroutine.
func (m *ChanStatusManager) statusManager() {
defer m.wg.Done()
for {
select {
// Process any requests to mark channel as enabled.
case req := <-m.enableRequests:
req.errChan <- m.processEnableRequest(req.outpoint)
// Process any requests to mark channel as disabled.
case req := <-m.disableRequests:
req.errChan <- m.processDisableRequest(req.outpoint)
// Use long-polling to detect when channels become inactive.
case <-m.statusSampleTicker.C:
// First, do a sweep and mark any ChanStatusEnabled
// channels that are not active within the htlcswitch as
// ChanStatusPendingDisabled. The channel will then be
// disabled if no request to enable is received before
// the ChanDisableTimeout expires.
m.markPendingInactiveChannels()
// Now, do another sweep to disable any channels that
// were marked in a prior iteration as pending inactive
// if the inactive chan timeout has elapsed.
m.disableInactiveChannels()
case <-m.quit:
return
}
}
}
// processEnableRequest attempts to enable the given outpoint. If the method
// returns nil, the status of the channel in chanStates will be
// ChanStatusEnabled. If the channel is not active at the time of the request,
// ErrEnableInactiveChan will be returned. An update will be broadcast only if
// the channel is currently disabled, otherwise no update will be sent on the
// network.
func (m *ChanStatusManager) processEnableRequest(outpoint wire.OutPoint) error {
curState, err := m.getOrInitChanStatus(outpoint)
if err != nil {
return err
}
// Quickly check to see if the requested channel is active within the
// htlcswitch and return an error if it isn't.
chanID := lnwire.NewChanIDFromOutPoint(&outpoint)
if !m.cfg.IsChannelActive(chanID) {
return ErrEnableInactiveChan
}
switch curState.Status {
// Channel is already enabled, nothing to do.
case ChanStatusEnabled:
return nil
// The channel is enabled, though we are now canceling the scheduled
// disable.
case ChanStatusPendingDisabled:
log.Debugf("Channel(%v) already enabled, canceling scheduled "+
"disable", outpoint)
// We'll sign a new update if the channel is still disabled.
case ChanStatusDisabled:
log.Infof("Announcing channel(%v) enabled", outpoint)
err := m.signAndSendNextUpdate(outpoint, false)
if err != nil {
return err
}
}
m.chanStates.markEnabled(outpoint)
return nil
}
// processDisableRequest attempts to disable the given outpoint. If the method
// returns nil, the status of the channel in chanStates will be
// ChanStatusDisabled. An update will only be sent if the channel has a status
// other than ChanStatusEnabled, otherwise no update will be sent on the
// network.
func (m *ChanStatusManager) processDisableRequest(outpoint wire.OutPoint) error {
curState, err := m.getOrInitChanStatus(outpoint)
if err != nil {
return err
}
switch curState.Status {
// Channel is already disabled, nothing to do.
case ChanStatusDisabled:
return nil
// We'll sign a new update disabling the channel if the current status
// is enabled or pending-inactive.
case ChanStatusEnabled, ChanStatusPendingDisabled:
log.Infof("Announcing channel(%v) disabled [requested]",
outpoint)
err := m.signAndSendNextUpdate(outpoint, true)
if err != nil {
return err
}
}
// If the disable was requested via the manager's public interface, we
// will remove the outpoint from our map of channel states. Typically this
// signals that the channel is being closed, so this frees up the space
// in the map. If for some reason the channel isn't closed, the state
// will be repopulated on subsequent calls to RequestEnable or
// RequestDisable via a db lookup, or on startup.
delete(m.chanStates, outpoint)
return nil
}
// markPendingInactiveChannels performs a sweep of the database's active
// channels and determines which, if any, should have a disable announcement
// scheduled. Once an active channel is determined to be pending-inactive, one
// of two transitions can follow. Either the channel is disabled because no
// request to enable is received before the scheduled disable is broadcast, or
// the channel is successfully reenabled and returned to an active
// state from the POV of the ChanStatusManager.
func (m *ChanStatusManager) markPendingInactiveChannels() {
channels, err := m.fetchChannels()
if err != nil {
log.Errorf("Unable to load active channels: %v", err)
return
}
for _, c := range channels {
// Determine the initial status of the active channel, and
// populate the entry in the chanStates map.
curState, err := m.getOrInitChanStatus(c.FundingOutpoint)
if err != nil {
log.Errorf("Unable to retrieve chan status for "+
"Channel(%v): %v", c.FundingOutpoint, err)
continue
}
// If the channel's status is not ChanStatusEnabled, we are
// done. Either it is already disabled, or it has been marked
// ChanStatusPendingDisabled, meaning that we have already
// scheduled the time at which it will be disabled.
if curState.Status != ChanStatusEnabled {
continue
}
// If our bookkeeping shows the channel as active, sample the
// htlcswitch to see if it believes the link is also active. If
// so, we will skip marking it as ChanStatusPendingDisabled.
chanID := lnwire.NewChanIDFromOutPoint(&c.FundingOutpoint)
if m.cfg.IsChannelActive(chanID) {
continue
}
// Otherwise, we discovered that this link was inactive within
// the switch. Compute the time at which we will send out a
// disable if the peer is unable to reestablish a stable
// connection.
disableTime := time.Now().Add(m.cfg.ChanDisableTimeout)
log.Debugf("Marking channel(%v) pending-inactive",
c.FundingOutpoint)
m.chanStates.markPendingDisabled(c.FundingOutpoint, disableTime)
}
}
// disableInactiveChannels scans through the set of monitored channels, and
// broadcasts a disable update for any pending-inactive channels whose
// SendDisableTime has passed.
func (m *ChanStatusManager) disableInactiveChannels() {
// Now, disable any channels whose inactive chan timeout has elapsed.
now := time.Now()
for outpoint, state := range m.chanStates {
// Ignore statuses that are not in the pending-inactive state.
if state.Status != ChanStatusPendingDisabled {
continue
}
// Ignore statuses for which the disable timeout has not
// expired.
if state.SendDisableTime.After(now) {
continue
}
log.Infof("Announcing channel(%v) disabled "+
"[detected]", outpoint)
// Sign an update disabling the channel.
err := m.signAndSendNextUpdate(outpoint, true)
if err != nil {
log.Errorf("Unable to sign update disabling "+
"channel(%v): %v", outpoint, err)
continue
}
// Record that the channel has now been disabled.
m.chanStates.markDisabled(outpoint)
}
}
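For intuition, here is a back-of-envelope sketch of the worst-case passive-disable latency under the default settings; the arithmetic is an editorial illustration, not code from this change:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Defaults from config.go above (assumed unchanged by the operator).
	sampleInterval := time.Minute      // chan-status-sample-interval
	disableTimeout := 20 * time.Minute // chan-disable-timeout

	// A link that dies just after a tick is marked pending-disabled on the
	// next tick, and the disable broadcast happens on the first tick after
	// SendDisableTime passes: up to two extra sample intervals in total.
	worstCase := 2*sampleInterval + disableTimeout
	fmt.Println(worstCase) // 22m0s
}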
// fetchChannels returns the working set of channels managed by the
// ChanStatusManager. The returned channels are filtered to only contain public
// channels.
func (m *ChanStatusManager) fetchChannels() ([]*channeldb.OpenChannel, error) {
allChannels, err := m.cfg.DB.FetchAllOpenChannels()
if err != nil {
return nil, err
}
// Filter out private channels.
var channels []*channeldb.OpenChannel
for _, c := range allChannels {
// We'll skip any private channels, as they aren't used for
// routing within the network by other nodes.
if c.ChannelFlags&lnwire.FFAnnounceChannel == 0 {
continue
}
channels = append(channels, c)
}
return channels, nil
}
// signAndSendNextUpdate computes and signs a valid update for the passed
// outpoint, with the ability to toggle the disabled bit. The new update will
// use the current time as its timestamp or, if that time is not later than
// the prior timestamp, increment the old timestamp by one so the update can
// propagate. If signing is successful, the new update will be sent out on
// the network.
func (m *ChanStatusManager) signAndSendNextUpdate(outpoint wire.OutPoint,
disabled bool) error {
// Retrieve the latest update for this channel. We'll use this
// as our starting point to send the new update.
chanUpdate, err := m.fetchLastChanUpdateByOutPoint(outpoint)
if err != nil {
return err
}
err = SignChannelUpdate(
m.cfg.MessageSigner, m.cfg.OurPubKey, chanUpdate,
ChannelUpdateSetDisable(disabled),
)
if err != nil {
return err
}
return m.cfg.ApplyChannelUpdate(chanUpdate)
}
// fetchLastChanUpdateByOutPoint fetches the latest policy for our direction of
// a channel, and crafts a new ChannelUpdate with this policy. Returns an error
// in case our ChannelEdgePolicy is not found in the database.
func (m *ChanStatusManager) fetchLastChanUpdateByOutPoint(op wire.OutPoint) (
*lnwire.ChannelUpdate, error) {
// Get the edge info and policies for this channel from the graph.
info, edge1, edge2, err := m.cfg.Graph.FetchChannelEdgesByOutpoint(&op)
if err != nil {
return nil, err
}
return ExtractChannelUpdate(m.ourPubKeyBytes, info, edge1, edge2)
}
// loadInitialChanState determines the initial ChannelState for a particular
// outpoint. The initial ChanStatus for a given outpoint will either be
// ChanStatusEnabled or ChanStatusDisabled, determined by inspecting the bits on
// the most recent announcement. An error is returned if the latest update could
// not be retrieved.
func (m *ChanStatusManager) loadInitialChanState(
outpoint *wire.OutPoint) (ChannelState, error) {
lastUpdate, err := m.fetchLastChanUpdateByOutPoint(*outpoint)
if err != nil {
return ChannelState{}, err
}
// Determine the channel's starting status by inspecting the disable bit
// on the last announcement we sent out.
var initialStatus ChanStatus
if lastUpdate.ChannelFlags&lnwire.ChanUpdateDisabled == 0 {
initialStatus = ChanStatusEnabled
} else {
initialStatus = ChanStatusDisabled
}
return ChannelState{
Status: initialStatus,
}, nil
}
// getOrInitChanStatus retrieves the current ChannelState for a particular
// outpoint. If the chanStates map already contains an entry for the outpoint,
// the value in the map is returned. Otherwise, the outpoint's initial status is
// computed and updated in the chanStates map before being returned.
func (m *ChanStatusManager) getOrInitChanStatus(
outpoint wire.OutPoint) (ChannelState, error) {
// Return the current ChannelState from the chanStates map if it is
// already known to the ChanStatusManager.
if curState, ok := m.chanStates[outpoint]; ok {
return curState, nil
}
// Otherwise, determine the initial state based on the last update we
// sent for the outpoint.
initialState, err := m.loadInitialChanState(&outpoint)
if err != nil {
return ChannelState{}, err
}
// Finally, store the initial state in the chanStates map. This will
// serve as our up-to-date view of the outpoint's current status, in
// addition to making the channel eligible for detecting inactivity.
m.chanStates[outpoint] = initialState
return initialState, nil
}

@@ -0,0 +1,816 @@
package netann_test
import (
"bytes"
"crypto/rand"
"encoding/binary"
"fmt"
"io"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/netann"
)
// randOutpoint creates a random wire.OutPoint.
func randOutpoint(t *testing.T) wire.OutPoint {
t.Helper()
var buf [36]byte
_, err := io.ReadFull(rand.Reader, buf[:])
if err != nil {
t.Fatalf("unable to generate random outpoint: %v", err)
}
op := wire.OutPoint{}
copy(op.Hash[:], buf[:32])
op.Index = binary.BigEndian.Uint32(buf[32:])
return op
}
var shortChanIDs uint64
// createChannel generates a channeldb.OpenChannel with a random chanpoint and
// short channel id.
func createChannel(t *testing.T) *channeldb.OpenChannel {
t.Helper()
sid := atomic.AddUint64(&shortChanIDs, 1)
return &channeldb.OpenChannel{
ShortChannelID: lnwire.NewShortChanIDFromInt(sid),
ChannelFlags: lnwire.FFAnnounceChannel,
FundingOutpoint: randOutpoint(t),
}
}
// createEdgePolicies generates an edge info and two directional edge policies.
// The remote party's public key is generated randomly, and then sorted against
// our `pubkey` with the direction bit set appropriately in the policies. Our
// update will be created with the disabled bit set if startEnabled is false.
func createEdgePolicies(t *testing.T, channel *channeldb.OpenChannel,
pubkey *btcec.PublicKey, startEnabled bool) (*channeldb.ChannelEdgeInfo,
*channeldb.ChannelEdgePolicy, *channeldb.ChannelEdgePolicy) {
var (
pubkey1 [33]byte
pubkey2 [33]byte
dir1 lnwire.ChanUpdateChanFlags
dir2 lnwire.ChanUpdateChanFlags
)
// Set pubkey1 to OUR pubkey.
copy(pubkey1[:], pubkey.SerializeCompressed())
// Set the disabled bit appropriately on our update.
if !startEnabled {
dir1 |= lnwire.ChanUpdateDisabled
}
// Generate and set pubkey2 for THEIR pubkey.
privKey2, err := btcec.NewPrivateKey(btcec.S256())
if err != nil {
t.Fatalf("unable to generate key pair: %v", err)
}
copy(pubkey2[:], privKey2.PubKey().SerializeCompressed())
// Set pubkey1 to the lower of the two pubkeys.
if bytes.Compare(pubkey2[:], pubkey1[:]) < 0 {
pubkey1, pubkey2 = pubkey2, pubkey1
dir1, dir2 = dir2, dir1
}
// Now that the ordering has been established, set pubkey2's direction
// bit.
dir2 |= lnwire.ChanUpdateDirection
return &channeldb.ChannelEdgeInfo{
ChannelPoint: channel.FundingOutpoint,
NodeKey1Bytes: pubkey1,
NodeKey2Bytes: pubkey2,
},
&channeldb.ChannelEdgePolicy{
ChannelID: channel.ShortChanID().ToUint64(),
ChannelFlags: dir1,
LastUpdate: time.Now(),
SigBytes: make([]byte, 64),
},
&channeldb.ChannelEdgePolicy{
ChannelID: channel.ShortChanID().ToUint64(),
ChannelFlags: dir2,
LastUpdate: time.Now(),
SigBytes: make([]byte, 64),
}
}
type mockGraph struct {
pubKey *btcec.PublicKey
mu sync.Mutex
channels []*channeldb.OpenChannel
chanInfos map[wire.OutPoint]*channeldb.ChannelEdgeInfo
chanPols1 map[wire.OutPoint]*channeldb.ChannelEdgePolicy
chanPols2 map[wire.OutPoint]*channeldb.ChannelEdgePolicy
sidToCid map[lnwire.ShortChannelID]wire.OutPoint
updates chan *lnwire.ChannelUpdate
}
func newMockGraph(t *testing.T, numChannels int,
startActive, startEnabled bool, pubKey *btcec.PublicKey) *mockGraph {
g := &mockGraph{
channels: make([]*channeldb.OpenChannel, 0, numChannels),
chanInfos: make(map[wire.OutPoint]*channeldb.ChannelEdgeInfo),
chanPols1: make(map[wire.OutPoint]*channeldb.ChannelEdgePolicy),
chanPols2: make(map[wire.OutPoint]*channeldb.ChannelEdgePolicy),
sidToCid: make(map[lnwire.ShortChannelID]wire.OutPoint),
updates: make(chan *lnwire.ChannelUpdate, 2*numChannels),
}
for i := 0; i < numChannels; i++ {
c := createChannel(t)
info, pol1, pol2 := createEdgePolicies(
t, c, pubKey, startEnabled,
)
g.addChannel(c)
g.addEdgePolicy(c, info, pol1, pol2)
}
return g
}
func (g *mockGraph) FetchAllOpenChannels() ([]*channeldb.OpenChannel, error) {
return g.chans(), nil
}
func (g *mockGraph) FetchChannelEdgesByOutpoint(
op *wire.OutPoint) (*channeldb.ChannelEdgeInfo,
*channeldb.ChannelEdgePolicy, *channeldb.ChannelEdgePolicy, error) {
g.mu.Lock()
defer g.mu.Unlock()
info, ok := g.chanInfos[*op]
if !ok {
return nil, nil, nil, channeldb.ErrEdgeNotFound
}
pol1 := g.chanPols1[*op]
pol2 := g.chanPols2[*op]
return info, pol1, pol2, nil
}
func (g *mockGraph) ApplyChannelUpdate(update *lnwire.ChannelUpdate) error {
g.mu.Lock()
defer g.mu.Unlock()
outpoint, ok := g.sidToCid[update.ShortChannelID]
if !ok {
return fmt.Errorf("unknown short channel id: %v",
update.ShortChannelID)
}
pol1 := g.chanPols1[outpoint]
pol2 := g.chanPols2[outpoint]
// Determine which policy we should update by matching the flags on the
// policies and the update, and seeing which line up.
var update1 bool
switch {
case update.ChannelFlags&lnwire.ChanUpdateDirection ==
pol1.ChannelFlags&lnwire.ChanUpdateDirection:
update1 = true
case update.ChannelFlags&lnwire.ChanUpdateDirection ==
pol2.ChannelFlags&lnwire.ChanUpdateDirection:
update1 = false
default:
return fmt.Errorf("unable to find policy to update")
}
timestamp := time.Unix(int64(update.Timestamp), 0)
policy := &channeldb.ChannelEdgePolicy{
ChannelID: update.ShortChannelID.ToUint64(),
ChannelFlags: update.ChannelFlags,
LastUpdate: timestamp,
SigBytes: make([]byte, 64),
}
if update1 {
g.chanPols1[outpoint] = policy
} else {
g.chanPols2[outpoint] = policy
}
// Send the update to the network. This channel should be sufficiently
// buffered to avoid deadlocking.
g.updates <- update
return nil
}
func (g *mockGraph) chans() []*channeldb.OpenChannel {
g.mu.Lock()
defer g.mu.Unlock()
channels := make([]*channeldb.OpenChannel, 0, len(g.channels))
for _, channel := range g.channels {
channels = append(channels, channel)
}
return channels
}
func (g *mockGraph) addChannel(channel *channeldb.OpenChannel) {
g.mu.Lock()
defer g.mu.Unlock()
g.channels = append(g.channels, channel)
}
func (g *mockGraph) addEdgePolicy(c *channeldb.OpenChannel,
info *channeldb.ChannelEdgeInfo,
pol1, pol2 *channeldb.ChannelEdgePolicy) {
g.mu.Lock()
defer g.mu.Unlock()
g.chanInfos[c.FundingOutpoint] = info
g.chanPols1[c.FundingOutpoint] = pol1
g.chanPols2[c.FundingOutpoint] = pol2
g.sidToCid[c.ShortChanID()] = c.FundingOutpoint
}
func (g *mockGraph) removeChannel(channel *channeldb.OpenChannel) {
g.mu.Lock()
defer g.mu.Unlock()
for i, c := range g.channels {
if c.FundingOutpoint != channel.FundingOutpoint {
continue
}
g.channels = append(g.channels[:i], g.channels[i+1:]...)
delete(g.chanInfos, c.FundingOutpoint)
delete(g.chanPols1, c.FundingOutpoint)
delete(g.chanPols2, c.FundingOutpoint)
delete(g.sidToCid, c.ShortChanID())
return
}
}
type mockSwitch struct {
mu sync.Mutex
isActive map[lnwire.ChannelID]bool
}
func newMockSwitch() *mockSwitch {
return &mockSwitch{
isActive: make(map[lnwire.ChannelID]bool),
}
}
func (s *mockSwitch) HasActiveLink(chanID lnwire.ChannelID) bool {
s.mu.Lock()
defer s.mu.Unlock()
// If the link is found, we will return its active status. In the
// real switch, this corresponds to EligibleToForward().
active, ok := s.isActive[chanID]
if ok {
return active
}
return false
}
func (s *mockSwitch) SetStatus(chanID lnwire.ChannelID, active bool) {
s.mu.Lock()
defer s.mu.Unlock()
s.isActive[chanID] = active
}
func newManagerCfg(t *testing.T, numChannels int,
startEnabled bool) (*netann.ChanStatusConfig, *mockGraph, *mockSwitch) {
t.Helper()
privKey, err := btcec.NewPrivateKey(btcec.S256())
if err != nil {
t.Fatalf("unable to generate key pair: %v", err)
}
graph := newMockGraph(
t, numChannels, startEnabled, startEnabled, privKey.PubKey(),
)
htlcSwitch := newMockSwitch()
cfg := &netann.ChanStatusConfig{
ChanStatusSampleInterval: 50 * time.Millisecond,
ChanEnableTimeout: 500 * time.Millisecond,
ChanDisableTimeout: time.Second,
OurPubKey: privKey.PubKey(),
MessageSigner: netann.NewNodeSigner(privKey),
IsChannelActive: htlcSwitch.HasActiveLink,
ApplyChannelUpdate: graph.ApplyChannelUpdate,
DB: graph,
Graph: graph,
}
return cfg, graph, htlcSwitch
}
type testHarness struct {
t *testing.T
numChannels int
graph *mockGraph
htlcSwitch *mockSwitch
mgr *netann.ChanStatusManager
ourPubKey *btcec.PublicKey
safeDisableTimeout time.Duration
}
// newHarness returns a new testHarness for testing a ChanStatusManager. The
// mockGraph will be populated with numChannels channels. The startActive and
// startEnabled parameters govern the initial state of the channels with
// respect to the htlcswitch and the network, respectively.
func newHarness(t *testing.T, numChannels int,
startActive, startEnabled bool) testHarness {
cfg, graph, htlcSwitch := newManagerCfg(t, numChannels, startEnabled)
mgr, err := netann.NewChanStatusManager(cfg)
if err != nil {
t.Fatalf("unable to create chan status manager: %v", err)
}
err = mgr.Start()
if err != nil {
t.Fatalf("unable to start chan status manager: %v", err)
}
h := testHarness{
t: t,
numChannels: numChannels,
graph: graph,
htlcSwitch: htlcSwitch,
mgr: mgr,
ourPubKey: cfg.OurPubKey,
safeDisableTimeout: (3 * cfg.ChanDisableTimeout) / 2, // 1.5x
}
// Initialize link status as requested.
if startActive {
h.markActive(h.graph.channels)
} else {
h.markInactive(h.graph.channels)
}
return h
}
// markActive updates the active status of the passed channels within the mock
// switch to active.
func (h *testHarness) markActive(channels []*channeldb.OpenChannel) {
h.t.Helper()
for _, channel := range channels {
chanID := lnwire.NewChanIDFromOutPoint(&channel.FundingOutpoint)
h.htlcSwitch.SetStatus(chanID, true)
}
}
// markInactive updates the active status of the passed channels within the mock
// switch to inactive.
func (h *testHarness) markInactive(channels []*channeldb.OpenChannel) {
h.t.Helper()
for _, channel := range channels {
chanID := lnwire.NewChanIDFromOutPoint(&channel.FundingOutpoint)
h.htlcSwitch.SetStatus(chanID, false)
}
}
// assertEnables requests enables for all of the passed channels, and asserts
// that the errors returned from RequestEnable match expErr.
func (h *testHarness) assertEnables(channels []*channeldb.OpenChannel, expErr error) {
h.t.Helper()
for _, channel := range channels {
h.assertEnable(channel.FundingOutpoint, expErr)
}
}
// assertDisables requests disables for all of the passed channels, and asserts
// that the errors returned from RequestDisable match expErr.
func (h *testHarness) assertDisables(channels []*channeldb.OpenChannel, expErr error) {
h.t.Helper()
for _, channel := range channels {
h.assertDisable(channel.FundingOutpoint, expErr)
}
}
// assertEnable requests an enable for the given outpoint, and asserts that the
// returned error matches expErr.
func (h *testHarness) assertEnable(outpoint wire.OutPoint, expErr error) {
h.t.Helper()
err := h.mgr.RequestEnable(outpoint)
if err != expErr {
h.t.Fatalf("expected enable error: %v, got %v", expErr, err)
}
}
// assertDisable requests a disable for the given outpoint, and asserts that the
// returned error matches expErr.
func (h *testHarness) assertDisable(outpoint wire.OutPoint, expErr error) {
h.t.Helper()
err := h.mgr.RequestDisable(outpoint)
if err != expErr {
h.t.Fatalf("expected disable error: %v, got %v", expErr, err)
}
}
// assertNoUpdates waits for the specified duration, and asserts that no updates
// are announced on the network.
func (h *testHarness) assertNoUpdates(duration time.Duration) {
h.t.Helper()
h.assertUpdates(nil, false, duration)
}
// assertUpdates waits for the specified duration, asserting that an update
// is received on the network for each of the passed OpenChannels, and that all
// of their disable bits are set to match expEnabled. The expEnabled parameter
// is ignored if channels is nil.
func (h *testHarness) assertUpdates(channels []*channeldb.OpenChannel,
expEnabled bool, duration time.Duration) {
h.t.Helper()
// Compute an index of the expected short channel ids for which we want
// to receive updates.
expSids := sidsFromChans(channels)
timeout := time.After(duration)
recvdSids := make(map[lnwire.ShortChannelID]struct{})
for {
select {
case upd := <-h.graph.updates:
// Assert that the received short channel id is one that
// we expect. If no updates were expected, this will
// always fail on the first update received.
if _, ok := expSids[upd.ShortChannelID]; !ok {
h.t.Fatalf("received update for unexpected "+
"short chan id: %v", upd.ShortChannelID)
}
// Assert that the disabled bit is set properly.
enabled := upd.ChannelFlags&lnwire.ChanUpdateDisabled !=
lnwire.ChanUpdateDisabled
if expEnabled != enabled {
h.t.Fatalf("expected enabled: %v, actual: %v",
expEnabled, enabled)
}
recvdSids[upd.ShortChannelID] = struct{}{}
case <-timeout:
// Time is up, assert that the correct number of unique
// updates was received.
if len(recvdSids) == len(channels) {
return
}
h.t.Fatalf("expected %d updates, got %d",
len(channels), len(recvdSids))
}
}
}
// sidsFromChans returns an index containing the short channel ids of each
// channel provided in the list of OpenChannels.
func sidsFromChans(
channels []*channeldb.OpenChannel) map[lnwire.ShortChannelID]struct{} {
sids := make(map[lnwire.ShortChannelID]struct{})
for _, channel := range channels {
sids[channel.ShortChanID()] = struct{}{}
}
return sids
}
type stateMachineTest struct {
name string
startEnabled bool
startActive bool
fn func(testHarness)
}
var stateMachineTests = []stateMachineTest{
{
name: "active and enabled is stable",
startActive: true,
startEnabled: true,
fn: func(h testHarness) {
// No updates should be sent because being active and
// enabled should be a stable state.
h.assertNoUpdates(h.safeDisableTimeout)
},
},
{
name: "inactive and disabled is stable",
startActive: false,
startEnabled: false,
fn: func(h testHarness) {
// No updates should be sent because being inactive and
// disabled should be a stable state.
h.assertNoUpdates(h.safeDisableTimeout)
},
},
{
name: "start disabled request enable",
startActive: true, // can't request enable unless active
startEnabled: false,
fn: func(h testHarness) {
// Request enables for all channels.
h.assertEnables(h.graph.chans(), nil)
// Expect to see them all enabled on the network.
h.assertUpdates(
h.graph.chans(), true, h.safeDisableTimeout,
)
},
},
{
name: "start enabled request disable",
startActive: true,
startEnabled: true,
fn: func(h testHarness) {
// Request disables for all channels.
h.assertDisables(h.graph.chans(), nil)
// Expect to see them all disabled on the network.
h.assertUpdates(
h.graph.chans(), false, h.safeDisableTimeout,
)
},
},
{
name: "request enable already enabled",
startActive: true,
startEnabled: true,
fn: func(h testHarness) {
// Request enables for already enabled channels.
h.assertEnables(h.graph.chans(), nil)
// Manager shouldn't send out any updates.
h.assertNoUpdates(h.safeDisableTimeout)
},
},
{
name: "request disabled already disabled",
startActive: false,
startEnabled: false,
fn: func(h testHarness) {
// Request disables for already disabled channels.
h.assertDisables(h.graph.chans(), nil)
// Manager shouldn't send out any updates.
h.assertNoUpdates(h.safeDisableTimeout)
},
},
{
name: "detect and disable inactive",
startActive: true,
startEnabled: true,
fn: func(h testHarness) {
// Simulate disconnection and have links go inactive.
h.markInactive(h.graph.chans())
// Should see all channels passively disabled.
h.assertUpdates(
h.graph.chans(), false, h.safeDisableTimeout,
)
},
},
{
name: "quick flap stays active",
startActive: true,
startEnabled: true,
fn: func(h testHarness) {
// Simulate disconnection and have links go inactive.
h.markInactive(h.graph.chans())
// Allow 2 sample intervals to pass, but not long
// enough for a disable to occur.
time.Sleep(100 * time.Millisecond)
// Simulate reconnect by making channels active.
h.markActive(h.graph.chans())
// Request that all channels be reenabled.
h.assertEnables(h.graph.chans(), nil)
// Pending disable should have been canceled, and
// no updates sent. Channels remain enabled on the
// network.
h.assertNoUpdates(h.safeDisableTimeout)
},
},
{
name: "no passive enable from becoming active",
startActive: false,
startEnabled: false,
fn: func(h testHarness) {
// Simulate reconnect by making channels active.
h.markActive(h.graph.chans())
// No updates should be sent without explicit enable.
h.assertNoUpdates(h.safeDisableTimeout)
},
},
{
name: "enable inactive channel fails",
startActive: false,
startEnabled: false,
fn: func(h testHarness) {
// Request enable of inactive channels, expect error
// indicating that channel was not active.
h.assertEnables(
h.graph.chans(), netann.ErrEnableInactiveChan,
)
// No updates should be sent as a result of the failure.
h.assertNoUpdates(h.safeDisableTimeout)
},
},
{
name: "enable unknown channel fails",
startActive: false,
startEnabled: false,
fn: func(h testHarness) {
// Create channels unknown to the graph.
unknownChans := []*channeldb.OpenChannel{
createChannel(h.t),
createChannel(h.t),
createChannel(h.t),
}
// Request that they be enabled, which should return an
// error as the graph doesn't have an edge for them.
h.assertEnables(
unknownChans, channeldb.ErrEdgeNotFound,
)
// No updates should be sent as a result of the failure.
h.assertNoUpdates(h.safeDisableTimeout)
},
},
{
name: "disable unknown channel fails",
startActive: false,
startEnabled: false,
fn: func(h testHarness) {
// Create channels unknown to the graph.
unknownChans := []*channeldb.OpenChannel{
createChannel(h.t),
createChannel(h.t),
createChannel(h.t),
}
// Request that they be disabled, which should return an
// error as the graph doesn't have an edge for them.
h.assertDisables(
unknownChans, channeldb.ErrEdgeNotFound,
)
// No updates should be sent as a result of the failure.
h.assertNoUpdates(h.safeDisableTimeout)
},
},
{
name: "add new channels",
startActive: false,
startEnabled: false,
fn: func(h testHarness) {
// Allow the manager to enter a steady state for the
// initial channel set.
h.assertNoUpdates(h.safeDisableTimeout)
// Add new channels to the graph, but don't yet add
// the edge policies. We should see no updates sent
// since the manager can't access the policies.
newChans := []*channeldb.OpenChannel{
createChannel(h.t),
createChannel(h.t),
createChannel(h.t),
}
for _, c := range newChans {
h.graph.addChannel(c)
}
h.assertNoUpdates(h.safeDisableTimeout)
// Check that trying to enable the channels with unknown
// edges results in a failure.
h.assertEnables(newChans, channeldb.ErrEdgeNotFound)
// Now, insert edge policies for the channels into the
// graph, starting with the channels enabled, and mark
// the links active.
for _, c := range newChans {
info, pol1, pol2 := createEdgePolicies(
h.t, c, h.ourPubKey, true,
)
h.graph.addEdgePolicy(c, info, pol1, pol2)
}
h.markActive(newChans)
// We expect no updates to be sent since the channels
// are enabled and active.
h.assertNoUpdates(h.safeDisableTimeout)
// Finally, assert that enabling the channels doesn't
// return an error now that everything is in place.
h.assertEnables(newChans, nil)
},
},
{
name: "remove channels then disable",
startActive: true,
startEnabled: true,
fn: func(h testHarness) {
// Allow the manager to enter a steady state for the
// initial channel set.
h.assertNoUpdates(h.safeDisableTimeout)
// Select half of the current channels to remove.
channels := h.graph.chans()
rmChans := channels[:len(channels)/2]
// Mark the channels inactive and remove them from the
// graph. This should trigger the manager to attempt to
// mark the channels disabled, but it will be unable to
// do so because it can't find the edge policies.
h.markInactive(rmChans)
for _, c := range rmChans {
h.graph.removeChannel(c)
}
h.assertNoUpdates(h.safeDisableTimeout)
// Check that trying to disable the channels with unknown
// edges results in a failure.
h.assertDisables(rmChans, channeldb.ErrEdgeNotFound)
},
},
{
name: "disable channels then remove",
startActive: true,
startEnabled: true,
fn: func(h testHarness) {
// Allow the manager to enter a steady state for the
// initial channel set.
h.assertNoUpdates(h.safeDisableTimeout)
// Select half of the current channels to remove.
channels := h.graph.chans()
rmChans := channels[:len(channels)/2]
// Request disables for the selected channels, which
// should succeed while their policies remain in the graph.
h.assertDisables(rmChans, nil)
// Since the channels are still in the graph, we expect
// these channels to be disabled on the network.
h.assertUpdates(rmChans, false, h.safeDisableTimeout)
// Finally, remove the channels from the graph and
// assert no more updates are sent.
for _, c := range rmChans {
h.graph.removeChannel(c)
}
h.assertNoUpdates(h.safeDisableTimeout)
},
},
}
// TestChanStatusManagerStateMachine tests the possible state transitions that
// can be taken by the ChanStatusManager.
func TestChanStatusManagerStateMachine(t *testing.T) {
t.Parallel()
for _, test := range stateMachineTests {
tc := test
t.Run(test.name, func(t *testing.T) {
t.Parallel()
const numChannels = 10
h := newHarness(
t, numChannels, tc.startActive, tc.startEnabled,
)
defer h.mgr.Stop()
tc.fn(h)
})
}
}

netann/channel_state.go Normal file (75 lines)

@@ -0,0 +1,75 @@
package netann
import (
"time"
"github.com/btcsuite/btcd/wire"
)
// ChanStatus is a type that enumerates the possible states a ChanStatusManager
// tracks for its known channels.
type ChanStatus uint8
const (
// ChanStatusEnabled indicates that the channel's last announcement has
// the disabled bit cleared.
ChanStatusEnabled ChanStatus = iota
// ChanStatusPendingDisabled indicates that the channel's last
// announcement has the disabled bit cleared, but that the channel was
// detected in an inactive state. Channels in this state will have a
// disabling announcement sent after the ChanDisableTimeout expires
// from the time of the first detection, unless the channel is
// explicitly reenabled before the disabling occurs.
ChanStatusPendingDisabled
// ChanStatusDisabled indicates that the channel's last announcement has
// the disabled bit set.
ChanStatusDisabled
)
// ChannelState describes the ChanStatusManager's view of a channel, and
// records the current state of the channel's disabled status on the network.
type ChannelState struct {
// Status is the channel's current ChanStatus from the POV of the
// ChanStatusManager.
Status ChanStatus
// SendDisableTime is the earliest time at which the ChanStatusManager
// will passively send a new disable announcement on behalf of this
// channel.
//
// NOTE: This field is only non-zero if status is
// ChanStatusPendingDisabled.
SendDisableTime time.Time
}
// channelStates is a map of channel outpoints to their channelState. All
// changes made after setting an entry initially should be made using receiver
// methods below.
type channelStates map[wire.OutPoint]ChannelState
// markEnabled creates a channelState using ChanStatusEnabled.
func (s *channelStates) markEnabled(outpoint wire.OutPoint) {
(*s)[outpoint] = ChannelState{
Status: ChanStatusEnabled,
}
}
// markDisabled creates a channelState using ChanStatusDisabled.
func (s *channelStates) markDisabled(outpoint wire.OutPoint) {
(*s)[outpoint] = ChannelState{
Status: ChanStatusDisabled,
}
}
// markPendingDisabled creates a channelState using ChanStatusPendingDisabled
// and sets the ChannelState's SendDisableTime to sendDisableTime.
func (s *channelStates) markPendingDisabled(outpoint wire.OutPoint,
sendDisableTime time.Time) {
(*s)[outpoint] = ChannelState{
Status: ChanStatusPendingDisabled,
SendDisableTime: sendDisableTime,
}
}
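For reference, a minimal sketch of the lifecycle these receiver methods model; the helper below is hypothetical and, since channelStates is unexported, would have to live inside the netann package:

package netann

import (
	"time"

	"github.com/btcsuite/btcd/wire"
)

// exampleLifecycle is a hypothetical illustration of the transitions the
// ChanStatusManager drives; it is not part of this change.
func exampleLifecycle() {
	states := make(channelStates)
	var op wire.OutPoint

	// The last announcement had the disabled bit cleared.
	states.markEnabled(op)

	// The link was observed inactive, so schedule a disable for one
	// ChanDisableTimeout from now (20m by default).
	states.markPendingDisabled(op, time.Now().Add(20*time.Minute))

	// Once SendDisableTime has passed, the manager broadcasts the disable
	// and records the new status.
	if !states[op].SendDisableTime.After(time.Now()) {
		states.markDisabled(op)
	}
}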

netann/channel_update.go Normal file (141 lines)

@@ -0,0 +1,141 @@
package netann
import (
"bytes"
"fmt"
"time"
"github.com/btcsuite/btcd/btcec"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
)
// ChannelUpdateModifier is a closure that makes in-place modifications to an
// lnwire.ChannelUpdate.
type ChannelUpdateModifier func(*lnwire.ChannelUpdate)
// ChannelUpdateSetDisable sets the disabled channel flag if disabled is true,
// and clears the bit otherwise.
func ChannelUpdateSetDisable(disabled bool) ChannelUpdateModifier {
return func(update *lnwire.ChannelUpdate) {
if disabled {
// Set the bit responsible for marking a channel as
// disabled.
update.ChannelFlags |= lnwire.ChanUpdateDisabled
} else {
// Clear the bit responsible for marking a channel as
// disabled.
update.ChannelFlags &= ^lnwire.ChanUpdateDisabled
}
}
}
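Because ChannelUpdateModifier is just a closure, other modifiers compose with SignChannelUpdate in the same way; for example, a hypothetical modifier (not part of this diff) that rewrites the base fee:

package netann

import "github.com/lightningnetwork/lnd/lnwire"

// ChannelUpdateSetBaseFee is a hypothetical modifier, shown only to
// illustrate how the ChannelUpdateModifier pattern composes.
func ChannelUpdateSetBaseFee(baseFee uint32) ChannelUpdateModifier {
	return func(update *lnwire.ChannelUpdate) {
		update.BaseFee = baseFee
	}
}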
// SignChannelUpdate applies the given modifiers to the passed
// lnwire.ChannelUpdate, then signs the resulting update. The provided update
// should be the most recent, valid update, otherwise the timestamp may not
// monotonically increase from the prior.
//
// NOTE: This method modifies the given update.
func SignChannelUpdate(signer lnwallet.MessageSigner, pubKey *btcec.PublicKey,
update *lnwire.ChannelUpdate, mods ...ChannelUpdateModifier) error {
// Apply the requested changes to the channel update.
for _, modifier := range mods {
modifier(update)
}
// Update the message's timestamp to the current time. If the update's
// existing timestamp is already now or in the future, we increment the
// prior value by one to ensure the timestamps monotonically increase,
// otherwise the update won't propagate.
newTimestamp := uint32(time.Now().Unix())
if newTimestamp <= update.Timestamp {
newTimestamp = update.Timestamp + 1
}
update.Timestamp = newTimestamp
chanUpdateMsg, err := update.DataToSign()
if err != nil {
return err
}
// Create the DER-encoded ECDSA signature over the message digest.
sig, err := signer.SignMessage(pubKey, chanUpdateMsg)
if err != nil {
return err
}
// Parse the DER-encoded signature into a fixed-size 64-byte array.
update.Signature, err = lnwire.NewSigFromSignature(sig)
if err != nil {
return err
}
return nil
}
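A short usage sketch, with the key and the starting update as stand-ins, showing how a caller might re-sign an update with the disabled bit set:

package main

import (
	"log"
	"time"

	"github.com/btcsuite/btcd/btcec"
	"github.com/lightningnetwork/lnd/lnwire"
	"github.com/lightningnetwork/lnd/netann"
)

func main() {
	privKey, err := btcec.NewPrivateKey(btcec.S256())
	if err != nil {
		log.Fatal(err)
	}

	// Stand-in for the most recent update we broadcast for a channel.
	update := &lnwire.ChannelUpdate{
		Timestamp: uint32(time.Now().Unix()),
	}

	// Re-sign with the disabled bit set; the timestamp is bumped so the
	// new update propagates.
	err = netann.SignChannelUpdate(
		netann.NewNodeSigner(privKey), privKey.PubKey(), update,
		netann.ChannelUpdateSetDisable(true),
	)
	if err != nil {
		log.Fatalf("unable to sign update: %v", err)
	}
}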
// ExtractChannelUpdate attempts to retrieve a lnwire.ChannelUpdate message from
// an edge's info and a set of routing policies.
//
// NOTE: The passed policies can be nil.
func ExtractChannelUpdate(ownerPubKey []byte,
info *channeldb.ChannelEdgeInfo,
policies ...*channeldb.ChannelEdgePolicy) (
*lnwire.ChannelUpdate, error) {
// Helper function to extract the owner of the given policy.
owner := func(edge *channeldb.ChannelEdgePolicy) []byte {
var pubKey *btcec.PublicKey
if edge.ChannelFlags&lnwire.ChanUpdateDirection == 0 {
pubKey, _ = info.NodeKey1()
} else {
pubKey, _ = info.NodeKey2()
}
// If pubKey was not found, just return nil.
if pubKey == nil {
return nil
}
return pubKey.SerializeCompressed()
}
// Extract the channel update from the policy we own, if any.
for _, edge := range policies {
if edge != nil && bytes.Equal(ownerPubKey, owner(edge)) {
return ChannelUpdateFromEdge(info, edge)
}
}
return nil, fmt.Errorf("unable to extract ChannelUpdate for channel %v",
info.ChannelPoint)
}
// ChannelUpdateFromEdge reconstructs a signed ChannelUpdate from the given edge
// info and policy.
func ChannelUpdateFromEdge(info *channeldb.ChannelEdgeInfo,
policy *channeldb.ChannelEdgePolicy) (*lnwire.ChannelUpdate, error) {
update := &lnwire.ChannelUpdate{
ChainHash: info.ChainHash,
ShortChannelID: lnwire.NewShortChanIDFromInt(policy.ChannelID),
Timestamp: uint32(policy.LastUpdate.Unix()),
ChannelFlags: policy.ChannelFlags,
MessageFlags: policy.MessageFlags,
TimeLockDelta: policy.TimeLockDelta,
HtlcMinimumMsat: policy.MinHTLC,
HtlcMaximumMsat: policy.MaxHTLC,
BaseFee: uint32(policy.FeeBaseMSat),
FeeRate: uint32(policy.FeeProportionalMillionths),
ExtraOpaqueData: policy.ExtraOpaqueData,
}
var err error
update.Signature, err = lnwire.NewSigFromRawSignature(policy.SigBytes)
if err != nil {
return nil, err
}
return update, nil
}

@@ -0,0 +1,189 @@
package netann_test
import (
"errors"
"testing"
"time"
"github.com/btcsuite/btcd/btcec"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/netann"
"github.com/lightningnetwork/lnd/routing"
)
type mockSigner struct {
err error
}
func (m *mockSigner) SignMessage(pk *btcec.PublicKey,
msg []byte) (*btcec.Signature, error) {
if m.err != nil {
return nil, m.err
}
return nil, nil
}
var _ lnwallet.MessageSigner = (*mockSigner)(nil)
var (
privKey, _ = btcec.NewPrivateKey(btcec.S256())
pubKey = privKey.PubKey()
errFailedToSign = errors.New("unable to sign message")
)
type updateDisableTest struct {
name string
startEnabled bool
disable bool
startTime time.Time
signer lnwallet.MessageSigner
expErr error
}
var updateDisableTests = []updateDisableTest{
{
name: "working signer enabled to disabled",
startEnabled: true,
disable: true,
startTime: time.Now(),
signer: netann.NewNodeSigner(privKey),
},
{
name: "working signer enabled to enabled",
startEnabled: true,
disable: false,
startTime: time.Now(),
signer: netann.NewNodeSigner(privKey),
},
{
name: "working signer disabled to enabled",
startEnabled: false,
disable: false,
startTime: time.Now(),
signer: netann.NewNodeSigner(privKey),
},
{
name: "working signer disabled to disabled",
startEnabled: false,
disable: true,
startTime: time.Now(),
signer: netann.NewNodeSigner(privKey),
},
{
name: "working signer future monotonicity",
startEnabled: true,
disable: true,
startTime: time.Now().Add(time.Hour), // must increment
signer: netann.NewNodeSigner(privKey),
},
{
name: "failing signer",
startTime: time.Now(),
signer: &mockSigner{err: errFailedToSign},
expErr: errFailedToSign,
},
{
name: "invalid sig from signer",
startTime: time.Now(),
signer: &mockSigner{}, // returns a nil signature
expErr: errors.New("cannot decode empty signature"),
},
}
// TestUpdateDisableFlag checks the behavior of SignChannelUpdate when
// toggling the disabled flag via ChannelUpdateSetDisable, asserting that the
// proper channel flags are set, the timestamp always increases
// monotonically, and that the correct errors are returned in the event that
// the signer is unable to produce a signature.
func TestUpdateDisableFlag(t *testing.T) {
t.Parallel()
for _, tc := range updateDisableTests {
t.Run(tc.name, func(t *testing.T) {
// Create the initial update, the only fields we are
// concerned with in this test are the timestamp and the
// channel flags.
ogUpdate := &lnwire.ChannelUpdate{
Timestamp: uint32(tc.startTime.Unix()),
}
if !tc.startEnabled {
ogUpdate.ChannelFlags |= lnwire.ChanUpdateDisabled
}
// Create new update to sign using the same fields as
// the original. UpdateDisableFlag will mutate the
// passed channel update, so we keep the old one to test
// against.
newUpdate := &lnwire.ChannelUpdate{
Timestamp: ogUpdate.Timestamp,
ChannelFlags: ogUpdate.ChannelFlags,
}
// Attempt to update and sign the new update, specifying
// disabled or enabled as prescribed in the test case.
err := netann.SignChannelUpdate(
tc.signer, pubKey, newUpdate,
netann.ChannelUpdateSetDisable(tc.disable),
)
var fail bool
switch {
// Both nil, pass.
case tc.expErr == nil && err == nil:
// Both non-nil, compare error strings since some
// methods don't return concrete error types.
case tc.expErr != nil && err != nil:
if err.Error() != tc.expErr.Error() {
fail = true
}
// Otherwise, one is nil and one is non-nil.
default:
fail = true
}
if fail {
t.Fatalf("expected error: %v, got %v",
tc.expErr, err)
}
// Exit early if the test expected a failure.
if tc.expErr != nil {
return
}
// Verify that the timestamp has increased from the
// original update.
if newUpdate.Timestamp <= ogUpdate.Timestamp {
t.Fatalf("update timestamp should be "+
"monotonically increasing, "+
"original: %d, new %d",
ogUpdate.Timestamp, newUpdate.Timestamp)
}
// Verify that the disabled flag is properly set.
disabled := newUpdate.ChannelFlags&
lnwire.ChanUpdateDisabled != 0
if disabled != tc.disable {
t.Fatalf("expected disable:%v, found:%v",
tc.disable, disabled)
}
// Finally, validate the signature using the router's
// verification logic.
err = routing.ValidateChannelUpdateAnn(
pubKey, 0, newUpdate,
)
if err != nil {
t.Fatalf("channel update failed to "+
"validate: %v", err)
}
})
}
}

netann/interface.go

@ -0,0 +1,23 @@
package netann
import (
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/channeldb"
)
// DB abstracts the required database functionality needed by the
// ChanStatusManager.
type DB interface {
// FetchAllOpenChannels returns a slice of all open channels known to
// the daemon. This may include private or pending channels.
FetchAllOpenChannels() ([]*channeldb.OpenChannel, error)
}
// ChannelGraph abstracts the required channel graph queries used by the
// ChanStatusManager.
type ChannelGraph interface {
// FetchChannelEdgesByOutpoint returns the channel edge info and most
// recent channel edge policies for a given outpoint.
FetchChannelEdgesByOutpoint(*wire.OutPoint) (*channeldb.ChannelEdgeInfo,
*channeldb.ChannelEdgePolicy, *channeldb.ChannelEdgePolicy, error)
}
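Because the manager depends only on these two narrow interfaces rather than on *channeldb.DB directly, tests can substitute in-memory fakes. A minimal sketch under that assumption; the fake types below are hypothetical and not part of this change:

	package netann_test

	import (
		"github.com/btcsuite/btcd/wire"
		"github.com/lightningnetwork/lnd/channeldb"
		"github.com/lightningnetwork/lnd/netann"
	)

	// fakeDB satisfies netann.DB with a fixed set of open channels.
	type fakeDB struct {
		chans []*channeldb.OpenChannel
	}

	func (f *fakeDB) FetchAllOpenChannels() ([]*channeldb.OpenChannel, error) {
		return f.chans, nil
	}

	// fakeGraph satisfies netann.ChannelGraph with a canned edge lookup.
	type fakeGraph struct {
		info       *channeldb.ChannelEdgeInfo
		pol1, pol2 *channeldb.ChannelEdgePolicy
	}

	func (f *fakeGraph) FetchChannelEdgesByOutpoint(_ *wire.OutPoint) (
		*channeldb.ChannelEdgeInfo, *channeldb.ChannelEdgePolicy,
		*channeldb.ChannelEdgePolicy, error) {

		return f.info, f.pol1, f.pol2, nil
	}

	var (
		_ netann.DB           = (*fakeDB)(nil)
		_ netann.ChannelGraph = (*fakeGraph)(nil)
	)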

peer.go

@ -142,12 +142,21 @@ type peer struct {
// objects to queue messages to be sent out on the wire.
outgoingQueue chan outgoingMsg
// activeChanMtx protects access to the activeChannels and
// addedChannels maps.
activeChanMtx sync.RWMutex
// activeChannels is a map which stores the state machines of all
// active channels. Channels are indexed into the map by the txid of
// the funding transaction which opened the channel.
activeChanMtx sync.RWMutex
activeChannels map[lnwire.ChannelID]*lnwallet.LightningChannel
// addedChannels tracks any new channels opened during this peer's
// lifecycle. We use this to filter out these new channels when the time
// comes to request a reenable for active channels, since they will have
// waited a shorter duration.
addedChannels map[lnwire.ChannelID]struct{}
// newChannels is used by the fundingManager to send fully opened
// channels to the source peer which handled the funding workflow.
newChannels chan *newChannelMsg
@ -172,6 +181,11 @@ type peer struct {
// well as lnwire.ClosingSigned messages.
chanCloseMsgs chan *closeMsg
// chanActiveTimeout specifies the duration the peer will wait to
// request a channel reenable, beginning from the time the peer was
// started.
chanActiveTimeout time.Duration
server *server
// localFeatures is the set of local features that we advertised to the
@ -212,7 +226,8 @@ var _ lnpeer.Peer = (*peer)(nil)
// pointer to the main server.
func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server,
addr *lnwire.NetAddress, inbound bool,
localFeatures *lnwire.RawFeatureVector) (*peer, error) {
localFeatures *lnwire.RawFeatureVector,
chanActiveTimeout time.Duration) (*peer, error) {
nodePub := addr.IdentityKey
@ -230,6 +245,7 @@ func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server,
sendQueue: make(chan outgoingMsg),
outgoingQueue: make(chan outgoingMsg),
addedChannels: make(map[lnwire.ChannelID]struct{}),
activeChannels: make(map[lnwire.ChannelID]*lnwallet.LightningChannel),
newChannels: make(chan *newChannelMsg, 1),
@ -239,6 +255,8 @@ func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server,
chanCloseMsgs: make(chan *closeMsg),
failedChannels: make(map[lnwire.ChannelID]struct{}),
chanActiveTimeout: chanActiveTimeout,
writeBuf: server.writeBufferPool.Take(),
queueQuit: make(chan struct{}),
@ -392,7 +410,6 @@ func (p *peer) QuitSignal() <-chan struct{} {
// loadActiveChannels creates indexes within the peer for tracking all active
// channels returned by the database.
func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
var activePublicChans []wire.OutPoint
for _, dbChan := range chans {
lnChan, err := lnwallet.NewLightningChannel(
p.server.cc.signer, p.server.witnessBeacon, dbChan,
@ -499,33 +516,8 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
p.activeChanMtx.Lock()
p.activeChannels[chanID] = lnChan
p.activeChanMtx.Unlock()
// To ensure we can route through this channel now that the peer
// is back online, we'll attempt to send an update to enable it.
// This will only be used for non-pending public channels, as
// they are the only ones capable of routing.
chanIsPublic := dbChan.ChannelFlags&lnwire.FFAnnounceChannel != 0
if chanIsPublic && !dbChan.IsPending {
activePublicChans = append(activePublicChans, *chanPoint)
}
}
// As a final measure we launch a goroutine that will ensure the newly
// loaded public channels are not currently disabled, as that will make
// us skip it during path finding.
go func() {
for _, chanPoint := range activePublicChans {
// Set the channel disabled=false by sending out a new
// ChannelUpdate. If this channel is already active,
// the update won't be sent.
err := p.server.announceChanStatus(chanPoint, false)
if err != nil && err != channeldb.ErrEdgeNotFound {
srvrLog.Errorf("Unable to enable channel %v: %v",
chanPoint, err)
}
}
}()
return nil
}
@ -1580,6 +1572,11 @@ func (p *peer) genDeliveryScript() ([]byte, error) {
func (p *peer) channelManager() {
defer p.wg.Done()
// reenableTimeout will fire once after the configured channel status
// interval has elapsed. This will trigger us to sign new channel
// updates and broadcast them with the "disabled" flag unset.
reenableTimeout := time.After(p.chanActiveTimeout)
out:
for {
select {
@ -1641,6 +1638,7 @@ out:
}
p.activeChannels[chanID] = lnChan
p.addedChannels[chanID] = struct{}{}
p.activeChanMtx.Unlock()
peerLog.Infof("New channel active ChannelPoint(%v) "+
@ -1782,6 +1780,25 @@ out:
// relevant sub-systems and launching a goroutine to
// wait for close tx conf.
p.finalizeChanClosure(chanCloser)
// The channel reannounce delay has elapsed, broadcast the
// reenabled channel updates to the network. This should only
// fire once, so we set the reenableTimeout channel to nil to
// mark it for garbage collection. If the peer is torn down
// before firing, reenabling will not be attempted.
// TODO(conner): consolidate reenable timers inside chan status
// manager
case <-reenableTimeout:
p.reenableActiveChannels()
// Since this channel will never fire again during the
// lifecycle of the peer, we nil the channel to mark it
// eligible for garbage collection, and make this
// explicitly ineligible to receive in future calls to
// select. This also shaves a few CPU cycles since the
// select will ignore this case entirely.
reenableTimeout = nil
case <-p.quit:
// As we've been signalled to exit, we'll reset all
@ -1797,6 +1814,49 @@ out:
}
}
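The nil-channel trick used above is a standard Go idiom worth calling out: a receive from a nil channel blocks forever, so assigning nil disables that case without restructuring the select. A self-contained sketch, illustrative only:

	package main

	import (
		"fmt"
		"time"
	)

	func main() {
		fire := time.After(50 * time.Millisecond)
		done := time.After(200 * time.Millisecond)
		for {
			select {
			case <-fire:
				fmt.Println("one-shot work")
				// A nil channel is never ready, so this case is
				// effectively removed from future iterations.
				fire = nil
			case <-done:
				return
			}
		}
	}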
// reenableActiveChannels searches the index of channels maintained with this
// peer, and reenables each public, non-pending channel. This is done at the
// gossip level by broadcasting a new ChannelUpdate with the disabled bit unset.
// No message will be sent if the channel is already enabled.
func (p *peer) reenableActiveChannels() {
// First, filter all known channels with this peer for ones that are
// both public and not pending.
var activePublicChans []wire.OutPoint
p.activeChanMtx.RLock()
for chanID, lnChan := range p.activeChannels {
dbChan := lnChan.State()
isPublic := dbChan.ChannelFlags&lnwire.FFAnnounceChannel != 0
if !isPublic || dbChan.IsPending {
continue
}
// We'll also skip any channels added during this peer's
// lifecycle since they haven't waited out the timeout. Their
// first announcement will be enabled, and the chan status
// manager will begin monitoring them passively since they exist
// in the database.
if _, ok := p.addedChannels[chanID]; ok {
continue
}
activePublicChans = append(
activePublicChans, dbChan.FundingOutpoint,
)
}
p.activeChanMtx.RUnlock()
// For each of the public, non-pending channels, set the channel
// disabled bit to false and send out a new ChannelUpdate. If this
// channel is already active, the update won't be sent.
for _, chanPoint := range activePublicChans {
err := p.server.chanStatusMgr.RequestEnable(chanPoint)
if err != nil {
srvrLog.Errorf("Unable to enable channel %v: %v",
chanPoint, err)
}
}
}
// fetchActiveChanCloser attempts to fetch the active chan closer state machine
// for the target channel ID. If the channel isn't active an error is returned.
// Otherwise, either an existing state machine will be returned, or a new one
@ -1854,11 +1914,8 @@ func (p *peer) fetchActiveChanCloser(chanID lnwire.ChannelID) (*channelCloser, e
channel: channel,
unregisterChannel: p.server.htlcSwitch.RemoveLink,
broadcastTx: p.server.cc.wallet.PublishTransaction,
disableChannel: func(op wire.OutPoint) error {
return p.server.announceChanStatus(op,
true)
},
quit: p.quit,
disableChannel: p.server.chanStatusMgr.RequestDisable,
quit: p.quit,
},
deliveryAddr,
feePerKw,
@ -1917,11 +1974,8 @@ func (p *peer) handleLocalCloseReq(req *htlcswitch.ChanClose) {
channel: channel,
unregisterChannel: p.server.htlcSwitch.RemoveLink,
broadcastTx: p.server.cc.wallet.PublishTransaction,
disableChannel: func(op wire.OutPoint) error {
return p.server.announceChanStatus(op,
true)
},
quit: p.quit,
disableChannel: p.server.chanStatusMgr.RequestDisable,
quit: p.quit,
},
deliveryAddr,
req.TargetFeePerKw,

server.go

@ -89,6 +89,8 @@ type server struct {
// that's backed by the identity private key of the running lnd node.
nodeSigner *netann.NodeSigner
chanStatusMgr *netann.ChanStatusManager
// listenAddrs is the list of addresses the server is currently
// listening on.
listenAddrs []net.Addr
@ -179,11 +181,6 @@ type server struct {
// changed since last start.
currentNodeAnn *lnwire.NodeAnnouncement
// sentDisabled is used to keep track of the disabled flag of the last
// sent ChannelUpdate from announceChanStatus.
sentDisabled map[wire.OutPoint]bool
sentDisabledMtx sync.Mutex
quit chan struct{}
wg sync.WaitGroup
@ -300,7 +297,6 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl,
outboundPeers: make(map[string]*peer),
peerConnectedListeners: make(map[string][]chan<- lnpeer.Peer),
peerDisconnectedListeners: make(map[string][]chan<- struct{}),
sentDisabled: make(map[wire.OutPoint]bool),
globalFeatures: lnwire.NewFeatureVector(globalFeatures,
lnwire.GlobalFeatures),
@ -369,6 +365,24 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl,
return nil, err
}
chanStatusMgrCfg := &netann.ChanStatusConfig{
ChanStatusSampleInterval: cfg.ChanStatusSampleInterval,
ChanEnableTimeout: cfg.ChanEnableTimeout,
ChanDisableTimeout: cfg.ChanDisableTimeout,
OurPubKey: privKey.PubKey(),
MessageSigner: s.nodeSigner,
IsChannelActive: s.htlcSwitch.HasActiveLink,
ApplyChannelUpdate: s.applyChannelUpdate,
DB: chanDB,
Graph: chanDB.ChannelGraph(),
}
chanStatusMgr, err := netann.NewChanStatusManager(chanStatusMgrCfg)
if err != nil {
return nil, err
}
s.chanStatusMgr = chanStatusMgr
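A hedged sketch of the manager's lifecycle as wired up here; error handling is elided, chanPoint is a placeholder wire.OutPoint, and the comments describe the configured timeouts rather than the manager's internals (which this hunk doesn't show):

	mgr, err := netann.NewChanStatusManager(chanStatusMgrCfg)
	if err != nil {
		return nil, err
	}
	if err := mgr.Start(); err != nil {
		return nil, err
	}
	defer mgr.Stop()

	// Peers request status transitions; the manager waits out the
	// configured enable/disable timeouts before signing and broadcasting
	// a new ChannelUpdate.
	_ = mgr.RequestEnable(chanPoint)  // reenable, or cancel a pending disable
	_ = mgr.RequestDisable(chanPoint) // disable after the timeout elapses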
// If enabled, use either UPnP or NAT-PMP to automatically configure
// port forwarding for users behind a NAT.
if cfg.NAT {
@ -739,9 +753,7 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl,
return ErrServerShuttingDown
}
},
DisableChannel: func(op wire.OutPoint) error {
return s.announceChanStatus(op, true)
},
DisableChannel: s.chanStatusMgr.RequestDisable,
Sweeper: s.sweeper,
SettleInvoice: s.invoices.SettleInvoice,
NotifyClosedChannel: s.channelNotifier.NotifyClosedChannelEvent,
@ -1030,6 +1042,9 @@ func (s *server) Start() error {
if err := s.invoices.Start(); err != nil {
return err
}
if err := s.chanStatusMgr.Start(); err != nil {
return err
}
// With all the relevant sub-systems started, we'll now attempt to
// establish persistent connections to our direct channel collaborators
@ -1060,11 +1075,6 @@ func (s *server) Start() error {
srvrLog.Infof("Auto peer bootstrapping is disabled")
}
// Start a goroutine that will periodically send out ChannelUpdates
// based on a channel's status.
s.wg.Add(1)
go s.watchChannelStatus()
return nil
}
@ -1085,6 +1095,7 @@ func (s *server) Stop() error {
}
// Shutdown the wallet, funding manager, and the rpc server.
s.chanStatusMgr.Stop()
s.sigPool.Stop()
s.cc.chainNotifier.Stop()
s.chanRouter.Stop()
@ -2414,7 +2425,10 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq,
// Now that we've established a connection, create a peer, and add it
// to the set of currently active peers.
p, err := newPeer(conn, connReq, s, peerAddr, inbound, localFeatures)
p, err := newPeer(
conn, connReq, s, peerAddr, inbound, localFeatures,
cfg.ChanEnableTimeout,
)
if err != nil {
srvrLog.Errorf("unable to create peer %v", err)
return
@ -2995,91 +3009,6 @@ func (s *server) fetchNodeAdvertisedAddr(pub *btcec.PublicKey) (net.Addr, error)
return node.Addresses[0], nil
}
// announceChanStatus disables a channel if disabled=true, otherwise activates
// it. This is done by sending a new channel update across the network with the
// disabled flag set accordingly. Disabling a channel signals that it can no
// longer be used to forward payments.
func (s *server) announceChanStatus(op wire.OutPoint, disabled bool) error {
s.sentDisabledMtx.Lock()
defer s.sentDisabledMtx.Unlock()
// If we have already sent out an update reflecting the current status,
// skip this channel.
alreadyDisabled, ok := s.sentDisabled[op]
if ok && alreadyDisabled == disabled {
return nil
}
// Retrieve the latest update for this channel. We'll use this
// as our starting point to send the new update.
chanUpdate, err := s.fetchLastChanUpdateByOutPoint(op)
if err != nil {
return err
}
if disabled {
// Set the bit responsible for marking a channel as disabled.
chanUpdate.ChannelFlags |= lnwire.ChanUpdateDisabled
} else {
// Clear the bit responsible for marking a channel as disabled.
chanUpdate.ChannelFlags &= ^lnwire.ChanUpdateDisabled
}
// We must now update the message's timestamp and generate a new
// signature.
newTimestamp := uint32(time.Now().Unix())
if newTimestamp <= chanUpdate.Timestamp {
// Timestamp must increase for message to propagate.
newTimestamp = chanUpdate.Timestamp + 1
}
chanUpdate.Timestamp = newTimestamp
chanUpdateMsg, err := chanUpdate.DataToSign()
if err != nil {
return err
}
pubKey := s.identityPriv.PubKey()
sig, err := s.nodeSigner.SignMessage(pubKey, chanUpdateMsg)
if err != nil {
return err
}
chanUpdate.Signature, err = lnwire.NewSigFromSignature(sig)
if err != nil {
return err
}
srvrLog.Debugf("Announcing channel(%v) disabled=%v", op, disabled)
// Once signed, we'll send the new update to all of our peers.
if err := s.applyChannelUpdate(chanUpdate); err != nil {
return err
}
// We'll keep track of the status set in the last update we sent, to
// avoid sending updates if nothing has changed.
s.sentDisabled[op] = disabled
return nil
}
// fetchLastChanUpdateByOutPoint fetches the latest policy for our direction of
// a channel, and crafts a new ChannelUpdate with this policy. Returns an error
// in case our ChannelEdgePolicy is not found in the database.
func (s *server) fetchLastChanUpdateByOutPoint(op wire.OutPoint) (
*lnwire.ChannelUpdate, error) {
// Get the edge info and policies for this channel from the graph.
graph := s.chanDB.ChannelGraph()
info, edge1, edge2, err := graph.FetchChannelEdgesByOutpoint(&op)
if err != nil {
return nil, err
}
pubKey := s.identityPriv.PubKey().SerializeCompressed()
return extractChannelUpdate(pubKey, info, edge1, edge2)
}
// fetchLastChanUpdate returns a function which is able to retrieve our latest
// channel update for a target channel.
func (s *server) fetchLastChanUpdate() func(lnwire.ShortChannelID) (
@ -3091,75 +3020,13 @@ func (s *server) fetchLastChanUpdate() func(lnwire.ShortChannelID) (
if err != nil {
return nil, err
}
return extractChannelUpdate(ourPubKey[:], info, edge1, edge2)
return netann.ExtractChannelUpdate(
ourPubKey[:], info, edge1, edge2,
)
}
}
// extractChannelUpdate attempts to retrieve a lnwire.ChannelUpdate message
// from an edge's info and a set of routing policies.
// NOTE: the passed policies can be nil.
func extractChannelUpdate(ownerPubKey []byte,
info *channeldb.ChannelEdgeInfo,
policies ...*channeldb.ChannelEdgePolicy) (
*lnwire.ChannelUpdate, error) {
// Helper function to extract the owner of the given policy.
owner := func(edge *channeldb.ChannelEdgePolicy) []byte {
var pubKey *btcec.PublicKey
switch {
case edge.ChannelFlags&lnwire.ChanUpdateDirection == 0:
pubKey, _ = info.NodeKey1()
case edge.ChannelFlags&lnwire.ChanUpdateDirection == 1:
pubKey, _ = info.NodeKey2()
}
// If pubKey was not found, just return nil.
if pubKey == nil {
return nil
}
return pubKey.SerializeCompressed()
}
// Extract the channel update from the policy we own, if any.
for _, edge := range policies {
if edge != nil && bytes.Equal(ownerPubKey, owner(edge)) {
return createChannelUpdate(info, edge)
}
}
return nil, fmt.Errorf("unable to extract ChannelUpdate for channel %v",
info.ChannelPoint)
}
// createChannelUpdate reconstructs a signed ChannelUpdate from the given edge
// info and policy.
func createChannelUpdate(info *channeldb.ChannelEdgeInfo,
policy *channeldb.ChannelEdgePolicy) (*lnwire.ChannelUpdate, error) {
update := &lnwire.ChannelUpdate{
ChainHash: info.ChainHash,
ShortChannelID: lnwire.NewShortChanIDFromInt(policy.ChannelID),
Timestamp: uint32(policy.LastUpdate.Unix()),
MessageFlags: policy.MessageFlags,
ChannelFlags: policy.ChannelFlags,
TimeLockDelta: policy.TimeLockDelta,
HtlcMinimumMsat: policy.MinHTLC,
HtlcMaximumMsat: policy.MaxHTLC,
BaseFee: uint32(policy.FeeBaseMSat),
FeeRate: uint32(policy.FeeProportionalMillionths),
ExtraOpaqueData: policy.ExtraOpaqueData,
}
var err error
update.Signature, err = lnwire.NewSigFromRawSignature(policy.SigBytes)
if err != nil {
return nil, err
}
return update, nil
}
// applyChannelUpdate applies the channel update to the different sub-systems of
// the server.
func (s *server) applyChannelUpdate(update *lnwire.ChannelUpdate) error {
@ -3172,124 +3039,3 @@ func (s *server) applyChannelUpdate(update *lnwire.ChannelUpdate) error {
return ErrServerShuttingDown
}
}
// watchChannelStatus periodically queries the Switch for the status of the
// open channels, and sends out ChannelUpdates to the network indicating their
// active status. Currently we'll send out either a Disabled or Active update
// if the channel has been in the same status over a given amount of time.
//
// NOTE: This MUST be run as a goroutine.
func (s *server) watchChannelStatus() {
defer s.wg.Done()
// A map of activeStatus values is used to keep track of the first time
// we saw a link change to its current active status.
type activeStatus struct {
active bool
time time.Time
}
status := make(map[wire.OutPoint]activeStatus)
// We'll check in on the channel statuses every 1/4 of the timeout.
unchangedTimeout := cfg.InactiveChanTimeout
tickerTimeout := unchangedTimeout / 4
if unchangedTimeout == 0 || tickerTimeout == 0 {
srvrLog.Debugf("Won't watch channel statuses")
return
}
ticker := time.NewTicker(tickerTimeout)
defer ticker.Stop()
for {
select {
case <-ticker.C:
channels, err := s.chanDB.FetchAllOpenChannels()
if err != nil {
srvrLog.Errorf("Unable to fetch open "+
"channels: %v", err)
continue
}
// For each open channel, update the status. We'll copy
// the updated statuses to a new map, to avoid keeping
// the status of closed channels around.
newStatus := make(map[wire.OutPoint]activeStatus)
for _, c := range channels {
// We'll skip any private channels, as they
// aren't used for routing within the network by
// other nodes.
if c.ChannelFlags&lnwire.FFAnnounceChannel == 0 {
continue
}
chanID := lnwire.NewChanIDFromOutPoint(
&c.FundingOutpoint)
// Get the current active status from the
// Switch.
active := s.htlcSwitch.HasActiveLink(chanID)
var currentStatus activeStatus
// If this link is not in the map, or the
// status has changed, set an updated active
// status.
st, ok := status[c.FundingOutpoint]
if !ok || st.active != active {
currentStatus = activeStatus{
active: active,
time: time.Now(),
}
} else {
// The status is unchanged, we'll keep
// it as is.
currentStatus = st
}
newStatus[c.FundingOutpoint] = currentStatus
}
// Set the status map to the map of new statuses.
status = newStatus
// If no change in status has happened during the last
// interval, we'll send out an update. Note that we add
// the negative of the timeout to set our limit in the
// past.
limit := time.Now().Add(-unchangedTimeout)
// We'll send out an update for all channels that have
// had their status unchanged for longer than the limit.
// NOTE: We also make sure to activate any channel when
// we connect to a peer, to make them available for
// path finding immediately.
for op, st := range status {
disable := !st.active
if st.time.Before(limit) {
// Before we attempt to announce the
// status of the channel, we remove it
// from the status map such that it
// will need a full unchanged interval
// before we attempt to announce its
// status again.
delete(status, op)
err = s.announceChanStatus(op, disable)
if err != nil &&
err != channeldb.ErrEdgeNotFound {
srvrLog.Errorf("Unable to "+
"disable channel %v: %v",
op, err)
}
}
}
case <-s.quit:
return
}
}
}

@ -9,6 +9,7 @@ import (
"math/rand"
"net"
"os"
"time"
"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/chaincfg/chainhash"
@ -22,6 +23,7 @@ import (
"github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/netann"
"github.com/lightningnetwork/lnd/shachain"
"github.com/lightningnetwork/lnd/ticker"
)
@ -367,8 +369,30 @@ func createTestPeer(notifier chainntnfs.ChainNotifier,
if err != nil {
return nil, nil, nil, nil, err
}
if err = htlcSwitch.Start(); err != nil {
return nil, nil, nil, nil, err
}
s.htlcSwitch = htlcSwitch
s.htlcSwitch.Start()
nodeSignerAlice := netann.NewNodeSigner(aliceKeyPriv)
const chanActiveTimeout = time.Minute
chanStatusMgr, err := netann.NewChanStatusManager(&netann.ChanStatusConfig{
ChanStatusSampleInterval: 30 * time.Second,
ChanEnableTimeout: chanActiveTimeout,
ChanDisableTimeout: 2 * time.Minute,
DB: dbAlice,
Graph: dbAlice.ChannelGraph(),
MessageSigner: nodeSignerAlice,
OurPubKey: aliceKeyPub,
IsChannelActive: s.htlcSwitch.HasActiveLink,
ApplyChannelUpdate: func(*lnwire.ChannelUpdate) error { return nil },
})
if err != nil {
return nil, nil, nil, nil, err
}
s.chanStatusMgr = chanStatusMgr
alicePeer := &peer{
addr: &lnwire.NetAddress{
@ -387,6 +411,8 @@ func createTestPeer(notifier chainntnfs.ChainNotifier,
localCloseChanReqs: make(chan *htlcswitch.ChanClose),
chanCloseMsgs: make(chan *closeMsg),
chanActiveTimeout: chanActiveTimeout,
queueQuit: make(chan struct{}),
quit: make(chan struct{}),
}
@ -394,6 +420,7 @@ func createTestPeer(notifier chainntnfs.ChainNotifier,
chanID := lnwire.NewChanIDFromOutPoint(channelAlice.ChannelPoint())
alicePeer.activeChannels[chanID] = channelAlice
alicePeer.wg.Add(1)
go alicePeer.channelManager()
return alicePeer, channelAlice, channelBob, cleanUpFunc, nil